
Pipeline Guide

The Pipeline class enables multi-stage async processing with configurable worker pools, retry strategies, and real-time status tracking.

Core Concepts

Stages

A Stage represents a processing step with:

  • One or more sequential tasks
  • A pool of workers executing in parallel
  • A retry strategy (per-task or per-stage)
  • Automatic status tracking when StatusTracker is configured

Pipeline

A Pipeline connects multiple stages together:

  • Items flow from one stage to the next
  • Each stage has its own queue and workers
  • Order can be preserved across stages
  • Results are collected from the final stage

Stage Configuration Reference

The Stage class is the building block of your pipeline. The table below lists all available configuration options; a combined example follows it.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | Required | Unique identifier for the stage. Used in logs and status events. |
| workers | int | Required | Number of concurrent workers for this stage. |
| tasks | List[TaskFunc] | Required | List of async functions to execute sequentially for each item. |
| retry | str | "per_task" | Retry strategy: "per_task" (retry the individual function) or "per_stage" (restart the stage from the first task). |
| task_attempts | int | 3 | Max attempts per task (used in "per_task" mode). |
| stage_attempts | int | 3 | Max attempts per stage (used in "per_stage" mode). |
| task_wait_seconds | float | 1.0 | Time to wait between retries. |
| task_concurrency_limits | Dict[str, int] | None | Max concurrent executions for specific task functions (see example below). |
| unpack_args | bool | False | If True, unpacks the input item (*args/**kwargs) when calling the first task. |
| on_success | Callable | None | Callback or async function to run on successful item completion. |
| on_failure | Callable | None | Callback or async function to run on item failure (after all retries). |
| skip_if | Callable | None | Predicate function (item) -> bool. If True, the item skips this stage entirely. |
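
A minimal sketch combining several of these options. The callback functions and the skip predicate are illustrative, and it assumes on_success/on_failure receive the item being processed:

from antflow import Stage

async def parse(item):
    return item

async def log_success(item):
    # Illustrative callback: runs when an item finishes the stage successfully
    print(f"done: {item}")

async def alert_failure(item):
    # Illustrative callback: runs after all retries are exhausted
    print(f"gave up on: {item}")

parse_stage = Stage(
    name="Parse",
    workers=4,
    tasks=[parse],
    retry="per_task",
    task_attempts=2,
    task_wait_seconds=0.5,
    on_success=log_success,
    on_failure=alert_failure,
    skip_if=lambda item: item is None,  # None items skip this stage entirely
)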

Use Case: OpenAI Batch Processing

task_concurrency_limits is powerful when you have a large worker pool for general processing but need to throttle specific operations (like API uploads) due to strict rate limits.

Scenario: You have 1000 jobs to process using OpenAI's Batch API. The process is: Generate -> Upload -> Start Job -> Poll -> Download.

The Constraints:

  1. Strict Upload Limit: You can only upload 2 files at the same time (rate limit).
  2. Batch Queue Capacity: You can have max 50 active jobs running in the Batch API at once.

The Problem:

  • If you set workers=2 (to satisfy the upload limit), you will only ever have 2 jobs running in the Batch API. You are wasting 48 slots of capacity!
  • If you set workers=50 (to fill the Batch API capacity), all 50 workers will try to upload files simultaneously at the start, causing you to hit the upload rate limit and crash.

Solution: Use workers=50 to maximize your Batch API usage, but use task_concurrency_limits to throttle only the upload task to 2.

async def generate_jsonl(item): ...
async def upload_file(item): ... # Strict limit!
async def start_job(item): ...
async def poll_status(item): ... # Takes a long time
async def download_results(item): ...

stage = Stage(
    name="OpenAI_Batch_Job",
    workers=50,  # 50 workers allow us to poll 50 jobs simultaneously!
    tasks=[
        generate_jsonl,
        upload_file,
        start_job,
        poll_status,
        download_results
    ],
    retry="per_task",
    # The magic happens here:
    task_concurrency_limits={
        "upload_file": 2  # Only 2 uploads happen at once, even with 50 active workers
    }
)

Architecture Comparison: Why not 2 Stages?

You might think: "Why not just use a GeneratorStage (2 workers) feeding into a PollingStage (50 workers)?"

Scenario A: Two Stages (The "Firehose" Risk) ❌

# ❌ DON'T DO THIS for this scenario
stage_gen = Stage("Generator", workers=2, tasks=[generate, upload])
stage_poll = Stage("Polling", workers=50, tasks=[poll])

# Problem: stage_gen is faster than stage_poll.
# It will pump 1000 jobs into stage_poll's queue.
# Result: 950 jobs running on OpenAI, unmonitored in the queue.
  • Structure: GeneratorStage (2 workers) -> PollingStage (50 workers).
  • Behavior: The GeneratorStage continuously produces jobs (2 at a time) and pushes them to PollingStage.
  • The Problem: If PollingStage is full (50 jobs running), the GeneratorStage keeps going. It might start 750 jobs on OpenAI.
  • Result: You have 750 jobs running on OpenAI, but only 50 workers "watching" them. 700 jobs are running unmonitored. If they fail or finish, you won't know until a worker frees up.

Scenario B: Single Stage + Limits (The "Bounded" Solution) ✅

# ✅ DO THIS
stage = Stage(
    "Combined", 
    workers=50, 
    tasks=[generate, upload, poll],
    task_concurrency_limits={"upload": 2}
)
  • Structure: One Stage (50 workers) with limit={upload: 2}.
  • Behavior: When all 50 workers are busy polling, no new upload tasks can start.
  • The Result:
    • Max 50 jobs are running on OpenAI at any time.
    • Job #51 stays in the input queue. It is not generated, not uploaded, and not started.
    • This provides true Backpressure. You usually want this to avoid overwhelming the external system or your own monitoring capacity.

Basic Usage

Creating a Simple Pipeline

from antflow import Pipeline, Stage

async def fetch(x):
    return f"data_{x}"

async def process(x):
    return x.upper()

# Define stages
fetch_stage = Stage(name="Fetch", workers=3, tasks=[fetch])
process_stage = Stage(name="Process", workers=2, tasks=[process])

# Create pipeline
pipeline = Pipeline(stages=[fetch_stage, process_stage])

# Run pipeline
results = await pipeline.run(range(10))

Interactive Execution

For more control, you can split execution into start, feed, and join steps. This allows you to inject items dynamically while the pipeline is running:

# Start background workers
await pipeline.start()

# Feed initial batch to first stage (default)
await pipeline.feed(batch_1)

# Do other work...

# Inject items directly into a specific stage (e.g., resuming from a checkpoint)
# This item will skip previous stages and start processing at 'ProcessStage'
await pipeline.feed(recovered_items, target_stage="ProcessStage")

# Wait for completion
await pipeline.join()

# Access results
print(pipeline.results)

Multiple Tasks per Stage

A stage can have multiple tasks that execute sequentially for each item:

import asyncio
from antflow import Pipeline, Stage

async def validate(x):
    if x < 0:
        raise ValueError("Negative value")
    return x

async def transform(x):
    return x * 2

async def format_output(x):
    return f"Result: {x}"

stage = Stage(
    name="ProcessStage",
    workers=3,
    tasks=[validate, transform, format_output]
)

pipeline = Pipeline(stages=[stage])
results = await pipeline.run(range(10))

Retry Strategies

Per-Task Retry

Each task retries independently using tenacity:

from antflow import Stage

stage = Stage(
    name="FetchStage",
    workers=5,
    tasks=[fetch_from_api],
    retry="per_task",
    task_attempts=3,
    task_wait_seconds=1.0
)
  • task_attempts: Maximum attempts per task
  • task_wait_seconds: Wait time between retries

If a task fails after all retries, the stage fails for that item.

Per-Stage Retry

The entire stage (all tasks) retries on any failure:

from antflow import Stage

stage = Stage(
    name="TransactionStage",
    workers=2,
    tasks=[begin_transaction, update_db, commit],
    retry="per_stage",
    stage_attempts=3
)
  • stage_attempts: Maximum attempts for the entire stage

If any task fails, the item is re-queued at the beginning of the stage.

When to Use Which

Per-Task Retry:

  • Independent tasks that can fail separately
  • Fine-grained retry control
  • Tasks with different failure modes

Per-Stage Retry:

  • Transactional operations
  • Tasks with dependencies
  • All-or-nothing processing

Status Tracking

Track items in real-time as they flow through the pipeline with StatusTracker.

Basic Status Tracking

from antflow import Pipeline, Stage, StatusTracker

tracker = StatusTracker()

stage = Stage(
    name="ProcessStage",
    workers=3,
    tasks=[my_task]
)

pipeline = Pipeline(stages=[stage], status_tracker=tracker)
results = await pipeline.run(items)

# Query statistics
stats = tracker.get_stats()
print(f"Completed: {stats['completed']}")
print(f"Failed: {stats['failed']}")

Real-Time Event Monitoring

Get notified when items change status:

from antflow import Pipeline, StatusTracker

async def on_status_change(event):
    print(f"{event.status.upper()}: Item {event.item_id} @ {event.stage}")

    if event.status == "failed":
        print(f"  Error: {event.metadata.get('error')}")
    elif event.status == "in_progress":
        print(f"  Worker: {event.worker}")

tracker = StatusTracker(on_status_change=on_status_change)
pipeline = Pipeline(stages=[stage], status_tracker=tracker)

Task-Level Event Monitoring

For granular tracking, monitor individual tasks within stages:

from antflow import StatusTracker, TaskEvent

async def on_task_retry(event: TaskEvent):
    print(f"⚠️  Task {event.task_name} retry #{event.attempt}")
    print(f"   Item: {event.item_id}, Error: {event.error}")

async def on_task_fail(event: TaskEvent):
    print(f"❌ Task {event.task_name} FAILED after {event.attempt} attempts")

    if event.task_name == "save_to_database":
        await send_critical_alert(f"Database save failed for {event.item_id}")

tracker = StatusTracker(
    on_task_retry=on_task_retry,
    on_task_fail=on_task_fail
)

Task Events Available:

  • on_task_start: Called when a task begins execution
  • on_task_complete: Called when a task completes successfully
  • on_task_retry: Called when a task is retrying after a failure
  • on_task_fail: Called when a task fails after all retry attempts

Each TaskEvent contains:

  • item_id: Item being processed
  • stage: Stage name
  • task_name: Specific task function name
  • worker: Worker processing the task
  • event_type: "start", "complete", "retry", or "fail"
  • attempt: Current attempt number (1-indexed)
  • timestamp: When the event occurred
  • error: Exception if the task failed (None otherwise)
  • duration: Task execution time in seconds (None for start events)

Task Concurrency Limits

You can limit the concurrency of specific tasks within a stage using task_concurrency_limits. This is useful when you have a high number of workers (e.g., 50) but one specific task (like an API call) has a strict rate limit (e.g., 5 concurrent requests).

See the Concurrency Control Guide (concurrency.md) for more details.

stage = Stage(
    name="ETL",
    workers=50,
    tasks=[extract, transform, validate, save],
    retry="per_task",
    task_attempts=3,
    # Limit specific tasks (e.g., API calls) to avoid rate limits
    task_concurrency_limits={
        "extract": 5,  # Max 5 concurrent extract calls
        "save": 20     # Max 20 concurrent save calls
    }
)

pipeline = Pipeline(stages=[stage], status_tracker=tracker)

Query Item Status

# Get specific item status
status = tracker.get_status(item_id=42)
print(f"Item 42: {status.status} @ {status.stage}")

# Get all failed items
failed = tracker.get_by_status("failed")
for event in failed:
    print(f"Item {event.item_id}: {event.metadata['error']}")

# Get item history
history = tracker.get_history(item_id=42)
for event in history:
    print(f"{event.timestamp}: {event.status}")

Status Types

Items progress through these states:

  • queued - Waiting in stage queue
  • in_progress - Being processed by worker
  • completed - Successfully finished stage
  • failed - Failed to complete stage

Important: Status tracking is at the stage level, not individual task level. If a stage has multiple tasks (e.g., [validate, transform, enrich]), you will know the stage failed but not which specific task caused the failure. The error message in event.metadata['error'] will contain details about the failure. For task-level granularity, consider using separate stages (one task per stage) or adding logging within tasks.
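
If you need per-item status for every step, a simple option is to split a multi-task stage into single-task stages. A minimal sketch, reusing the validate/transform/enrich names from the note above:

# One task per stage: each step now reports its own queued/in_progress/completed/failed status
validate_stage = Stage(name="Validate", workers=3, tasks=[validate])
transform_stage = Stage(name="Transform", workers=3, tasks=[transform])
enrich_stage = Stage(name="Enrich", workers=3, tasks=[enrich])

pipeline = Pipeline(stages=[validate_stage, transform_stage, enrich_stage], status_tracker=tracker)

The trade-off is that items pass through three queues instead of one, and the three steps can no longer be retried together with a per-stage strategy.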

Order Preservation

Results are always returned in input order:

from antflow import Pipeline

pipeline = Pipeline(stages=[stage1, stage2])
results = await pipeline.run(items)
# Results maintain input order

Monitoring Strategies

AntFlow supports two primary ways to monitor your pipeline: Event-Driven (Callbacks) and Polling.

Strategy 1: Event-Driven (Callbacks)

Use StatusTracker callbacks to react immediately when events occur. This is best for:

  • Real-time dashboards (low latency)
  • Logging specific events (e.g., errors)
  • Triggering external actions (e.g., alerts)

Example: examples/rich_callback_dashboard.py demonstrates this approach.

async def on_status_change(event):
    # React immediately to status changes
    print(f"Item {event.item_id} is now {event.status}")

tracker = StatusTracker(on_status_change=on_status_change)
pipeline = Pipeline(stages=[...], status_tracker=tracker)
await pipeline.run(items)

Strategy 2: Polling (Loop)

Run a separate loop to periodically check pipeline.get_stats() or pipeline.get_dashboard_snapshot(). This is best for:

  • Periodic metrics aggregation
  • Decoupled monitoring (UI runs at its own FPS)
  • Reducing overhead (batching updates)

Example: examples/rich_polling_dashboard.py demonstrates this approach.

async def monitor_loop(pipeline):
    while True:
        # Poll current state every second
        stats = pipeline.get_stats()
        print(f"Processed: {stats.items_processed}, In-Flight: {stats.items_in_flight}")
        await asyncio.sleep(1.0)

async with asyncio.TaskGroup() as tg:
    monitor = tg.create_task(monitor_loop(pipeline))
    results = await pipeline.run(items)
    monitor.cancel()  # Stop the monitor loop so the TaskGroup can exit

Feeding Data

Synchronous Iterable

items = list(range(100))
results = await pipeline.run(items)

Async Iterable

import asyncio

async def data_generator():
    for i in range(100):
        await asyncio.sleep(0.01)
        yield i

await pipeline.feed_async(data_generator())

Dict Input

Pass dict items with custom IDs:

items = [
    {"id": "user_1", "value": {"name": "Alice"}},
    {"id": "user_2", "value": {"name": "Bob"}},
]
results = await pipeline.run(items)

Context Manager

Use pipeline as a context manager for automatic cleanup:

from antflow import Pipeline

async with Pipeline(stages=[stage1, stage2]) as pipeline:
    results = await pipeline.run(items)
# Pipeline is automatically shut down

Error Handling

Tracking Failures with StatusTracker

from antflow import Pipeline, Stage, StatusTracker

tracker = StatusTracker()

stage = Stage(
    name="RiskyStage",
    workers=3,
    tasks=[risky_task],
    retry="per_task",
    task_attempts=3
)

pipeline = Pipeline(stages=[stage], status_tracker=tracker)
results = await pipeline.run(items)

# Get statistics
stats = tracker.get_stats()
print(f"Succeeded: {stats['completed']}")
print(f"Failed: {stats['failed']}")

# Get failed items
failed_items = tracker.get_by_status("failed")
for event in failed_items:
    print(f"Item {event.item_id}: {event.metadata['error']}")

Extracting Error Information

Errors are available in the event metadata:

from antflow import StatusTracker

async def on_status_change(event):
    if event.status == "failed":
        error = event.metadata.get('error')
        print(f"Item {event.item_id} failed: {error}")

tracker = StatusTracker(on_status_change=on_status_change)

Worker-Level Tracking

Each worker has a unique ID (0 to N-1). Track which worker processes which item:

from antflow import Pipeline, StatusTracker

async def on_status_change(event):
    if event.status == "in_progress":
        print(f"Worker {event.worker_id}: processing {event.item_id}")
    elif event.status == "completed":
        print(f"Worker {event.worker_id}: completed {event.item_id}")

tracker = StatusTracker(on_status_change=on_status_change)
pipeline = Pipeline(stages=[stage], status_tracker=tracker)

worker_names = pipeline.get_worker_names()

See the Worker Tracking Guide for detailed examples including:

  • Custom item IDs for better tracking
  • Worker utilization analysis
  • Load balancing monitoring
  • Error tracking by worker

Priority Queues

AntFlow uses Priority Queues internally. You can assign a priority level to items when feeding them into the pipeline. Lower numbers indicate higher priority (processed first). The default priority is 100.

  • Items with the same priority are processed in FIFO order.
  • Priority is preserved across stages (unless a custom feed injects with different priority).
  • Retries (per-task or per-stage) currently preserve the original priority.
# Expedited items (Priority 10)
await pipeline.feed(vip_items, priority=10)

# Normal items (Priority 100)
await pipeline.feed(regular_items)

# Background/Low priority (Priority 500)
await pipeline.feed(maintenance_items, priority=500)

Complete ETL Example

import asyncio
from antflow import Pipeline, Stage

class ETLProcessor:
    async def extract(self, item_id):
        # Fetch from API/database
        await asyncio.sleep(0.1)
        return {"id": item_id, "data": f"raw_{item_id}"}

    async def validate(self, data):
        # Validate data
        if "data" not in data:
            raise ValueError("Invalid data")
        return data

    async def transform(self, data):
        # Transform data
        data["processed"] = data["data"].upper()
        return data

    async def enrich(self, data):
        # Enrich with additional data
        data["metadata"] = {"timestamp": "2025-10-09"}
        return data

    async def load(self, data):
        # Save to database
        await asyncio.sleep(0.1)
        data["saved"] = True
        return data

async def main():
    processor = ETLProcessor()

    # Extract stage with high concurrency
    extract_stage = Stage(
        name="Extract",
        workers=10,
        tasks=[processor.extract],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=2.0
    )

    # Transform stage with validation
    transform_stage = Stage(
        name="Transform",
        workers=5,
        tasks=[processor.validate, processor.transform, processor.enrich],
        retry="per_stage",  # Transactional
        stage_attempts=3
    )

    # Load stage with retries
    load_stage = Stage(
        name="Load",
        workers=3,
        tasks=[processor.load],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=3.0
    )

    # Build pipeline
    pipeline = Pipeline(
        stages=[extract_stage, transform_stage, load_stage]
    )

    # Process items
    item_ids = range(100)
    results = await pipeline.run(item_ids)

    print(f"Processed {len(results)} items")
    print(f"Stats: {pipeline.get_stats()}")

asyncio.run(main())

Best Practices

Worker Pool Sizing

  • Extract/Fetch: More workers (I/O-bound)
  • Transform: Moderate workers (CPU-bound)
  • Load/Save: Fewer workers (rate-limited)

Retry Configuration

  • Use per-task for independent operations
  • Use per-stage for transactional operations
  • Set appropriate task_wait_seconds for rate limiting
  • Increase task_attempts for flaky external services

Callbacks

  • Use callbacks for logging and monitoring
  • Keep callbacks lightweight (use queues for heavy operations; see the sketch after this list)
  • Avoid long-running operations in callbacks
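
One way to keep callbacks lightweight is to push events onto an asyncio.Queue and do the heavy work in a separate consumer task. A minimal sketch; write_to_metrics_backend is a hypothetical slow operation:

import asyncio
from antflow import StatusTracker

event_queue: asyncio.Queue = asyncio.Queue()

async def on_status_change(event):
    # The callback only enqueues the event; no I/O happens in the hot path
    event_queue.put_nowait(event)

async def event_consumer():
    while True:
        event = await event_queue.get()
        await write_to_metrics_backend(event)  # hypothetical slow operation
        event_queue.task_done()

tracker = StatusTracker(on_status_change=on_status_change)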

Error Handling

  • Always set up on_failure callbacks for production
  • Log failed items for later retry/analysis (see the sketch after this list)
  • Monitor items_failed metric
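
For example, a status-change callback can persist failed items so they can be re-fed or analyzed later. A minimal sketch; the JSONL file name is illustrative:

import json
from antflow import StatusTracker

async def log_failures(event):
    if event.status == "failed":
        # Append enough information to retry or analyze the item later
        with open("failed_items.jsonl", "a") as f:
            f.write(json.dumps({
                "item_id": str(event.item_id),
                "stage": event.stage,
                "error": str(event.metadata.get("error")),
            }) + "\n")

tracker = StatusTracker(on_status_change=log_failures)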

Advanced Internals

Warning: This section covers internal implementation details. These APIs are protected (prefixed with an underscore) and may change between minor versions. Use them with caution when building custom subclasses.

Internal Queue Structure (_queues)

AntFlow manages data flow between stages using a list of internal queues, accessible via self._queues.

  • Type: List[asyncio.PriorityQueue]
  • Indexing: _queues[i] corresponds to stages[i].
  • Item Structure: Items are stored as tuples to ensure correct priority ordering and FIFO stability (illustrated below):
    (priority, sequence_id, (payload, attempt))
    
    • priority (int): Lower number = higher priority.
    • sequence_id (int): Monotonically increasing counter to ensure FIFO order for same-priority items.
    • payload (dict): The internal item wrapper (see below).
    • attempt (int): Current retry attempt number (1-indexed).
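
Purely for illustration (the values are made up), a single entry in _queues[0] for a default-priority item on its first attempt would look like:

entry = (
    100,   # priority: default
    7,     # sequence_id: monotonically increasing counter
    (
        {"id": 7, "value": "data_7", "_sequence_id": 7},  # payload wrapper (see below)
        1,   # attempt: first try
    ),
)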

Payload Structure (_prepare_payload)

When items enter the pipeline (via feed or run), they are wrapped in an internal dictionary structure to track metadata and ensure unique identification. The _prepare_payload(item) method handles this normalization.

Internal Payload Format:

{
    "id": Any,          # Unique identifier (extracted from dict or generated)
    "value": Any,       # The actual data being processed
    "_sequence_id": int # Global sequence ID for result ordering
}

  • ID Extraction:
    • If item is a dict and has an "id" key, that is used.
    • Otherwise, the item's index in the input iterable is used as the ID.
  • Value Extraction:
    • If item is a dict and has a "value" key, that is used.
    • Otherwise, the item itself is used as the value.

Example Override: If you need custom ID generation logic, you can override _prepare_payload in a subclass:

class CustomPipeline(Pipeline):
    def _prepare_payload(self, item):
        # Custom logic: use 'uuid' field as ID if present
        if isinstance(item, dict) and 'uuid' in item:
            return {
                "id": item['uuid'], 
                "value": item, 
                "_sequence_id": self._msg_counter
            }
        return super()._prepare_payload(item)