AsyncExecutor API

The antflow.executor module provides a familiar interface for concurrent async execution, modeled after Python's concurrent.futures.

Overview

The AsyncExecutor manages a pool of workers to execute async functions concurrently. It is ideal for simple parallel processing tasks where you don't need the full complexity of a multi-stage pipeline.

Key features:

- submit(): Schedule a single task.
- map(): Apply a function to an iterable concurrently.
- as_completed(): Iterate over futures as they finish.
- wait(): Wait for a collection of futures with flexible conditions.

Usage Example

import asyncio
from antflow import AsyncExecutor

async def process_item(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    # Use as a context manager
    async with AsyncExecutor(max_workers=5) as executor:

        # 1. Submit a single task
        future = executor.submit(process_item, 10)
        result = await future.result()
        print(f"Result: {result}")

        # 2. Map over a list
        async for res in executor.map(process_item, range(5)):
            print(f"Mapped: {res}")

asyncio.run(main())

Class Reference

AsyncFuture

AsyncFuture(sequence_id: int)

An async-compatible future that holds the result of an async task. It is similar to concurrent.futures.Future, but designed for asyncio.

set_result

set_result(result: Any) -> None

Set the result and mark the future as done.

set_exception

set_exception(exception: Exception) -> None

Set an exception and mark the future as done.

result async

result(timeout: float | None = None) -> Any

Wait for and return the result.

Parameters:

Name     Type          Description                      Default
timeout  float | None  Maximum time to wait in seconds  None

Returns:

Type  Description
Any   The task result

Raises:

Type          Description
TimeoutError  If timeout is exceeded
Exception     The exception set by set_exception()

done

done() -> bool

Return True if the future is done.

exception

exception() -> Exception | None

Return the exception set on this future, or None.
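
The AsyncFuture methods documented above can be pictured with a small stand-in built on asyncio.Event. This is a minimal sketch under assumptions, not antflow's implementation: the class name SketchFuture and the demo() helper are hypothetical, and the sketch assumes that result() re-raises whatever was passed to set_exception().

```python
import asyncio
from typing import Any, Optional


class SketchFuture:
    """Hypothetical stand-in mirroring the documented AsyncFuture methods."""

    def __init__(self, sequence_id: int) -> None:
        self.sequence_id = sequence_id
        self._event = asyncio.Event()
        self._result: Any = None
        self._exception: Optional[Exception] = None

    def set_result(self, result: Any) -> None:
        # Store the value and wake up anyone waiting in result().
        self._result = result
        self._event.set()

    def set_exception(self, exception: Exception) -> None:
        # Store the error and mark the future as done.
        self._exception = exception
        self._event.set()

    async def result(self, timeout: Optional[float] = None) -> Any:
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("timed out waiting for result") from None
        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self) -> bool:
        return self._event.is_set()

    def exception(self) -> Optional[Exception]:
        return self._exception


async def demo() -> tuple:
    ok = SketchFuture(sequence_id=0)
    ok.set_result(42)
    failed = SketchFuture(sequence_id=1)
    failed.set_exception(ValueError("boom"))
    pending = SketchFuture(sequence_id=2)  # never completed
    try:
        await pending.result(timeout=0.01)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return await ok.result(), failed.exception(), pending.done(), timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sketch keeps done() and exception() synchronous and only result() asynchronous, matching the split shown in the signatures above.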

AsyncExecutor

AsyncExecutor(max_workers: int = 5)

An async executor with a concurrent.futures-style API. It manages a pool of workers that execute async tasks concurrently.

Initialize the executor.

Parameters:

Name         Type  Description                           Default
max_workers  int   Maximum number of concurrent workers  5
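
To illustrate what a bounded pool of workers means in practice, the following sketch runs jobs on a fixed number of asyncio worker tasks that pull from a shared queue. It is an assumption about the general technique, not a description of antflow's internals; run_with_worker_pool, make_job, and demo are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_with_worker_pool(
    jobs: List[Callable[[], Awaitable[int]]], max_workers: int = 5
) -> List[int]:
    """Run zero-argument async jobs on a fixed number of worker tasks."""
    queue: asyncio.Queue = asyncio.Queue()
    for index, job in enumerate(jobs):
        queue.put_nowait((index, job))
    results: List[int] = [0] * len(jobs)

    async def worker() -> None:
        # Each worker keeps pulling jobs until the queue is empty.
        while True:
            try:
                index, job = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await job()

    await asyncio.gather(*(worker() for _ in range(max_workers)))
    return results


async def demo() -> tuple:
    running = peak = 0

    def make_job(x: int) -> Callable[[], Awaitable[int]]:
        async def job() -> int:
            nonlocal running, peak
            running += 1
            peak = max(peak, running)  # track how many jobs overlap
            await asyncio.sleep(0.01)
            running -= 1
            return x * 2

        return job

    results = await run_with_worker_pool(
        [make_job(i) for i in range(10)], max_workers=3
    )
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the number of jobs in flight never exceeds max_workers, because each worker awaits one job at a time.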

submit

submit(fn: Callable[..., Any], *args: Any, retries: int = 0, retry_delay: float = 0.1, semaphore: Semaphore | None = None, **kwargs: Any) -> AsyncFuture

Submit a task for execution.

Parameters:

Name         Type                Description                              Default
fn           Callable[..., Any]  Async callable to execute                required
*args        Any                 Positional arguments for fn              ()
retries      int                 Number of retries on failure             0
retry_delay  float               Delay between retries in seconds         0.1
semaphore    Semaphore | None    Optional semaphore to limit concurrency  None
**kwargs     Any                 Keyword arguments for fn                 {}

Returns:

Type         Description
AsyncFuture  AsyncFuture that can be awaited for the result

Raises:

Type                   Description
ExecutorShutdownError  If executor has been shut down
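
The retries, retry_delay, and semaphore parameters can be read as a retry loop wrapped around the call. The sketch below shows one plausible interpretation in plain asyncio. It assumes that retries counts additional attempts after the first one and that the semaphore is an asyncio.Semaphore held for the duration of each attempt; neither detail is confirmed by this page. call_with_retries and demo are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_retries(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    retries: int = 0,
    retry_delay: float = 0.1,
    semaphore: Optional[asyncio.Semaphore] = None,
    **kwargs: Any,
) -> Any:
    """Assumed semantics: one initial attempt plus up to `retries` more."""
    attempt = 0
    while True:
        try:
            if semaphore is None:
                return await fn(*args, **kwargs)
            # Hold the semaphore only while the call is running.
            async with semaphore:
                return await fn(*args, **kwargs)
        except Exception:
            if attempt >= retries:
                raise  # out of retries: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


async def demo() -> tuple:
    calls = 0

    async def flaky(x: int) -> int:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
        return x * 2

    result = await call_with_retries(flaky, 10, retries=2, retry_delay=0.01)
    return result, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With retries=2 the flaky function above is called three times in total: two failures followed by one success.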

map async

map(fn: Callable[[T], Any], *iterables: Iterable[T], timeout: float | None = None, retries: int = 0, retry_delay: float = 0.1, max_concurrency: int | None = None) -> AsyncIterator[R]

Map an async function over iterables, yielding results in input order.

Parameters:

Name             Type                Description                                                Default
fn               Callable[[T], Any]  Async callable to map                                      required
*iterables       Iterable[T]         Iterables to map over                                      ()
timeout          float | None        Maximum time to wait for each result                       None
retries          int                 Number of retries on failure                               0
retry_delay      float               Delay between retries in seconds                           0.1
max_concurrency  int | None          Maximum number of concurrent executions for this map call  None

Yields:

Type              Description
AsyncIterator[R]  Results from fn applied to each input

Raises:

Type                   Description
ExecutorShutdownError  If executor has been shut down
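
Yielding results in input order while the calls themselves run concurrently can be sketched as follows. This is an illustration of the idea in plain asyncio, not antflow's implementation; ordered_map and demo are hypothetical names, and only a single iterable and the max_concurrency option are modeled.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Iterable, Optional


async def ordered_map(
    fn: Callable[[Any], Awaitable[Any]],
    items: Iterable[Any],
    max_concurrency: Optional[int] = None,
) -> AsyncIterator[Any]:
    """Yield fn(item) results in input order while calls run concurrently."""
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run(item: Any) -> Any:
        if semaphore is None:
            return await fn(item)
        async with semaphore:
            return await fn(item)

    # Start every call up front, then await the tasks in submission order.
    tasks = [asyncio.ensure_future(run(item)) for item in items]
    try:
        for task in tasks:
            yield await task
    finally:
        # Cancel anything still pending if the consumer stops early.
        for task in tasks:
            task.cancel()


async def demo() -> tuple:
    finished = []

    async def slow_double(x: int) -> int:
        await asyncio.sleep(0.06 - 0.02 * x)  # later inputs finish sooner
        finished.append(x)
        return x * 2

    results = [r async for r in ordered_map(slow_double, range(3))]
    return results, finished


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo the calls finish in reverse order, yet the results are still yielded in the order of the inputs.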

as_completed async

as_completed(futures: list[AsyncFuture], timeout: float | None = None) -> AsyncIterator[AsyncFuture]

Yield futures as they complete.

Parameters:

Name     Type               Description                           Default
futures  list[AsyncFuture]  List of futures to wait for           required
timeout  float | None       Maximum time to wait for all futures  None

Yields:

Type                        Description
AsyncIterator[AsyncFuture]  Futures as they complete

Raises:

Type          Description
TimeoutError  If timeout is exceeded
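
For readers who know the standard library, asyncio.as_completed is the closest analogue and shows the same completion-order behavior. The sketch below uses only asyncio, so it differs from the method documented above in one respect: the standard-library function yields awaitables in a regular for loop, whereas the signature above describes an async iterator of AsyncFuture objects. delayed and completion_order are hypothetical names.

```python
import asyncio
from typing import List


async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def completion_order() -> List[str]:
    tasks = [
        asyncio.ensure_future(delayed("slow", 0.06)),
        asyncio.ensure_future(delayed("fast", 0.02)),
        asyncio.ensure_future(delayed("medium", 0.04)),
    ]
    order = []
    # Results arrive in completion order, not submission order.
    for next_done in asyncio.as_completed(tasks, timeout=1.0):
        order.append(await next_done)
    return order


if __name__ == "__main__":
    print(asyncio.run(completion_order()))
```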

wait async

wait(futures: Iterable[AsyncFuture[R]], timeout: float | None = None, return_when: WaitStrategy = WaitStrategy.ALL_COMPLETED) -> Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]]

Wait for futures to complete with different strategies.

Similar to concurrent.futures.wait() but for async operations.

Parameters:

Name         Type                      Description                                  Default
futures      Iterable[AsyncFuture[R]]  Iterable of AsyncFuture objects to wait for  required
timeout      float | None              Maximum time to wait in seconds              None
return_when  WaitStrategy              Strategy for when to return (listed below)   ALL_COMPLETED

return_when strategies:

- FIRST_COMPLETED: Return when any future completes
- FIRST_EXCEPTION: Return when any future raises an exception
- ALL_COMPLETED: Return when all futures complete (default)

Returns:

Type                                             Description
Tuple[Set[AsyncFuture[R]], Set[AsyncFuture[R]]]  Tuple of (done, not_done) sets of futures

Raises:

Type          Description
TimeoutError  If timeout is exceeded (with ALL_COMPLETED)

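Example
# Assumes `executor` is an active AsyncExecutor and `task` is an async callable.
futures = [executor.submit(task, i) for i in range(10)]
# Return as soon as any future raises an exception.
done, pending = await executor.wait(
    futures,
    return_when=WaitStrategy.FIRST_EXCEPTION
)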
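
The (done, not_done) split can also be tried out with the standard library's asyncio.wait, which takes a comparable return_when argument. The following sketch uses only asyncio and is meant as an analogy, not as a statement about how antflow's wait() behaves in edge cases; work and wait_for_first_exception are hypothetical names.

```python
import asyncio


async def work(i: int) -> int:
    await asyncio.sleep(0.03 * (i + 1))
    if i == 1:
        raise ValueError(f"task {i} failed")
    return i


async def wait_for_first_exception() -> tuple:
    tasks = [asyncio.ensure_future(work(i)) for i in range(4)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    # Cancel and drain what is still running so nothing leaks.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    failed = [t for t in done if t.exception() is not None]
    return len(done), len(pending), len(failed)


if __name__ == "__main__":
    print(asyncio.run(wait_for_first_exception()))
```

Here task 0 finishes first, task 1 raises, and the wait returns at that point with tasks 2 and 3 still pending.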

shutdown async

shutdown(wait: bool = True, cancel_futures: bool = False) -> None

Shut down the executor.

Parameters:

Name            Type  Description                                      Default
wait            bool  If True, wait for all pending tasks to complete  True
cancel_futures  bool  If True, cancel all pending futures              False

__aenter__ async

__aenter__() -> 'AsyncExecutor'

Context manager entry.

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb) -> None

Context manager exit with automatic shutdown.
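
The relationship between shutdown() and the context manager methods can be sketched with a tiny task pool that shuts itself down on exit. This is a hypothetical illustration in plain asyncio, not antflow's code: SketchPool and demo are invented names, and RuntimeError stands in for ExecutorShutdownError. It also assumes that exiting the block is equivalent to shutdown(wait=True).

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class SketchPool:
    """Hypothetical pool showing shutdown-on-exit behavior."""

    def __init__(self) -> None:
        self._tasks: List[Any] = []
        self.closed = False

    def submit(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        if self.closed:
            # Stand-in for the documented ExecutorShutdownError.
            raise RuntimeError("pool has been shut down")
        task = asyncio.ensure_future(fn(*args))
        self._tasks.append(task)
        return task

    async def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        self.closed = True
        if cancel_futures:
            for task in self._tasks:
                task.cancel()
        if wait:
            # Wait for every task, including cancelled or failed ones.
            await asyncio.gather(*self._tasks, return_exceptions=True)

    async def __aenter__(self) -> "SketchPool":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.shutdown(wait=True)


async def demo() -> tuple:
    async def job(x: int) -> int:
        await asyncio.sleep(0.01)
        return x * 2

    async with SketchPool() as pool:
        tasks = [pool.submit(job, i) for i in range(3)]
    # Leaving the block waited for every submitted task.
    all_done = all(t.done() for t in tasks)
    try:
        pool.submit(job, 99)
        rejected = False
    except RuntimeError:
        rejected = True
    return all_done, [t.result() for t in tasks], rejected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```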