AsyncExecutor API
The antflow.executor module provides a familiar interface for concurrent async execution, modeled after Python's concurrent.futures.
Overview
The AsyncExecutor manages a pool of workers to execute async functions concurrently. It is ideal for simple parallel processing tasks where you don't need the full complexity of a multi-stage pipeline.
Key features:
- submit(): Schedule a single task.
- map(): Apply a function to an iterable concurrently.
- as_completed(): Iterate over futures as they finish.
- wait(): Wait for a collection of futures with flexible conditions.
Usage Example

    import asyncio
    from antflow import AsyncExecutor

    async def process_item(x):
        await asyncio.sleep(0.1)
        return x * 2

    async def main():
        # Use as a context manager
        async with AsyncExecutor(max_workers=5) as executor:
            # 1. Submit a single task
            future = executor.submit(process_item, 10)
            result = await future.result()
            print(f"Result: {result}")

            # 2. Map over a list
            async for res in executor.map(process_item, range(5)):
                print(f"Mapped: {res}")

    asyncio.run(main())

Class Reference
AsyncFuture
An async-compatible future that holds the result of an async task. Similar to concurrent.futures.Future but for asyncio.
set_exception
Set an exception and mark the future as done.
result (async)
Wait for and return the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Maximum time to wait in seconds | None |
Returns:
| Type | Description |
|---|---|
| Any | The task result |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If timeout is exceeded |
| Exception | The exception set by set_exception() |
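
The three outcomes listed above (a timeout, a stored exception being re-raised, a normal result) can be reproduced with a plain asyncio.Future. The sketch below uses only the standard library, not antflow; the helper name result_with_timeout is hypothetical and is not part of the AsyncFuture API, so treat it as an illustration of the semantics rather than the library's implementation.

```python
import asyncio
from typing import Any, Optional


async def result_with_timeout(fut: "asyncio.Future[Any]",
                              timeout: Optional[float] = None) -> Any:
    # Hypothetical helper: wait for a future, giving up after `timeout` seconds.
    # shield() keeps a timeout from cancelling the underlying future itself.
    return await asyncio.wait_for(asyncio.shield(fut), timeout)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    outcomes = []

    # 1. A future that never completes: the wait times out.
    pending = loop.create_future()
    try:
        await result_with_timeout(pending, timeout=0.01)
    except asyncio.TimeoutError:
        outcomes.append("timeout")

    # 2. A future holding an exception: the exception is re-raised to the caller.
    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))
    try:
        await result_with_timeout(failed)
    except ValueError as exc:
        outcomes.append(f"error: {exc}")

    # 3. A future holding a value: the value is returned.
    ok = loop.create_future()
    ok.set_result(42)
    outcomes.append(await result_with_timeout(ok))
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

With AsyncFuture, the equivalent caller-side pattern would presumably be a try/except around `await future.result(timeout=...)`.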
AsyncExecutor
An async executor with concurrent.futures-style API. Manages a pool of workers that execute async tasks concurrently.
Initialize the executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int | Maximum number of concurrent workers | 5 |
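
To make the idea of a bounded worker pool concrete, here is a minimal sketch built on asyncio.Queue. It illustrates the general technique only and is an assumption about how such a pool can work, not a description of antflow's internals; run_pool, job, and the counters are hypothetical names.

```python
import asyncio


async def run_pool(coro_fns, max_workers=5):
    # Queue every job up front, tagged with its position.
    queue = asyncio.Queue()
    for index, fn in enumerate(coro_fns):
        queue.put_nowait((index, fn))
    results = [None] * len(coro_fns)

    async def worker():
        # Each worker pulls jobs until the queue is empty.
        while True:
            try:
                index, fn = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await fn()

    # Only max_workers workers exist, so at most that many jobs run at once.
    await asyncio.gather(*(worker() for _ in range(max_workers)))
    return results


active = 0  # jobs currently running
peak = 0    # highest concurrency observed


async def job():
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return "ok"


pool_results = asyncio.run(run_pool([job] * 6, max_workers=2))
print(pool_results, "peak concurrency:", peak)
```

Six jobs are processed, but the observed peak concurrency never exceeds the worker count.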
submit
submit(fn: Callable[..., Any], *args: Any, retries: int = 0, retry_delay: float = 0.1, semaphore: Semaphore | None = None, **kwargs: Any) -> AsyncFuture
Submit a task for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[..., Any] | Async callable to execute | required |
| *args | Any | Positional arguments for fn | () |
| retries | int | Number of retries on failure | 0 |
| retry_delay | float | Delay between retries in seconds | 0.1 |
| semaphore | Semaphore \| None | Optional semaphore to limit concurrency | None |
| **kwargs | Any | Keyword arguments for fn | {} |
Returns:
| Type | Description |
|---|---|
| AsyncFuture | AsyncFuture that can be awaited for the result |
Raises:
| Type | Description |
|---|---|
| ExecutorShutdownError | If executor has been shut down |
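
The retries and retry_delay parameters suggest a retry loop around the submitted callable. The sketch below shows one plausible reading using only asyncio: it assumes that retries counts additional attempts after the first failure and that any Exception triggers a retry. Neither assumption is confirmed by this reference, and call_with_retries and flaky are hypothetical names.

```python
import asyncio


async def call_with_retries(fn, *args, retries=0, retry_delay=0.1, **kwargs):
    # Assumption: `retries` is the number of extra attempts after the first one.
    attempt = 0
    while True:
        try:
            return await fn(*args, **kwargs)
        except Exception:
            if attempt >= retries:
                raise  # out of retries: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


calls = {"n": 0}


async def flaky(x):
    # Fails on the first two calls, succeeds on the third.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return x * 2


value = asyncio.run(call_with_retries(flaky, 10, retries=2, retry_delay=0.01))
print(value, "after", calls["n"], "calls")
```

Under the same reading, the antflow call would look like `executor.submit(flaky, 10, retries=2, retry_delay=0.01)`.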
map (async)
map(fn: Callable[[T], Any], *iterables: Iterable[T], timeout: float | None = None, retries: int = 0, retry_delay: float = 0.1, max_concurrency: int | None = None) -> AsyncIterator[R]
Map an async function over iterables, yielding results in input order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[[T], Any] | Async callable to map | required |
| *iterables | Iterable[T] | Iterables to map over | () |
| timeout | float \| None | Maximum time to wait for each result | None |
| retries | int | Number of retries on failure | 0 |
| retry_delay | float | Delay between retries in seconds | 0.1 |
| max_concurrency | int \| None | Maximum number of concurrent executions for this map call | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[R] | Results from fn applied to each input |
Raises:
| Type | Description |
|---|---|
| ExecutorShutdownError | If executor has been shut down |
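
Yielding results in input order while running calls concurrently means results are awaited in submission order, even when later inputs finish first. The following standard-library sketch shows that idea together with a per-call concurrency cap like max_concurrency. It is not antflow's implementation; ordered_map and slow_double are hypothetical names.

```python
import asyncio
from typing import Any, AsyncIterator


async def ordered_map(fn, items, max_concurrency=None) -> AsyncIterator[Any]:
    # An optional semaphore caps how many calls run at the same time.
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run(item):
        if sem is None:
            return await fn(item)
        async with sem:
            return await fn(item)

    # Start every call immediately, then await them in input order.
    tasks = [asyncio.ensure_future(run(item)) for item in items]
    try:
        for task in tasks:
            yield await task
    finally:
        # If the consumer stops early, cancel whatever is still running.
        for task in tasks:
            task.cancel()


async def slow_double(x):
    await asyncio.sleep(0.03 - 0.01 * x)  # later inputs sleep for less time
    return x * 2


async def main():
    return [r async for r in ordered_map(slow_double, range(3), max_concurrency=2)]


mapped = asyncio.run(main())
print(mapped)
```

The output order follows the inputs, not the completion times.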
as_completed (async)
as_completed(futures: list[AsyncFuture], timeout: float | None = None) -> AsyncIterator[AsyncFuture]
Yield futures as they complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| futures | list[AsyncFuture] | List of futures to wait for | required |
| timeout | float \| None | Maximum time to wait for all futures | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[AsyncFuture] | Futures as they complete |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If timeout is exceeded |
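
Completion-order iteration is easiest to see next to its standard-library counterpart, asyncio.as_completed. The sketch below uses only asyncio. One difference to keep in mind: the stdlib function is a plain iterator whose items must themselves be awaited, whereas the method documented here is an async iterator that yields AsyncFuture objects. The work coroutine is a hypothetical example.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = [
        asyncio.ensure_future(work("slow", 0.05)),
        asyncio.ensure_future(work("fast", 0.01)),
    ]
    order = []
    # Items arrive in completion order, regardless of submission order.
    for next_done in asyncio.as_completed(tasks, timeout=1.0):
        order.append(await next_done)
    return order


completion_order = asyncio.run(main())
print(completion_order)
```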
wait (async)
wait(futures: Iterable[AsyncFuture[R]], timeout: float | None = None, return_when: WaitStrategy = WaitStrategy.ALL_COMPLETED) -> Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]]
Wait for futures to complete with different strategies.
Similar to concurrent.futures.wait() but for async operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| futures | Iterable[AsyncFuture[R]] | Iterable of AsyncFuture objects to wait for | required |
| timeout | float \| None | Maximum time to wait in seconds | None |
| return_when | WaitStrategy | Strategy for when to return. FIRST_COMPLETED: return when any future completes; FIRST_EXCEPTION: return when any future raises an exception; ALL_COMPLETED: return when all futures complete (default) | ALL_COMPLETED |
Returns:
| Type | Description |
|---|---|
| Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]] | Tuple of (done, not_done) sets of futures |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If timeout is exceeded (with ALL_COMPLETED) |
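
The (done, not_done) return shape and the return_when strategies follow the pattern of asyncio.wait in the standard library, which the sketch below uses to show FIRST_COMPLETED. It does not call antflow. Note one difference stated in this reference: asyncio.wait never raises on timeout and simply returns whatever is unfinished in the second set, while the method documented here raises TimeoutError with ALL_COMPLETED.

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    fast = asyncio.ensure_future(work("fast", 0.01))
    slow = asyncio.ensure_future(work("slow", 0.2))

    # Return as soon as any one task has finished.
    done, not_done = await asyncio.wait(
        {fast, slow}, return_when=asyncio.FIRST_COMPLETED
    )
    first = {task.result() for task in done}

    # Clean up the tasks that are still running.
    for task in not_done:
        task.cancel()
    await asyncio.gather(*not_done, return_exceptions=True)
    return first, len(not_done)


first_done, remaining = asyncio.run(main())
print(first_done, remaining)
```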
shutdown (async)
Shut down the executor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wait | bool | If True, wait for all pending tasks to complete | True |
| cancel_futures | bool | If True, cancel all pending futures | False |
__aexit__ (async)
Context manager exit with automatic shutdown.
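
Because this API is modeled after concurrent.futures, the standard library's Executor.shutdown(), which takes the same wait and cancel_futures flags (cancel_futures needs Python 3.9+), is a useful reference point. The sketch below uses ThreadPoolExecutor, not antflow, to show how the flags interact there: a task that is already running finishes, while a queued task is cancelled. Whether AsyncExecutor draws the line between running and pending tasks the same way is an assumption to verify.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()


def running_task():
    started.set()      # signal that this task is already executing
    time.sleep(0.05)
    return "finished"


def queued_task():
    return "never runs"


pool = ThreadPoolExecutor(max_workers=1)
first = pool.submit(running_task)   # picked up by the single worker
second = pool.submit(queued_task)   # waits in the queue behind it
started.wait()                      # make sure the first task is running

# Wait for running work, but cancel anything that has not started yet.
pool.shutdown(wait=True, cancel_futures=True)

print(first.result(), second.cancelled())
```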