Pipeline API

The antflow.pipeline module provides the core functionality for building and running multi-stage asynchronous workflows.

Overview

A Pipeline consists of a sequence of Stages. Each stage has its own pool of workers and processes items independently. Data flows from one stage to the next automatically.

Key components:

  • Pipeline: The main orchestrator that manages stages and data flow.
  • Stage: Configuration for a single processing step, including worker count and retry logic.

Usage Example

```python
import asyncio
from antflow import Pipeline, Stage

async def fetch_data(url):
    # ... fetch logic ...
    data = ...  # placeholder for the fetched payload
    return data

async def process_data(data):
    # ... process logic ...
    result = ...  # placeholder for the processed output
    return result

async def main():
    # Define stages
    stage1 = Stage(name="Fetch", workers=5, tasks=[fetch_data])
    stage2 = Stage(name="Process", workers=2, tasks=[process_data])

    # Create pipeline
    pipeline = Pipeline(stages=[stage1, stage2])

    # Run
    urls = ["http://example.com/1", "http://example.com/2"]
    results = await pipeline.run(urls)

    for result in results:
        print(f"ID: {result.id}, Value: {result.value}")

asyncio.run(main())
```

Class Reference

Stage

Stage dataclass

Stage(name: str, workers: int, tasks: Sequence[TaskFunc], retry: str = 'per_task', task_attempts: int = 3, task_wait_seconds: float = 1.0, stage_attempts: int = 3, unpack_args: bool = False, task_concurrency_limits: Dict[str, int] = dict(), on_success: Optional[Callable[[Any, Any, Dict[str, Any]], Any]] = None, on_failure: Optional[Callable[[Any, Exception, Dict[str, Any]], Any]] = None, skip_if: Optional[Callable[[Any], bool]] = None, queue_capacity: Optional[int] = None)

A stage in the pipeline that processes items through a sequence of tasks.

validate

validate() -> None

Validate stage configuration.

Raises:

| Type | Description |
| --- | --- |
| StageValidationError | If configuration is invalid |

Pipeline

Pipeline

Pipeline(stages: List[Stage], collect_results: bool = True, status_tracker: Optional[StatusTracker] = None)

A multi-stage async pipeline with worker pools and flexible retry strategies.

Initialize the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stages | List[Stage] | List of Stage objects defining the pipeline | required |
| collect_results | bool | If True, collects results from the final stage into self.results. If False, results are discarded after processing (useful for fire-and-forget or side-effect-only pipelines). | True |
| status_tracker | Optional[StatusTracker] | Optional StatusTracker for monitoring item status | None |

Raises:

| Type | Description |
| --- | --- |
| PipelineError | If stages list is empty |
| StageValidationError | If any stage configuration is invalid |

results property

results: List[PipelineResult]

Get collected results, sorted by original input sequence.

Only contains data if collect_results=True was passed to __init__.

create classmethod

create() -> PipelineBuilder

Create a pipeline using the fluent builder API.

Returns:

| Type | Description |
| --- | --- |
| PipelineBuilder | PipelineBuilder for chaining |

Example
```python
results = await (
    Pipeline.create()
    .add("Fetch", fetch, workers=10)
    .add("Process", process, workers=5)
    .run(items, progress=True)
)
```

quick async classmethod

quick(items: Sequence[Any], tasks: Union[TaskFunc, List[TaskFunc]], workers: int = 5, retries: int = 3, progress: bool = False, dashboard: Optional[Literal['compact', 'detailed', 'full']] = None) -> List[PipelineResult]

One-liner pipeline for simple use cases.

Creates a single-stage pipeline if one task is provided, or a multi-stage pipeline with one stage per task if a list is provided.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | Sequence[Any] | Items to process | required |
| tasks | Union[TaskFunc, List[TaskFunc]] | Single task function or list of task functions | required |
| workers | int | Number of workers per stage | 5 |
| retries | int | Number of retry attempts per task | 3 |
| progress | bool | Show progress bar | False |
| dashboard | Optional[Literal['compact', 'detailed', 'full']] | Dashboard type ("compact", "detailed", "full") | None |

Returns:

| Type | Description |
| --- | --- |
| List[PipelineResult] | List of PipelineResult objects |

Example
```python
# Single task
results = await Pipeline.quick(items, process, workers=10)

# Multiple tasks (one stage per task)
results = await Pipeline.quick(
    items,
    [fetch, process, save],
    workers=5,
    progress=True
)
```

get_stats

get_stats() -> PipelineStats

Get current pipeline statistics.

Returns:

| Type | Description |
| --- | --- |
| PipelineStats | PipelineStats with current metrics |

get_worker_names

get_worker_names() -> Dict[str, List[str]]

Get all worker names organized by stage.

Useful for tracking which workers exist before pipeline runs.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, List[str]] | Dictionary mapping stage name to list of worker names |

Example:

```python
pipeline = Pipeline(stages=[
    Stage(name="Fetch", workers=3, tasks=[fetch]),
    Stage(name="Process", workers=2, tasks=[process])
])
pipeline.get_worker_names()
# {
#     "Fetch": ["Fetch-W0", "Fetch-W1", "Fetch-W2"],
#     "Process": ["Process-W0", "Process-W1"]
# }
```

get_worker_states

get_worker_states() -> Dict[str, WorkerState]

Get current state of all workers.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, WorkerState] | Dictionary mapping worker name to WorkerState |

Example
```python
states = pipeline.get_worker_states()
for name, state in states.items():
    if state.status == "busy":
        print(f"{name} processing item {state.current_item_id}")
```

get_worker_metrics

get_worker_metrics() -> Dict[str, WorkerMetrics]

