Concurrency Control¶
AntFlow provides multiple levels of concurrency control to help you manage resources, respect rate limits, and optimize throughput.
Levels of Control¶
- Worker Pool: Limits total concurrent tasks in an executor or stage.
- Map Concurrency: Limits concurrent items processed within a single
.map()call. - Task Limits: Limits specific task functions within a Pipeline Stage.
- Manual Semaphores: Granular control for individual
.submit()calls.
1. Worker Pool (Global Limit)¶
The most basic control is the number of workers. This sets the hard limit on how many tasks can run in parallel for that executor or stage.
In AsyncExecutor¶
from antflow import AsyncExecutor
# Maximum 10 tasks running at once
async with AsyncExecutor(max_workers=10) as executor:
...
In Pipeline Stage¶
from antflow import Stage
# This stage will never run more than 5 tasks simultaneously
stage = Stage(
name="Processing",
workers=5,
tasks=[my_task]
)
2. Map Concurrency (Batch Limit)¶
When processing a large batch of items with .map(), you might want to limit concurrency just for that batch, even if the executor has more workers available.
Use Case: You have 100 workers for general tasks, but you want to process a specific list of 1000 URLs with only 50 concurrent connections.
async with AsyncExecutor(max_workers=100) as executor:
# Process 1000 items, but only 50 at a time
async for result in executor.map(
fetch_url,
urls,
max_concurrency=50
):
print(result)
3. Pipeline Task Limits (Granular Rate Limiting)¶
In a Pipeline, a Stage might have multiple tasks (e.g., validate -> fetch_api -> save_db). You might have 50 workers to keep validate and save_db fast, but fetch_api has a strict rate limit.
Use Case: High throughput stage, but one specific function must be rate-limited.
stage = Stage(
name="Enrichment",
workers=50, # High capacity for CPU tasks
tasks=[validate, fetch_from_api, save_to_db],
# Only 'fetch_from_api' is limited to 5 concurrent executions
task_concurrency_limits={
"fetch_from_api": 5
}
)
- Key Benefit: Workers are not blocked from doing other tasks (
validate,save_to_db) while waiting for the API limit.
4. Manual Semaphores (Custom Control)¶
For complex workflows using .submit(), you can pass a shared asyncio.Semaphore to group tasks under a single limit.
Use Case: You are submitting different types of tasks manually and want to limit a specific subset of them.
import asyncio
from antflow import AsyncExecutor
async def main():
# Create a semaphore for a specific resource (e.g., DB connection)
db_semaphore = asyncio.Semaphore(10)
async with AsyncExecutor(max_workers=100) as executor:
futures = []
# Submit tasks that need the DB
for item in items:
f = executor.submit(
db_task,
item,
semaphore=db_semaphore # Share the limit
)
futures.append(f)
# Submit other tasks that don't need the DB (unlimited up to max_workers)
executor.submit(cpu_task, item)