Pipeline API¶
The antflow.pipeline module provides the core functionality for building and running multi-stage asynchronous workflows.
Overview¶
A Pipeline consists of a sequence of Stages. Each stage has its own pool of workers and processes items independently. Data flows from one stage to the next automatically.
Key components:
- Pipeline: The main orchestrator that manages stages and data flow.
- Stage: Configuration for a single processing step, including worker count and retry logic.
Usage Example¶
```python
import asyncio

from antflow import Pipeline, Stage


async def fetch_data(url):
    # ... fetch logic ...
    return data


async def process_data(data):
    # ... process logic ...
    return result


async def main():
    # Define stages
    stage1 = Stage(name="Fetch", workers=5, tasks=[fetch_data])
    stage2 = Stage(name="Process", workers=2, tasks=[process_data])

    # Create pipeline
    pipeline = Pipeline(stages=[stage1, stage2])

    # Run
    urls = ["http://example.com/1", "http://example.com/2"]
    results = await pipeline.run(urls)

    for result in results:
        print(f"ID: {result.id}, Value: {result.value}")


asyncio.run(main())
```
Class Reference¶
Stage¶
Stage (dataclass)¶
```python
Stage(
    name: str,
    workers: int,
    tasks: Sequence[TaskFunc],
    retry: str = 'per_task',
    task_attempts: int = 3,
    task_wait_seconds: float = 1.0,
    stage_attempts: int = 3,
    unpack_args: bool = False,
    task_concurrency_limits: Dict[str, int] = dict(),
    on_success: Optional[Callable[[Any, Any, Dict[str, Any]], Any]] = None,
    on_failure: Optional[Callable[[Any, Exception, Dict[str, Any]], Any]] = None,
    skip_if: Optional[Callable[[Any], bool]] = None,
)
```
A stage in the pipeline that processes items through a sequence of tasks.
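For example, a fetch stage that retries each task up to five times with a two-second wait between attempts, and skips items that are already cached, might look like the sketch below. `fetch_page` and `is_cached` are hypothetical placeholders, and the assumption that `skip_if` skips items for which the callable returns True follows from the parameter's name rather than from this reference:
```python
from antflow import Stage

async def fetch_page(url):
    ...  # fetch logic elided

def is_cached(url) -> bool:
    ...  # hypothetical cache lookup

stage = Stage(
    name="Fetch",
    workers=5,
    tasks=[fetch_page],
    retry="per_task",       # retry individual tasks rather than the whole stage
    task_attempts=5,        # up to five attempts per task
    task_wait_seconds=2.0,  # wait two seconds between attempts
    skip_if=is_cached,      # assumed: items where this returns True are skipped
)
```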
validate¶
Validate stage configuration.
Raises:
| Type | Description |
|---|---|
| StageValidationError | If configuration is invalid |
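A minimal sketch of calling validate() up front and reporting the problem. It assumes that StageValidationError can be imported from antflow and that a stage with no workers or tasks fails validation; neither detail is confirmed by this reference:
```python
from antflow import Stage, StageValidationError  # import path assumed

stage = Stage(name="Fetch", workers=0, tasks=[])  # presumably invalid

try:
    stage.validate()
except StageValidationError as exc:
    print(f"Invalid stage configuration: {exc}")
```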
Pipeline¶
```python
Pipeline(
    stages: List[Stage],
    collect_results: bool = True,
    status_tracker: Optional[StatusTracker] = None,
)
```
A multi-stage async pipeline with worker pools and flexible retry strategies.
Initialize the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stages | List[Stage] | List of Stage objects defining the pipeline | *required* |
| collect_results | bool | If True, collects results from the final stage into the results property | True |
| status_tracker | Optional[StatusTracker] | Optional StatusTracker for monitoring item status | None |
Raises:
| Type | Description |
|---|---|
| PipelineError | If stages list is empty |
| StageValidationError | If any stage configuration is invalid |
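When results are consumed inside the stages themselves (for example via a stage's on_success callback), keeping them in memory is unnecessary; a sketch with collection disabled:
```python
from antflow import Pipeline, Stage

async def process(item):
    ...  # processing logic elided

# With collect_results=False, nothing is stored in the results property.
pipeline = Pipeline(
    stages=[Stage(name="Process", workers=4, tasks=[process])],
    collect_results=False,
)
```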
results (property)¶
Get collected results, sorted by original input sequence.
Only contains data if collect_results=True was passed to __init__.
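Inside a coroutine, the collected results can be read back after the run; a minimal sketch reusing the stages and urls from the usage example above:
```python
pipeline = Pipeline(stages=[stage1, stage2])
await pipeline.run(urls)

# Same results that run() returned, sorted by original input order.
for result in pipeline.results:
    print(result)
```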
get_stats¶
Get current pipeline statistics.
Returns:
| Type | Description |
|---|---|
| PipelineStats | PipelineStats with current metrics |
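A quick sketch of polling statistics during a run; the individual fields of PipelineStats are not listed in this section, so the example only prints the object:
```python
stats = pipeline.get_stats()
print(stats)  # PipelineStats with current metrics
```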
get_worker_names¶
Get all worker names organized by stage.
Useful for tracking which workers exist before pipeline runs.
Returns:
| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary mapping stage name to list of worker names |
Example:
```python
pipeline = Pipeline(stages=[
Stage(name="Fetch", workers=3, tasks=[fetch]),
Stage(name="Process", workers=2, tasks=[process])
])
pipeline.get_worker_names()
# {
# "Fetch": ["Fetch-W0", "Fetch-W1", "Fetch-W2"],
# "Process": ["Process-W0", "Process-W1"]
# }
```
get_worker_states¶
Get current state of all workers.
Returns:
| Type | Description |
|---|---|
| Dict[str, WorkerState] | Dictionary mapping worker name to WorkerState |
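A sketch of inspecting workers mid-run; WorkerState's fields are not documented here, so each entry is simply printed:
```python
for worker_name, state in pipeline.get_worker_states().items():
    print(f"{worker_name}: {state}")
```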
get_worker_metrics¶
Get performance metrics for all workers.
Returns:
| Type | Description |
|---|---|
| Dict[str, WorkerMetrics] | Dictionary mapping worker name to WorkerMetrics |
get_dashboard_snapshot¶
Get complete dashboard snapshot with all current state.
Returns:
| Type | Description |
|---|---|
| DashboardSnapshot | DashboardSnapshot with worker states, metrics, and pipeline stats |
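For live monitoring, one option is to poll snapshots from a side task while the pipeline runs; a sketch (the snapshot is printed as-is, since its fields are only summarized above):
```python
import asyncio

async def monitor(pipeline, interval: float = 1.0):
    # Poll a fresh snapshot once per interval until the task is cancelled.
    while True:
        snapshot = pipeline.get_dashboard_snapshot()
        print(snapshot)  # worker states, metrics, and pipeline stats
        await asyncio.sleep(interval)
```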
feed (async)¶
Feed items into a specific stage of the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Sequence of items to process | *required* |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |
Raises:
| Type | Description |
|---|---|
| ValueError | If target_stage is provided but not found. |
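Combined with start() and join(), feed() supports incremental and prioritized workloads. In the sketch below, urgent pre-processed items (prefetched_items is a placeholder) are injected directly into the "Process" stage from the usage example, ahead of normal traffic:
```python
async def main():
    pipeline = Pipeline(stages=[stage1, stage2])  # stages from the usage example
    await pipeline.start()

    # Regular items enter the first stage.
    await pipeline.feed(["http://example.com/1", "http://example.com/2"])

    # Urgent items skip fetching and jump the queue (lower = higher priority).
    await pipeline.feed(prefetched_items, target_stage="Process", priority=0)

    await pipeline.join()
```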
feed_async (async)¶
```python
feed_async(
    items: AsyncIterable[Any],
    target_stage: Optional[str] = None,
    priority: int = 100,
) -> None
```
Feed items from an async iterable into a specific stage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| items | AsyncIterable[Any] | Async iterable of items to process | *required* |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |
Raises:
| Type | Description |
|---|---|
| ValueError | If target_stage is provided but not found. |
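Because feed_async() consumes an async iterable, items can be streamed into the pipeline as they become available instead of being materialized up front; a minimal sketch:
```python
async def url_stream():
    # Stand-in for a source that yields URLs as they arrive.
    for i in range(100):
        yield f"http://example.com/{i}"

async def main():
    pipeline = Pipeline(stages=[stage1, stage2])  # stages from the usage example
    await pipeline.start()
    await pipeline.feed_async(url_stream())
    await pipeline.join()
```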
start (async)¶
Start the pipeline workers in the background.
This method initializes the worker pool and starts processing items immediately
as they are available in the queues. Use feed() to add items.
Raises:
| Type | Description |
|---|---|
| PipelineError | If pipeline is already running |
join (async)¶
Wait for all enqueued items to be processed and stop workers.
This method:

1. Waits for all queues to be empty (all items processed)
2. Signals workers to stop
3. Waits for the worker pool to shut down
run (async)¶
Run the pipeline end-to-end with the given items.
This is a convenience wrapper around start(), feed(), and join().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Items to process through the pipeline. | *required* |
Returns:
| Type | Description |
|---|---|
| List[PipelineResult] | List of PipelineResult objects. |
shutdown (async)¶
Shut down the pipeline gracefully or forcefully.
If the queues are not empty, this may leave items unprocessed, depending on how workers react to stop_event.
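A common pattern is to guarantee a shutdown even when feeding or joining raises; a sketch reusing the stages and urls from the usage example:
```python
async def main():
    pipeline = Pipeline(stages=[stage1, stage2])  # stages from the usage example
    await pipeline.start()
    try:
        await pipeline.feed(urls)
        await pipeline.join()
    finally:
        # Stop workers even if an exception interrupted processing above.
        await pipeline.shutdown()
```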
__aexit__ (async)¶
Context manager exit with automatic shutdown.
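Since __aexit__ is provided, the pipeline is meant to be used as an async context manager. The sketch below assumes a matching __aenter__ that starts the workers and returns the pipeline, which is not shown in this section:
```python
async def main():
    # Assumed: entering the context starts the pipeline workers.
    async with Pipeline(stages=[stage1, stage2]) as pipeline:
        await pipeline.feed(urls)
        await pipeline.join()
    # On exit, __aexit__ shuts the pipeline down automatically.
```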