
Pipeline API

The antflow.pipeline module provides the core functionality for building and running multi-stage asynchronous workflows.

Overview

A Pipeline consists of a sequence of Stages. Each stage has its own pool of workers and processes items independently. Data flows from one stage to the next automatically.

Key components:

  • Pipeline: The main orchestrator that manages stages and data flow.
  • Stage: Configuration for a single processing step, including worker count and retry logic.

Usage Example

import asyncio
from antflow import Pipeline, Stage

async def fetch_data(url):
    # ... fetch logic ...
    data = {"url": url}  # placeholder
    return data

async def process_data(data):
    # ... process logic ...
    result = data  # placeholder
    return result

async def main():
    # Define stages
    stage1 = Stage(name="Fetch", workers=5, tasks=[fetch_data])
    stage2 = Stage(name="Process", workers=2, tasks=[process_data])

    # Create pipeline
    pipeline = Pipeline(stages=[stage1, stage2])

    # Run
    urls = ["http://example.com/1", "http://example.com/2"]
    results = await pipeline.run(urls)

    for result in results:
        print(f"ID: {result.id}, Value: {result.value}")

asyncio.run(main())

Class Reference

Stage

Stage dataclass

Stage(name: str, workers: int, tasks: Sequence[TaskFunc], retry: str = 'per_task', task_attempts: int = 3, task_wait_seconds: float = 1.0, stage_attempts: int = 3, unpack_args: bool = False, task_concurrency_limits: Dict[str, int] = dict(), on_success: Optional[Callable[[Any, Any, Dict[str, Any]], Any]] = None, on_failure: Optional[Callable[[Any, Exception, Dict[str, Any]], Any]] = None, skip_if: Optional[Callable[[Any], bool]] = None)

A stage in the pipeline that processes items through a sequence of tasks.

validate

validate() -> None

Validate stage configuration.

Raises:

| Type | Description |
| --- | --- |
| `StageValidationError` | If configuration is invalid. |
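
The signature above exposes retry behaviour, lifecycle callbacks, and conditional skipping. Below is a minimal sketch of a more fully configured stage; the task and callback functions are hypothetical, and the assumption that the callbacks' third argument is a context dictionary is not confirmed by this page.

```python
from antflow import Stage

async def fetch_page(url):
    ...  # hypothetical task

def log_failure(item, exc, context):
    # hypothetical on_failure callback: (item, exception, context dict)
    print(f"failed: {item!r} ({exc})")

stage = Stage(
    name="Fetch",
    workers=5,
    tasks=[fetch_page],
    retry="per_task",                    # retry individual tasks (the default)
    task_attempts=5,                     # up to 5 attempts per task
    task_wait_seconds=2.0,               # wait 2 seconds between attempts
    on_failure=log_failure,
    skip_if=lambda item: item is None,   # skip items matching the predicate
)
stage.validate()                         # raises StageValidationError if misconfigured
```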

Pipeline

Pipeline

Pipeline(stages: List[Stage], collect_results: bool = True, status_tracker: Optional[StatusTracker] = None)

A multi-stage async pipeline with worker pools and flexible retry strategies.

Initialize the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `List[Stage]` | List of Stage objects defining the pipeline. | *required* |
| `collect_results` | `bool` | If True, collects results from the final stage into `self.results`. If False, results are discarded after processing (useful for fire-and-forget or side-effect-only pipelines). | `True` |
| `status_tracker` | `Optional[StatusTracker]` | Optional StatusTracker for monitoring item status. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `PipelineError` | If stages list is empty. |
| `StageValidationError` | If any stage configuration is invalid. |
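
As a brief sketch of the `collect_results` flag, a fire-and-forget pipeline can be built like this (`stage1` and `stage2` are assumed to be defined as in the usage example above):

```python
# Results from the final stage are discarded instead of being kept in memory.
pipeline = Pipeline(stages=[stage1, stage2], collect_results=False)
```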

results property

results: List[PipelineResult]

Get collected results, sorted by original input sequence.

Only contains data if collect_results=True was passed to __init__.

get_stats

get_stats() -> PipelineStats

Get current pipeline statistics.

Returns:

| Type | Description |
| --- | --- |
| `PipelineStats` | PipelineStats with current metrics. |
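
A short usage sketch; of the PipelineStats fields, only `items_processed` appears elsewhere on this page (in the dashboard example), so treat the attribute access as illustrative.

```python
stats = pipeline.get_stats()
print(f"Items processed so far: {stats.items_processed}")
```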

get_worker_names

get_worker_names() -> Dict[str, List[str]]

Get all worker names organized by stage.

Useful for tracking which workers exist before pipeline runs.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, List[str]]` | Dictionary mapping stage name to list of worker names. |

Example:

```python
pipeline = Pipeline(stages=[
    Stage(name="Fetch", workers=3, tasks=[fetch]),
    Stage(name="Process", workers=2, tasks=[process])
])
pipeline.get_worker_names()
# {
#     "Fetch": ["Fetch-W0", "Fetch-W1", "Fetch-W2"],
#     "Process": ["Process-W0", "Process-W1"]
# }
```

get_worker_states

get_worker_states() -> Dict[str, WorkerState]

Get current state of all workers.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, WorkerState]` | Dictionary mapping worker name to WorkerState. |

Example
states = pipeline.get_worker_states()
for name, state in states.items():
    if state.status == "busy":
        print(f"{name} processing item {state.current_item_id}")

get_worker_metrics

get_worker_metrics() -> Dict[str, WorkerMetrics]

Get performance metrics for all workers.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, WorkerMetrics]` | Dictionary mapping worker name to WorkerMetrics. |

Example
metrics = pipeline.get_worker_metrics()
for name, metric in metrics.items():
    print(f"{name}: {metric.items_processed} items, "
          f"avg {metric.avg_processing_time:.2f}s")

get_dashboard_snapshot

get_dashboard_snapshot() -> DashboardSnapshot

Get complete dashboard snapshot with all current state.

Returns:

| Type | Description |
| --- | --- |
| `DashboardSnapshot` | DashboardSnapshot with worker states, metrics, and pipeline stats. |

Example
snapshot = pipeline.get_dashboard_snapshot()
print(f"Active workers: {sum(1 for s in snapshot.worker_states.values() if s.status == 'busy')}")
print(f"Items processed: {snapshot.pipeline_stats.items_processed}")

feed async

feed(items: Sequence[Any], target_stage: Optional[str] = None, priority: int = 100) -> None

Feed items into a specific stage of the pipeline.

Parameters:

Name Type Description Default
items Sequence[Any]

Sequence of items to process

required
target_stage Optional[str]

Name of the stage to inject items into. If None, feeds into the first stage.

None
priority int

Priority level (lower = higher priority). Default 100.

100

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If target_stage is provided but not found. |
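
A sketch of targeted feeding into a running pipeline; the stage name "Process" and the `preprocessed_items` variable are illustrative.

```python
# Enqueue into the first stage with default priority.
await pipeline.feed(urls)

# Inject already-prepared items directly into the "Process" stage,
# ahead of default-priority work (lower number = higher priority).
await pipeline.feed(preprocessed_items, target_stage="Process", priority=10)
```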

feed_async async

feed_async(items: AsyncIterable[Any], target_stage: Optional[str] = None, priority: int = 100) -> None

Feed items from an async iterable into a specific stage.

Parameters:

Name Type Description Default
items AsyncIterable[Any]

Async iterable of items to process

required
target_stage Optional[str]

Name of the stage to inject items into. If None, feeds into the first stage.

None
priority int

Priority level (lower = higher priority). Default 100.

100

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If target_stage is provided but not found. |
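
A sketch of streaming work from an async generator; `url_stream` is a hypothetical source, and the pipeline is assumed to have been started already.

```python
async def url_stream():
    # hypothetical async source of work items
    for i in range(100):
        yield f"http://example.com/{i}"

await pipeline.feed_async(url_stream())
await pipeline.join()
```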

start async

start() -> None

Start the pipeline workers in the background.

This method initializes the worker pool and begins processing items as soon as they become available in the queues. Use feed() to add items.

Raises:

| Type | Description |
| --- | --- |
| `PipelineError` | If pipeline is already running. |

join async

join() -> None

Wait for all enqueued items to be processed and stop workers.

This method:

1. Waits for all queues to be empty (all items processed)
2. Signals workers to stop
3. Waits for the worker pool to shut down
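
Together with start() and feed(), this enables a manual lifecycle equivalent to run(); a brief sketch, reusing the stages and URLs from the usage example:

```python
pipeline = Pipeline(stages=[stage1, stage2])
await pipeline.start()        # spin up workers in the background
await pipeline.feed(urls)     # enqueue work
await pipeline.join()         # wait for queues to drain, then stop workers
print(len(pipeline.results))  # populated because collect_results defaults to True
```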

run async

run(items: Sequence[Any]) -> List[PipelineResult]

Run the pipeline end-to-end with the given items.

This is a convenience wrapper around start(), feed(), and join().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `items` | `Sequence[Any]` | Items to process through the pipeline. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `List[PipelineResult]` | List of PipelineResult objects. |

shutdown async

shutdown() -> None

Shut down the pipeline gracefully or forcefully.

If the queues are not empty, items may be left unprocessed, depending on how workers react to stop_event.
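
A sketch of guaranteeing worker cleanup when a manual run fails part-way; the try/finally structure is an illustrative pattern, not one taken from this page.

```python
await pipeline.start()
try:
    await pipeline.feed(urls)
    await pipeline.join()
finally:
    # Stop workers even if feeding or joining raised; items still
    # queued at this point may be left unprocessed.
    await pipeline.shutdown()
```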

__aenter__ async

__aenter__() -> Pipeline

Context manager entry.

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb) -> None

Context manager exit with automatic shutdown.
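
A minimal sketch of the async context-manager form, which guarantees shutdown on exit. Whether __aenter__ also starts the workers is not stated here, so start() is called explicitly.

```python
async with Pipeline(stages=[stage1, stage2]) as pipeline:
    await pipeline.start()
    await pipeline.feed(urls)
    await pipeline.join()
# __aexit__ has awaited shutdown() automatically
```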