AsyncExecutor API
The antflow.executor module provides a familiar interface for concurrent async execution, modeled after Python's concurrent.futures.
Overview
The AsyncExecutor manages a pool of workers that run async functions concurrently. It suits simple concurrent workloads that do not need the full complexity of a multi-stage pipeline.
Key features:
- submit(): Schedule a single task.
- map(): Apply a function to an iterable and return results as a list.
- map_iter(): Apply a function to an iterable, yielding results (async iterator).
- as_completed(): Iterate over futures as they finish.
- wait(): Wait for a collection of futures with flexible conditions.
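To make the "pool of workers" idea concrete, here is a minimal standard-library sketch of a fixed number of workers draining a shared queue. It is an illustration only and not antflow's implementation; `run_pool` and `double` are hypothetical names introduced for this example.

```python
import asyncio


async def double(x):
    await asyncio.sleep(0.01)
    return x * 2


async def run_pool(fn, items, max_workers=5):
    """Hypothetical stand-in: a fixed set of workers draining a shared queue."""
    queue = asyncio.Queue()
    for index, item in enumerate(items):
        queue.put_nowait((index, item))
    results = [None] * queue.qsize()

    async def worker():
        # Each worker pulls the next job until the queue is empty.
        while True:
            try:
                index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await fn(item)

    # At most max_workers calls to fn are in flight at any time.
    await asyncio.gather(*(worker() for _ in range(max_workers)))
    return results


pool_results = asyncio.run(run_pool(double, range(5), max_workers=2))
print(pool_results)
```

With `max_workers=2`, only two calls to `double` run at once, yet every input is eventually processed and stored at its original index.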
Usage Example

    import asyncio

    from antflow import AsyncExecutor


    async def process_item(x):
        await asyncio.sleep(0.1)
        return x * 2


    async def main():
        # Use as a context manager
        async with AsyncExecutor(max_workers=5) as executor:
            # 1. Submit a single task
            future = executor.submit(process_item, 10)
            result = await future.result()
            print(f"Result: {result}")

            # 2. Map over a list - returns list directly
            results = await executor.map(process_item, range(5))
            print(f"Results: {results}")

            # 3. Map with streaming (async iterator)
            async for res in executor.map_iter(process_item, range(5)):
                print(f"Streamed: {res}")


    asyncio.run(main())

Class Reference
AsyncFuture
An async-compatible future that holds the result of an async task. Similar to concurrent.futures.Future but for asyncio.
set_exception
Set an exception and mark the future as done.
result (async)
Wait for and return the result.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float \| None` | Maximum time to wait, in seconds | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The task result |

Raises:

| Type | Description |
|---|---|
| `TimeoutError` | If the timeout is exceeded |
| `Exception` | The exception set by set_exception() |
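The three outcomes documented for result() (a value, a timeout, or a stored exception) can be sketched with a plain `asyncio.Future`. This is an analogy under stated assumptions, not AsyncFuture itself: `result_with_timeout` is a hypothetical helper, and which `TimeoutError` class antflow raises is not stated in this reference.

```python
import asyncio


async def result_with_timeout(future, timeout=None):
    """Hypothetical helper mirroring the documented result(timeout=...) contract."""
    # shield() keeps the underlying future alive if the wait itself times out.
    return await asyncio.wait_for(asyncio.shield(future), timeout)


async def main():
    loop = asyncio.get_running_loop()
    outcomes = []

    # Case 1: the future does not complete within the timeout.
    pending = loop.create_future()
    try:
        await result_with_timeout(pending, timeout=0.01)
    except asyncio.TimeoutError:
        outcomes.append("timeout")
    pending.cancel()

    # Case 2: an exception stored on the future is re-raised to the caller.
    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))
    try:
        await result_with_timeout(failed)
    except ValueError as exc:
        outcomes.append(f"error: {exc}")

    # Case 3: a completed future returns its value.
    done = loop.create_future()
    done.set_result(42)
    outcomes.append(await result_with_timeout(done))
    return outcomes


outcomes = asyncio.run(main())
print(outcomes)
```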
AsyncExecutor
An async executor with concurrent.futures-style API. Manages a pool of workers that execute async tasks concurrently.
Initialize the executor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_workers` | `int` | Maximum number of concurrent workers | `5` |
submit
submit(fn: Callable[..., Any], *args: Any, retries: int = 0, retry_delay: float = 0.1, semaphore: Semaphore | None = None, **kwargs: Any) -> AsyncFuture
Submit a task for execution.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `Callable[..., Any]` | Async callable to execute | required |
| `*args` | `Any` | Positional arguments for fn | `()` |
| `retries` | `int` | Number of retries on failure | `0` |
| `retry_delay` | `float` | Delay between retries, in seconds | `0.1` |
| `semaphore` | `Semaphore \| None` | Optional semaphore to limit concurrency | `None` |
| `**kwargs` | `Any` | Keyword arguments for fn | `{}` |

Returns:

| Type | Description |
|---|---|
| `AsyncFuture` | AsyncFuture that can be awaited for the result |

Raises:

| Type | Description |
|---|---|
| `ExecutorShutdownError` | If the executor has been shut down |
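How `retries`, `retry_delay`, and `semaphore` might interact can be sketched with the standard library alone. The helper below is hypothetical and makes assumptions this reference does not confirm: that `retries` counts extra attempts after the first call, that any `Exception` triggers a retry, and that the semaphore is held only while `fn` runs.

```python
import asyncio


async def run_with_retries(fn, *args, retries=0, retry_delay=0.1, semaphore=None, **kwargs):
    """Hypothetical helper: call fn, retrying up to `retries` extra times."""
    attempt = 0
    while True:
        try:
            if semaphore is None:
                return await fn(*args, **kwargs)
            async with semaphore:  # hold a slot only while fn runs
                return await fn(*args, **kwargs)
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


calls = {"count": 0}


async def flaky(x):
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return x * 2


async def main():
    limiter = asyncio.Semaphore(2)
    return await run_with_retries(flaky, 10, retries=3, retry_delay=0.01, semaphore=limiter)


retry_result = asyncio.run(main())
print(retry_result, calls["count"])
```

Here the call succeeds on the third attempt, so only two of the three allowed retries are used.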
map (async)
map(fn: Callable[[T], Any], *iterables: Iterable[T], timeout: float | None = None, retries: int = 0, retry_delay: float = 0.1) -> List[R]
Map an async function over iterables and return results as a list.
Similar to concurrent.futures.Executor.map(), but returns a list directly
instead of an iterator (since list(executor.map(...)) is the common pattern).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `Callable[[T], Any]` | Async callable to map | required |
| `*iterables` | `Iterable[T]` | Iterables to map over | `()` |
| `timeout` | `float \| None` | Maximum time to wait for each result | `None` |
| `retries` | `int` | Number of retries on failure | `0` |
| `retry_delay` | `float` | Delay between retries, in seconds | `0.1` |

Returns:

| Type | Description |
|---|---|
| `List[R]` | List of results from fn applied to each input, in input order |

Raises:

| Type | Description |
|---|---|
| `ExecutorShutdownError` | If the executor has been shut down |
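The "in input order" guarantee is worth illustrating, because tasks can finish in a different order than they were started. The sketch below reproduces that property with `asyncio.gather` and a semaphore; `ordered_map` and `slow_square` are hypothetical names, and this is not how antflow is necessarily implemented.

```python
import asyncio

finished = []  # records the order in which calls actually complete


async def slow_square(x):
    # Larger inputs sleep for less time, so they tend to finish first.
    await asyncio.sleep(0.05 - 0.01 * x)
    finished.append(x)
    return x * x


async def ordered_map(fn, items, max_workers=5):
    limiter = asyncio.Semaphore(max_workers)

    async def bounded(item):
        async with limiter:  # cap the number of concurrent calls
            return await fn(item)

    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(bounded(item) for item in items))


squares = asyncio.run(ordered_map(slow_square, range(5)))
print(squares)
```

Regardless of the order recorded in `finished`, `squares` lines up with the inputs 0 through 4.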
map_iter (async)
map_iter(fn: Callable[[T], Any], *iterables: Iterable[T], timeout: float | None = None, retries: int = 0, retry_delay: float = 0.1) -> AsyncIterator[R]
Map an async function over iterables, yielding results in input order.
Use this instead of map() when you need streaming behavior:
- Process results as they arrive
- Handle large datasets without loading all results into memory
- Early exit when a condition is met
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `Callable[[T], Any]` | Async callable to map | required |
| `*iterables` | `Iterable[T]` | Iterables to map over | `()` |
| `timeout` | `float \| None` | Maximum time to wait for each result | `None` |
| `retries` | `int` | Number of retries on failure | `0` |
| `retry_delay` | `float` | Delay between retries, in seconds | `0.1` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[R]` | Results from fn applied to each input |

Raises:

| Type | Description |
|---|---|
| `ExecutorShutdownError` | If the executor has been shut down |
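The streaming and early-exit behavior described above can be sketched with a plain async generator. `stream_in_order` and `fetch_length` are hypothetical names for illustration; the cleanup strategy shown (cancelling unfinished tasks when the consumer stops) is an assumption, not documented antflow behavior.

```python
import asyncio


async def fetch_length(word):
    await asyncio.sleep(0.01)
    return len(word)


async def stream_in_order(fn, items):
    """Hypothetical stand-in: start every task, then yield results in input order."""
    tasks = [asyncio.ensure_future(fn(item)) for item in items]
    try:
        for task in tasks:
            yield await task
    finally:
        # If the consumer stops early, cancel whatever has not finished.
        for task in tasks:
            task.cancel()


async def main():
    seen = []
    stream = stream_in_order(fetch_length, ["a", "bb", "ccc", "dddd"])
    async for length in stream:
        seen.append(length)
        if length >= 3:
            break  # early exit: the remaining result is never consumed
    await stream.aclose()  # run the generator's cleanup deterministically
    return seen


seen = asyncio.run(main())
print(seen)
```

The consumer handles each result as soon as it is yielded and never builds the full result list, which is the main reason to prefer a streaming interface over a list-returning one.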
as_completed (async)
as_completed(futures: list[AsyncFuture], timeout: float | None = None) -> AsyncIterator[AsyncFuture]
Yield futures as they complete.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `futures` | `list[AsyncFuture]` | List of futures to wait for | required |
| `timeout` | `float \| None` | Maximum time to wait for all futures | `None` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[AsyncFuture]` | Futures as they complete |

Raises:

| Type | Description |
|---|---|
| `TimeoutError` | If the timeout is exceeded |
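Completion-order iteration is easiest to see with the standard library's `asyncio.as_completed`, used here as an analogy. Note the difference in shape: the stdlib function is a regular iterator of awaitables, whereas the method documented above is an async iterator of AsyncFuture objects. The `delayed` coroutine is a hypothetical example task.

```python
import asyncio


async def delayed(label, delay):
    await asyncio.sleep(delay)
    return label


async def main():
    tasks = [
        asyncio.ensure_future(delayed("slow", 0.06)),
        asyncio.ensure_future(delayed("fast", 0.01)),
        asyncio.ensure_future(delayed("medium", 0.03)),
    ]
    order = []
    # Results arrive in completion order, not submission order.
    for next_done in asyncio.as_completed(tasks, timeout=1.0):
        order.append(await next_done)
    return order


completion_order = asyncio.run(main())
print(completion_order)
```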
wait (async)
wait(futures: Iterable[AsyncFuture[R]], timeout: float | None = None, return_when: WaitStrategy = WaitStrategy.ALL_COMPLETED) -> Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]]
Wait for futures to complete with different strategies.
Similar to concurrent.futures.wait() but for async operations.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `futures` | `Iterable[AsyncFuture[R]]` | Iterable of AsyncFuture objects to wait for | required |
| `timeout` | `float \| None` | Maximum time to wait, in seconds | `None` |
| `return_when` | `WaitStrategy` | When to return: FIRST_COMPLETED returns when any future completes; FIRST_EXCEPTION returns when any future raises an exception; ALL_COMPLETED returns when all futures complete (default) | `ALL_COMPLETED` |

Returns:

| Type | Description |
|---|---|
| `Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]]` | Tuple of (done, not_done) sets of futures |

Raises:

| Type | Description |
|---|---|
| `TimeoutError` | If the timeout is exceeded (with ALL_COMPLETED) |
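The (done, not_done) return shape and the FIRST_COMPLETED strategy mirror the standard library's `asyncio.wait`, shown here as an analogy rather than as antflow code. One difference to keep in mind: `asyncio.wait` returns normally when its timeout expires, whereas the method documented above is listed as raising TimeoutError under ALL_COMPLETED. The `delayed` coroutine is a hypothetical example task.

```python
import asyncio


async def delayed(label, delay):
    await asyncio.sleep(delay)
    return label


async def main():
    tasks = {
        asyncio.ensure_future(delayed("fast", 0.01)),
        asyncio.ensure_future(delayed("slow", 0.2)),
    }
    # Return as soon as any one task finishes.
    done, not_done = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finished = sorted(task.result() for task in done)
    for task in not_done:
        task.cancel()  # clean up work we no longer need
    return finished, len(not_done)


finished_labels, remaining = asyncio.run(main())
print(finished_labels, remaining)
```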
shutdown (async)
Shut down the executor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `wait` | `bool` | If True, wait for all pending tasks to complete | `True` |
| `cancel_futures` | `bool` | If True, cancel all pending futures | `False` |
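Because this API is modeled on concurrent.futures, the standard library's `ThreadPoolExecutor.shutdown` is a reasonable reference point for the `wait` and `cancel_futures` flags. The sketch below uses only the stdlib executor; it is an analogy, and the antflow method differs in that it is async and submit() is documented to raise ExecutorShutdownError rather than RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor


def work(x):
    return x + 1


executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(work, 1)

# wait=True blocks until submitted tasks finish; cancel_futures=True
# (Python 3.9+) would instead cancel queued tasks that have not started.
executor.shutdown(wait=True, cancel_futures=False)
value = future.result()

try:
    executor.submit(work, 2)
    rejected = False
except RuntimeError:
    # The stdlib executor refuses new work after shutdown.
    rejected = True

print(value, rejected)
```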
__aexit__ (async)
Context manager exit with automatic shutdown.