Advanced Examples

Complex real-world examples demonstrating advanced AntFlow features.

Pipeline with Retry Strategies

This example shows different retry strategies for handling failures:

import asyncio
import random
from antflow import Pipeline, Stage

# Tasks that may fail
async def fetch_api_data(item_id: int) -> dict:
    """Simulate fetching data from an API (may fail randomly)."""
    await asyncio.sleep(0.05)
    if random.random() < 0.2:
        raise ConnectionError(f"API connection failed for item {item_id}")
    return {"id": item_id, "data": f"raw_data_{item_id}"}

async def validate_data(data: dict) -> dict:
    """Validate fetched data."""
    await asyncio.sleep(0.03)
    if "data" not in data:
        raise ValueError("Invalid data structure")
    data["validated"] = True
    return data

async def save_to_database(data: dict) -> dict:
    """Simulate saving to database (may fail randomly)."""
    await asyncio.sleep(0.06)
    if random.random() < 0.15:
        raise IOError(f"Database write failed for item {data['id']}")
    data["saved"] = True
    return data

# Callbacks for monitoring
async def on_fetch_success(payload):
    print(f"  ✅ Fetched item {payload['id']}")

async def on_fetch_failure(payload):
    print(f"  ❌ Failed to fetch item {payload['id']}: {payload['error']}")

async def on_fetch_retry(payload):
    print(f"  🔄 Retrying fetch for item {payload['id']} (attempt {payload['attempt']})")

async def on_save_task_retry(task_name, item_id, error):
    print(f"  🔄 Task {task_name} retrying for item {item_id}: {error}")

async def on_save_task_failure(task_name, item_id, error):
    print(f"  ❌ Task {task_name} failed for item {item_id}: {error}")

async def main():
    # Fetch stage: per-stage retry for connection errors
    fetch_stage = Stage(
        name="Fetch",
        workers=3,
        tasks=[fetch_api_data],
        retry="per_stage",
        stage_attempts=3,
        on_success=on_fetch_success,
        on_failure=on_fetch_failure
    )

    # Process stage: per-task retry for validation
    process_stage = Stage(
        name="Process",
        workers=2,
        tasks=[validate_data],
        retry="per_task",
        task_attempts=3,
        task_wait_seconds=0.5
    )

    # Save stage: per-task retry
    save_stage = Stage(
        name="Save",
        workers=2,
        tasks=[save_to_database],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=1.0
    )

    pipeline = Pipeline(
        stages=[fetch_stage, process_stage, save_stage]
    )

    items = list(range(15))
    results = await pipeline.run(items)

    stats = pipeline.get_stats()
    print(f"\n📊 Statistics:")
    print(f"  Items processed: {stats.items_processed}")
    print(f"  Items failed: {stats.items_failed}")
    print(f"  Success rate: {stats.items_processed/(stats.items_processed + stats.items_failed)*100:.1f}%")

asyncio.run(main())

Key Features:

- Per-stage retry for connection errors (transient failures)
- Per-task retry for database writes
- Callbacks for monitoring successes, failures, and retries
- Statistics reporting
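
To make the difference concrete, here is a minimal sketch of the same two-task stage configured both ways. It assumes, as the example above suggests, that per_stage reruns the whole task chain when any task fails, while per_task re-invokes only the failing task; the download and parse functions are placeholders:

from antflow import Stage

async def download(url):
    return {"url": url, "body": "..."}

async def parse(page):
    return {"url": page["url"], "parsed": True}

# per_stage: a failure in parse() reruns download() and parse() together,
# up to stage_attempts times. Useful when the tasks share transient state.
whole_chain_retry = Stage(
    name="FetchAndParse",
    workers=2,
    tasks=[download, parse],
    retry="per_stage",
    stage_attempts=3
)

# per_task: only the failing task is re-invoked, with a pause between
# attempts. Useful when earlier tasks are expensive to repeat.
single_task_retry = Stage(
    name="FetchAndParse",
    workers=2,
    tasks=[download, parse],
    retry="per_task",
    task_attempts=3,
    task_wait_seconds=0.5
)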

Real-World ETL Pipeline

Complete ETL example with data processing, validation, and error handling:

import asyncio
import random
from typing import Any, Dict
from antflow import Pipeline, Stage

class DataProcessor:
    """Example data processor with realistic ETL operations."""

    async def fetch_user_data(self, user_id: int) -> Dict[str, Any]:
        """Fetch user data from external API."""
        await asyncio.sleep(0.1)

        if random.random() < 0.1:
            raise ConnectionError(f"Failed to fetch user {user_id}")

        return {
            "user_id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com",
            "created_at": "2025-01-01"
        }

    async def validate_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate user data structure and content."""
        await asyncio.sleep(0.05)

        required_fields = ["user_id", "name", "email"]
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        if "@" not in data["email"]:
            raise ValueError(f"Invalid email: {data['email']}")

        data["validated"] = True
        return data

    async def transform_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform data to target format."""
        await asyncio.sleep(0.05)

        return {
            "id": data["user_id"],
            "full_name": data["name"].upper(),
            "email_address": data["email"].lower(),
            "registration_date": data["created_at"],
            "status": "active"
        }

    async def enrich_with_metadata(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich data with additional metadata."""
        await asyncio.sleep(0.08)

        data["metadata"] = {
            "processed_at": "2025-10-09",
            "version": "1.0",
            "source": "api"
        }

        return data

    async def calculate_metrics(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate user metrics."""
        await asyncio.sleep(0.06)

        data["metrics"] = {
            "score": random.randint(1, 100),
            "rank": random.choice(["bronze", "silver", "gold"]),
            "engagement": random.uniform(0, 1)
        }

        return data

    async def save_to_database(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Save processed data to database."""
        await asyncio.sleep(0.1)

        if random.random() < 0.05:
            raise IOError(f"Database write failed for user {data['id']}")

        data["saved"] = True
        data["db_id"] = f"db_{data['id']}"

        return data

async def on_stage_complete(payload):
    stage = payload.get("stage", "Unknown")
    user_id = payload.get("id", "?")
    print(f"  [{stage}] Completed processing user {user_id}")

async def on_stage_failed(payload):
    stage = payload.get("stage", "Unknown")
    user_id = payload.get("id", "?")
    error = payload.get("error", "Unknown error")
    print(f"  [{stage}] ❌ Failed for user {user_id}: {error}")

async def main():
    print("=" * 60)
    print("Real-World ETL Pipeline: User Data Processing")
    print("=" * 60)
    print()

    processor = DataProcessor()

    # Ingestion stage with high concurrency and retry
    ingestion_stage = Stage(
        name="Ingestion",
        workers=5,
        tasks=[processor.fetch_user_data],
        retry="per_task",
        task_attempts=3,
        task_wait_seconds=1.0,
        on_success=on_stage_complete,
        on_failure=on_stage_failed
    )

    # Validation and transformation
    validation_stage = Stage(
        name="Validation",
        workers=3,
        tasks=[processor.validate_user_data, processor.transform_user_data],
        retry="per_task",
        task_attempts=2,
        task_wait_seconds=0.5,
        on_success=on_stage_complete,
        on_failure=on_stage_failed
    )

    # Data enrichment
    enrichment_stage = Stage(
        name="Enrichment",
        workers=3,
        tasks=[processor.enrich_with_metadata, processor.calculate_metrics],
        retry="per_stage",
        stage_attempts=2,
        on_success=on_stage_complete,
        on_failure=on_stage_failed
    )

    # Database persistence
    persistence_stage = Stage(
        name="Persistence",
        workers=2,
        tasks=[processor.save_to_database],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=2.0
    )

    pipeline = Pipeline(
        stages=[ingestion_stage, validation_stage, enrichment_stage, persistence_stage],
        collect_results=True
    )

    user_ids = list(range(1, 51))

    print(f"📥 Starting ETL pipeline for {len(user_ids)} users")
    print(f"📊 Pipeline configuration:")
    for stage in pipeline.stages:
        print(f"  - {stage.name}: {stage.workers} workers, "
              f"{len(stage.tasks)} tasks, retry={stage.retry}")
    print()

    results = await pipeline.run(user_ids)

    print()
    print("=" * 60)
    print("Pipeline Execution Complete")
    print("=" * 60)
    print()

    stats = pipeline.get_stats()

    print("📊 Final Statistics:")
    print(f"  Total items: {len(user_ids)}")
    print(f"  Successfully processed: {stats.items_processed}")
    print(f"  Failed: {stats.items_failed}")
    print(f"  Success rate: {stats.items_processed/len(user_ids)*100:.1f}%")

    if results:
        print(f"\n✅ Successfully processed {len(results)} users")
        print("\nSample output (first 3 users):")
        for result in results[:3]:
            data = result['value']
            print(f"\n  User ID: {data['id']}")
            print(f"  Name: {data['full_name']}")
            print(f"  Email: {data['email_address']}")
            print(f"  Metrics: Score={data['metrics']['score']}, "
                  f"Rank={data['metrics']['rank']}")

if __name__ == "__main__":
    random.seed(42)  # For reproducible results
    asyncio.run(main())

Key Features:

- Complete ETL workflow with 4 stages
- Class-based organization for related operations
- Different retry strategies per stage
- Comprehensive error handling and monitoring
- Statistics and reporting
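
In production you would typically route these callbacks to the logging module instead of print. A minimal sketch, reusing the payload keys from on_stage_failed above (the "etl" logger name is arbitrary):

import logging

logger = logging.getLogger("etl")

async def log_stage_failure(payload):
    # Same payload shape as on_stage_failed; WARNING level keeps failures
    # visible in aggregated logs without stopping the pipeline.
    logger.warning(
        "stage=%s user=%s error=%s",
        payload.get("stage", "Unknown"),
        payload.get("id", "?"),
        payload.get("error", "Unknown error")
    )

# Wire it into any stage, e.g. Stage(..., on_failure=log_stage_failure)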

Monitoring Pipeline Execution

Real-time monitoring of pipeline progress:

import asyncio
from antflow import Pipeline, Stage

async def task1(x): return x + 1
async def task2(x): return x * 2

async def monitor_pipeline(pipeline, interval=1.0):
    """Monitor pipeline execution in real-time."""
    while True:
        stats = pipeline.get_stats()
        print(
            f"📊 Progress: {stats.items_processed} processed, "
            f"{stats.items_failed} failed, "
            f"{stats.items_in_flight} in-flight"
        )
        await asyncio.sleep(interval)

async def main():
    # Define your pipeline stages
    stage1 = Stage(name="Stage1", workers=5, tasks=[task1])
    stage2 = Stage(name="Stage2", workers=3, tasks=[task2])

    pipeline = Pipeline(stages=[stage1, stage2])
    items = range(100)

    # Run monitoring and pipeline concurrently
    async with asyncio.TaskGroup() as tg:
        # Start monitoring
        monitor_task = tg.create_task(monitor_pipeline(pipeline, interval=0.1))

        # Run pipeline
        results = await pipeline.run(items)

        # Stop monitoring
        monitor_task.cancel()

    print(f"Completed: {len(results)} items")

if __name__ == "__main__":
    asyncio.run(main())
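
asyncio.TaskGroup requires Python 3.11 or newer. On older interpreters the same pattern works with asyncio.create_task; a minimal sketch reusing monitor_pipeline and the stages from above:

async def main_pre_311():
    stage1 = Stage(name="Stage1", workers=5, tasks=[task1])
    stage2 = Stage(name="Stage2", workers=3, tasks=[task2])
    pipeline = Pipeline(stages=[stage1, stage2])

    # Start the monitor as a background task, run the pipeline, then cancel it.
    monitor_task = asyncio.create_task(monitor_pipeline(pipeline, interval=0.1))
    try:
        results = await pipeline.run(range(100))
    finally:
        monitor_task.cancel()
        try:
            await monitor_task  # wait for the cancellation to settle
        except asyncio.CancelledError:
            pass

    print(f"Completed: {len(results)} items")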

Collecting and Analyzing Failures

Track failed items for retry or analysis:

import asyncio
import random
from antflow import Pipeline, Stage

class FailureCollector:
    def __init__(self):
        self.failures = []

    async def on_failure(self, payload):
        # Payload carries the same keys used by the other callback examples.
        self.failures.append({
            'id': payload.get('id'),
            'error': str(payload.get('error')),
            'payload': payload
        })

async def risky_task(x):
    if random.random() < 0.5:
        raise ValueError("Random failure")
    return x

async def main():
    collector = FailureCollector()
    items = range(10)

    stage = Stage(
        name="ProcessStage",
        workers=5,
        tasks=[risky_task],
        retry="per_task",
        task_attempts=3,
        on_failure=collector.on_failure
    )

    pipeline = Pipeline(stages=[stage])
    results = await pipeline.run(items)

    # Analyze failures
    print(f"Success: {len(results)} items")
    print(f"Failures: {len(collector.failures)} items")

    if collector.failures:
        print("\nFailed items:")
        for failure in collector.failures:
            print(f"  ID {failure['id']}: {failure['error']}")

        # Retry failed items with different configuration
        retry_stage = Stage(
            name="Retry",
            workers=2,
            tasks=[risky_task],
            retry="per_task",
            task_attempts=10,
            task_wait_seconds=0.1
        )

        retry_pipeline = Pipeline(stages=[retry_stage])
        # In a real scenario, you'd extract the original values to retry
        retry_items = [f['id'] for f in collector.failures] 
        retry_results = await retry_pipeline.run(retry_items)

        print(f"\nRetry results: {len(retry_results)} recovered")

if __name__ == "__main__":
    asyncio.run(main())
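
If the failure records need to survive the process for offline analysis, they can be written out as JSON Lines. A minimal sketch using only the fields stored by FailureCollector (default=str covers payload values that are not JSON-serializable):

import json

def dump_failures(failures, path="failures.jsonl"):
    # One JSON object per line; each record carries the 'id' and 'error'
    # keys populated by FailureCollector.on_failure above.
    with open(path, "w", encoding="utf-8") as fh:
        for record in failures:
            fh.write(json.dumps(record, default=str) + "\n")

# e.g. dump_failures(collector.failures)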

Context Manager with Cleanup

Use context managers for automatic resource cleanup:

import asyncio
from antflow import Pipeline, Stage

async def task(x): return x

async def main():
    stage1 = Stage(name="S1", workers=1, tasks=[task])
    stage2 = Stage(name="S2", workers=1, tasks=[task])
    stage3 = Stage(name="S3", workers=1, tasks=[task])
    items = range(5)

    async with Pipeline(stages=[stage1, stage2, stage3]) as pipeline:
        # Pipeline runs and automatically cleans up
        results = await pipeline.run(items)

        # Do something with results
        print(f"Processed {len(results)} items")

    # Pipeline is automatically shut down here
    print("Pipeline cleaned up")

if __name__ == "__main__":
    asyncio.run(main())
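
Cleanup also runs when the body raises, which is the main reason to prefer the context manager over manual teardown. A minimal sketch; the RuntimeError is just an illustration:

async def run_with_error():
    stage = Stage(name="S1", workers=1, tasks=[task])
    try:
        async with Pipeline(stages=[stage]) as pipeline:
            await pipeline.run(range(5))
            raise RuntimeError("downstream failure")
    except RuntimeError:
        # The pipeline has already been shut down by the time we get here.
        print("Pipeline cleaned up despite the error")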

Next Steps