Pipeline API¶
The antflow.pipeline module provides the core functionality for building and running multi-stage asynchronous workflows.
Overview¶
A Pipeline consists of a sequence of Stages. Each stage has its own pool of workers and processes items independently. Data flows from one stage to the next automatically.
Key components:
- Pipeline: The main orchestrator that manages stages and data flow.
- Stage: Configuration for a single processing step, including worker count and retry logic.
Usage Example¶
```python
import asyncio

from antflow import Pipeline, Stage


async def fetch_data(url):
    # ... fetch logic ...
    return data


async def process_data(data):
    # ... process logic ...
    return result


async def main():
    # Define stages
    stage1 = Stage(name="Fetch", workers=5, tasks=[fetch_data])
    stage2 = Stage(name="Process", workers=2, tasks=[process_data])

    # Create pipeline
    pipeline = Pipeline(stages=[stage1, stage2])

    # Run
    urls = ["http://example.com/1", "http://example.com/2"]
    results = await pipeline.run(urls)
    for result in results:
        print(f"ID: {result.id}, Value: {result.value}")


asyncio.run(main())
```
Class Reference¶
Stage¶
Stage dataclass ¶

```python
Stage(
    name: str,
    workers: int,
    tasks: Sequence[TaskFunc],
    retry: str = 'per_task',
    task_attempts: int = 3,
    task_wait_seconds: float = 1.0,
    stage_attempts: int = 3,
    unpack_args: bool = False,
    task_concurrency_limits: Dict[str, int] = dict(),
    on_success: Optional[Callable[[Any, Any, Dict[str, Any]], Any]] = None,
    on_failure: Optional[Callable[[Any, Exception, Dict[str, Any]], Any]] = None,
    skip_if: Optional[Callable[[Any], bool]] = None,
    queue_capacity: Optional[int] = None,
)
```
A stage in the pipeline that processes items through a sequence of tasks.
validate ¶
Validate stage configuration.
Raises:

| Type | Description |
|---|---|
| StageValidationError | If configuration is invalid |
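As an illustration, here is a Stage using several of the optional fields from the signature above. The values and callback bodies are illustrative, not antflow defaults, and the exact semantics of fields like skip_if are assumed from their names and types:

```python
# Illustrative configuration; fetch_data is the task from the overview example.
stage = Stage(
    name="Fetch",
    workers=5,
    tasks=[fetch_data],
    retry="per_task",                   # the documented default strategy
    task_attempts=5,                    # retry attempts per task
    task_wait_seconds=2.0,              # wait between attempts
    skip_if=lambda item: item is None,  # presumably skips items for which this returns True
    on_failure=lambda item, exc, ctx: print(f"{item!r} failed: {exc}"),
    queue_capacity=100,                 # bound the stage's input queue
)
stage.validate()  # raises StageValidationError if the configuration is invalid
```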
Pipeline¶
Pipeline ¶

```python
Pipeline(
    stages: List[Stage],
    collect_results: bool = True,
    status_tracker: Optional[StatusTracker] = None,
)
```
A multi-stage async pipeline with worker pools and flexible retry strategies.
Initialize the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stages | List[Stage] | List of Stage objects defining the pipeline | required |
| collect_results | bool | If True, collects results from the final stage into the results property | True |
| status_tracker | Optional[StatusTracker] | Optional StatusTracker for monitoring item status | None |
Raises:

| Type | Description |
|---|---|
| PipelineError | If stages list is empty |
| StageValidationError | If any stage configuration is invalid |
results property ¶
Get collected results, sorted by original input sequence.
Only contains data if `collect_results=True` was passed to `__init__`.
create classmethod ¶
quick async classmethod ¶

```python
quick(
    items: Sequence[Any],
    tasks: Union[TaskFunc, List[TaskFunc]],
    workers: int = 5,
    retries: int = 3,
    progress: bool = False,
    dashboard: Optional[Literal['compact', 'detailed', 'full']] = None,
) -> List[PipelineResult]
```
One-liner pipeline for simple use cases.
Creates a single-stage pipeline if one task is provided, or a multi-stage pipeline with one stage per task if a list is provided.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Items to process | required |
| tasks | Union[TaskFunc, List[TaskFunc]] | Single task function or list of task functions | required |
| workers | int | Number of workers per stage | 5 |
| retries | int | Number of retry attempts per task | 3 |
| progress | bool | Show progress bar | False |
| dashboard | Optional[Literal['compact', 'detailed', 'full']] | Dashboard type ("compact", "detailed", "full") | None |
Returns:

| Type | Description |
|---|---|
| List[PipelineResult] | List of PipelineResult objects |
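For instance, a sketch of both forms, reusing urls, fetch_data, and process_data from the overview example:

```python
# Single task: one stage with the default 5 workers and 3 attempts per task.
results = await Pipeline.quick(urls, fetch_data)

# List of tasks: one stage per task, chained in order.
results = await Pipeline.quick(
    urls, [fetch_data, process_data], workers=8, retries=2, progress=True
)
```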
get_stats ¶
Get current pipeline statistics.
Returns:

| Type | Description |
|---|---|
| PipelineStats | PipelineStats with current metrics |
get_worker_names ¶
Get all worker names organized by stage.
Useful for tracking which workers exist before pipeline runs.
Returns:

| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary mapping stage name to list of worker names |
Example:
```python
pipeline = Pipeline(stages=[
Stage(name="Fetch", workers=3, tasks=[fetch]),
Stage(name="Process", workers=2, tasks=[process])
])
pipeline.get_worker_names()
# {
# "Fetch": ["Fetch-W0", "Fetch-W1", "Fetch-W2"],
# "Process": ["Process-W0", "Process-W1"]
# }
```
get_worker_states ¶
Get current state of all workers.
Returns:

| Type | Description |
|---|---|
| Dict[str, WorkerState] | Dictionary mapping worker name to WorkerState |
get_worker_metrics ¶
Get performance metrics for all workers.
Returns:

| Type | Description |
|---|---|
| Dict[str, WorkerMetrics] | Dictionary mapping worker name to WorkerMetrics |
get_dashboard_snapshot ¶
Get complete dashboard snapshot with all current state.
Returns:

| Type | Description |
|---|---|
| DashboardSnapshot | DashboardSnapshot with worker states, metrics, and pipeline stats |
get_error_summary ¶
Get aggregated error information from pipeline execution.
If a StatusTracker is configured, returns detailed error information. Otherwise, returns a summary based on pipeline-level counts.
Returns:

| Type | Description |
|---|---|
| ErrorSummary | ErrorSummary with failure statistics |
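A minimal polling sketch that ties these accessors together. Since the fields of the returned objects are not listed on this page, it simply prints them:

```python
import asyncio

async def monitor(pipeline, interval=1.0):
    # Side task: poll read-only snapshots while the pipeline runs.
    while True:
        print(pipeline.get_stats())           # PipelineStats
        print(pipeline.get_worker_metrics())  # Dict[str, WorkerMetrics]
        print(pipeline.get_error_summary())   # ErrorSummary
        await asyncio.sleep(interval)

# e.g. task = asyncio.create_task(monitor(pipeline)), cancelled after run()
```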
feed async ¶
Feed items into a specific stage of the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Sequence of items to process | required |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |
Raises:

| Type | Description |
|---|---|
| ValueError | If target_stage is provided but not found. |
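A sketch of targeted injection into a running pipeline, using the stage names from the overview example; prefetched stands for a hypothetical item that has already been fetched:

```python
# Default: items enter the first stage ("Fetch").
await pipeline.feed(["http://example.com/3"])

# prefetched: hypothetical already-fetched item.
# Inject it directly into "Process", bypassing "Fetch"; priority=10 is
# served ahead of default-priority (100) items already queued.
await pipeline.feed([prefetched], target_stage="Process", priority=10)
```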
feed_async async ¶

```python
feed_async(
    items: AsyncIterable[Any],
    target_stage: Optional[str] = None,
    priority: int = 100,
) -> None
```
Feed items from an async iterable into a specific stage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| items | AsyncIterable[Any] | Async iterable of items to process | required |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |
Raises:

| Type | Description |
|---|---|
| ValueError | If target_stage is provided but not found. |
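For example, with a hypothetical async generator as the source:

```python
async def url_source():
    # Hypothetical producer; items are enqueued as they are yielded.
    for i in range(100):
        yield f"http://example.com/{i}"

await pipeline.feed_async(url_source())
```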
start async ¶
Start the pipeline workers in the background.
This method initializes the worker pool and begins processing items as soon as they become available in the queues. Use feed() to add items.
Raises:

| Type | Description |
|---|---|
| PipelineError | If pipeline is already running |
join async ¶
Wait for all enqueued items to be processed and stop workers.
This method:

1. Waits for all queues to be empty (all items processed)
2. Signals workers to stop
3. Waits for the worker pool to shut down
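Together with start() and feed(), this enables the manual lifecycle that run() wraps, sketched here with the overview pipeline:

```python
await pipeline.start()      # start worker pools in the background
await pipeline.feed(urls)   # enqueue items into the first stage
await pipeline.join()       # wait for queues to drain, then stop workers
results = pipeline.results  # populated when collect_results=True
```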
run async ¶

```python
run(
    items: Sequence[Any],
    progress: bool = False,
    dashboard: Optional[Literal['compact', 'detailed', 'full']] = None,
    custom_dashboard: Optional[DashboardProtocol] = None,
    dashboard_update_interval: float = 0.5,
) -> List[PipelineResult]
```
Run the pipeline end-to-end with the given items.
This is a convenience wrapper around start(), feed(), and join().
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Items to process through the pipeline. | required |
| progress | bool | Show minimal progress bar (mutually exclusive with dashboard). | False |
| dashboard | Optional[Literal['compact', 'detailed', 'full']] | Built-in dashboard type: "compact", "detailed", or "full". | None |
| custom_dashboard | Optional[DashboardProtocol] | User-provided dashboard implementing DashboardProtocol. | None |
| dashboard_update_interval | float | How often to update the dashboard, in seconds. Only applies to DashboardProtocol-based displays (progress, dashboard, custom_dashboard). Lower values mean more frequent updates but higher CPU usage. Recommended range: 0.1 to 1.0 seconds. | 0.5 |
Returns:

| Type | Description |
|---|---|
| List[PipelineResult] | List of PipelineResult objects. |
Example

```python
# Simple progress bar
results = await pipeline.run(items, progress=True)

# Compact dashboard with faster updates
results = await pipeline.run(items, dashboard="compact", dashboard_update_interval=0.2)

# Custom dashboard with slower updates (lower CPU usage)
results = await pipeline.run(items, custom_dashboard=MyDashboard(), dashboard_update_interval=1.0)
```
Note
The dashboard update mechanism uses polling: a background task calls
display.on_update(snapshot) every dashboard_update_interval seconds.
This is efficient because:
- Snapshots are lightweight (just reading current state)
- No events are generated; we only read existing data
- The interval is configurable to balance responsiveness vs CPU usage
For event-driven monitoring without polling, use StatusTracker callbacks instead.
stream async ¶
Stream results as they complete.
Unlike run(), which returns all results at once, stream() yields each result as soon as it completes. Results are yielded in completion order, not input order.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| items | Sequence[Any] | Items to process through the pipeline | required |
| progress | bool | Show minimal progress bar | False |
Yields:

| Type | Description |
|---|---|
| AsyncIterator[PipelineResult] | PipelineResult objects as they complete |
Example
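A sketch using the overview pipeline; the result fields match the usage example above, and the early-exit condition is illustrative:

```python
async for result in pipeline.stream(urls, progress=True):
    print(f"ID: {result.id}, Value: {result.value}")
    if result.value is None:  # illustrative early-exit condition
        break                 # breaking out shuts the pipeline down
```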
Note
- Results are yielded in completion order, not input order
- Use run() if you need results in input order
- Early exit (break) is supported and will shut down the pipeline
- stream() uses an internal queue and does NOT modify `collect_results`
- When streaming, results are NOT stored in `_results`, regardless of the `collect_results` setting
- After streaming completes, `_results` remains empty (the original `collect_results` setting is preserved)
shutdown async ¶
Shut down the pipeline gracefully or forcefully.
If queues are not empty, this might leave items unprocessed, depending on how workers react to `stop_event`.
`__aexit__` async ¶
Context manager exit with automatic shutdown.
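A usage sketch; it assumes the matching `__aenter__` starts the workers, since only `__aexit__` is documented on this page:

```python
# stage1, stage2, and urls as defined in the overview example.
async with Pipeline(stages=[stage1, stage2]) as pipeline:
    await pipeline.feed(urls)
    await pipeline.join()
# shutdown() is awaited automatically on exit, even if an exception occurred
```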