Get performance metrics for all workers.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, WorkerMetrics] | Dictionary mapping worker name to WorkerMetrics |

Example
```python
metrics = pipeline.get_worker_metrics()
for name, metric in metrics.items():
    print(f"{name}: {metric.items_processed} items, "
          f"avg {metric.avg_processing_time:.2f}s")
```

get_dashboard_snapshot

get_dashboard_snapshot() -> DashboardSnapshot

Get complete dashboard snapshot with all current state.

Returns:

| Type | Description |
| --- | --- |
| DashboardSnapshot | DashboardSnapshot with worker states, metrics, and pipeline stats |

Example
```python
snapshot = pipeline.get_dashboard_snapshot()
print(f"Active workers: {sum(1 for s in snapshot.worker_states.values() if s.status == 'busy')}")
print(f"Items processed: {snapshot.pipeline_stats.items_processed}")
```

get_error_summary

get_error_summary() -> ErrorSummary

Get aggregated error information from pipeline execution.

If a StatusTracker is configured, returns detailed error information. Otherwise, returns a summary based on pipeline-level counts.

Returns:

| Type | Description |
| --- | --- |
| ErrorSummary | ErrorSummary with failure statistics |

Example
```python
summary = pipeline.get_error_summary()
print(f"Total failed: {summary.total_failed}")
for error_type, count in summary.errors_by_type.items():
    print(f"  {error_type}: {count}")
```

feed async

feed(items: Sequence[Any], target_stage: Optional[str] = None, priority: int = 100) -> None

Feed items into a specific stage of the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | Sequence[Any] | Sequence of items to process | required |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If target_stage is provided but not found. |

feed_async async

feed_async(items: AsyncIterable[Any], target_stage: Optional[str] = None, priority: int = 100) -> None

Feed items from an async iterable into a specific stage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | AsyncIterable[Any] | Async iterable of items to process | required |
| target_stage | Optional[str] | Name of the stage to inject items into. If None, feeds into the first stage. | None |
| priority | int | Priority level (lower = higher priority). | 100 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If target_stage is provided but not found. |

start async

start() -> None

Start the pipeline workers in the background.

This method initializes the worker pool and begins processing items as soon as they appear in the queues. Use feed() to add items.

Raises:

| Type | Description |
| --- | --- |
| PipelineError | If pipeline is already running |

join async

join() -> None

Wait for all enqueued items to be processed and stop workers.

This method:

1. Waits for all queues to be empty (all items processed)
2. Signals workers to stop
3. Waits for the worker pool to shut down

run async

run(items: Sequence[Any], progress: bool = False, dashboard: Optional[Literal['compact', 'detailed', 'full']] = None, custom_dashboard: Optional[DashboardProtocol] = None, dashboard_update_interval: float = 0.5) -> List[PipelineResult]

Run the pipeline end-to-end with the given items.

This is a convenience wrapper around start(), feed(), and join().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | Sequence[Any] | Items to process through the pipeline. | required |
| progress | bool | Show minimal progress bar (mutually exclusive with dashboard). | False |
| dashboard | Optional[Literal['compact', 'detailed', 'full']] | Built-in dashboard type: "compact", "detailed", or "full". | None |
| custom_dashboard | Optional[DashboardProtocol] | User-provided dashboard implementing DashboardProtocol. | None |
| dashboard_update_interval | float | How often to update the dashboard, in seconds. Only applies to DashboardProtocol-based displays (progress, dashboard, custom_dashboard). Lower values mean more frequent updates but higher CPU usage; recommended range is 0.1 to 1.0 seconds. | 0.5 |

Returns:

| Type | Description |
| --- | --- |
| List[PipelineResult] | List of PipelineResult objects. |

Example
```python
# Simple progress bar
results = await pipeline.run(items, progress=True)

# Compact dashboard with faster updates
results = await pipeline.run(items, dashboard="compact", dashboard_update_interval=0.2)

# Custom dashboard with slower updates (lower CPU usage)
results = await pipeline.run(items, custom_dashboard=MyDashboard(), dashboard_update_interval=1.0)
```
Note

The dashboard update mechanism uses polling: a background task calls display.on_update(snapshot) every dashboard_update_interval seconds. This is efficient because:

  • Snapshots are lightweight (just reading current state)
  • No events are generated; we only read existing data
  • The interval is configurable to balance responsiveness against CPU usage

For event-driven monitoring without polling, use StatusTracker callbacks instead.

stream async

stream(items: Sequence[Any], progress: bool = False) -> AsyncIterator[PipelineResult]

Stream results as they complete.

Unlike run() which returns all results at once, stream() yields results as soon as they complete. Results are yielded in completion order, not input order.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | Sequence[Any] | Items to process through the pipeline | required |
| progress | bool | Show minimal progress bar | False |

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[PipelineResult] | PipelineResult objects as they complete |

Example
```python
async for result in pipeline.stream(items):
    print(f"Got result: {result.value}")
    if some_condition:
        break  # Early exit supported
```
Note
  • Results are yielded in completion order, not input order
  • Use run() if you need results in input order
  • Early exit (break) is supported and will shutdown the pipeline
  • stream() uses an internal queue and does NOT modify collect_results
  • When streaming, results are NOT stored in _results regardless of collect_results setting
  • After streaming completes, _results remains empty (preserves original collect_results setting)

shutdown async

shutdown() -> None

Shut down the pipeline gracefully or forcefully.

If queues are not empty, this might leave items unprocessed depending on how workers react to stop_event.

__aenter__ async

__aenter__() -> 'Pipeline'

Context manager entry.

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb) -> None

Context manager exit with automatic shutdown.