Skip to content

Error Handling

AntFlow provides a robust error handling system with custom exceptions and flexible retry strategies.

Exception Hierarchy

AntFlow defines a clear exception hierarchy for different error scenarios:

AntFlowError                    # Base exception
├── ExecutorShutdownError         # Executor has been shut down
├── PipelineError                 # Pipeline-specific errors
   └── StageValidationError      # Invalid stage configuration
└── TaskFailedError               # Task execution failure

Exception Types

AntFlowError

Base exception for all AntFlow errors:

from antflow import AntFlowError

try:
    # AntFlow operations
    pass
except AntFlowError as e:
    print(f"AntFlow error: {e}")

ExecutorShutdownError

Raised when attempting to use a shut down executor:

from antflow import AsyncExecutor, ExecutorShutdownError

executor = AsyncExecutor(max_workers=5)
await executor.shutdown()

try:
    executor.submit(some_task, arg)
except ExecutorShutdownError:
    print("Cannot submit to shutdown executor")

PipelineError

Base exception for pipeline-specific errors:

from antflow import Pipeline, PipelineError

try:
    # Example: Attempting invalid operation
    raise PipelineError("Cannot modify pipeline while running")
except PipelineError as e:
    print(f"Pipeline error: {e}")

StageValidationError

Raised when stage configuration is invalid:

from antflow import Stage, StageValidationError

try:
    stage = Stage(
        name="Invalid",
        workers=0,  # Invalid: must be >= 1
        tasks=[my_task]
    )
    stage.validate()
except StageValidationError as e:
    print(f"Invalid stage: {e}")

TaskFailedError

Wrapper for task failures that preserves the original exception:

from antflow import TaskFailedError

# The original exception is available
try:
    # ... task execution
    pass
except TaskFailedError as e:
    print(f"Task {e.task_name} failed")
    print(f"Original error: {e.original_exception}")

Handling Task Failures

In AsyncExecutor

Exceptions in executor tasks are propagated to the caller:

from antflow import AsyncExecutor

async def failing_task(x):
    if x < 0:
        raise ValueError("Negative value not allowed")
    return x * 2

async with AsyncExecutor(max_workers=3) as executor:
    future = executor.submit(failing_task, -5)

    try:
        result = await future.result()
    except ValueError as e:
        print(f"Task failed: {e}")
        # Handle the error appropriately

### Automatic Retries

You can also configure automatic retries for `AsyncExecutor` tasks:

```python
# Retry 3 times with 0.1s delay
future = executor.submit(failing_task, -5, retries=3, retry_delay=0.1)
### In Pipeline

Pipeline provides multiple ways to handle failures:

#### Stage-Level Callbacks

```python
from antflow import Stage

async def on_failure(payload):
    item_id = payload['id']
    error = payload['error']
    print(f"Stage failed for item {item_id}: {error}")
    # Log to monitoring system, send alert, etc.

stage = Stage(
    name="ProcessStage",
    workers=3,
    tasks=[risky_task],
    on_failure=on_failure
)

Task-Level Callbacks

from antflow import Stage

async def on_task_failure(task_name, item_id, error):
    print(f"Task {task_name} failed for item {item_id}")
    print(f"Error: {error}")
    # Record failure for analysis

stage = Stage(
    name="DetailedStage",
    workers=2,
    tasks=[task1, task2],
    on_task_failure=on_task_failure
)

Collecting Failed Items

from antflow import Pipeline, Stage

failed_items = []

async def collect_failures(payload):
    failed_items.append({
        'id': payload['id'],
        'error': payload['error'],
        'stage': payload['stage']
    })

stage = Stage(
    name="MyStage",
    workers=3,
    tasks=[my_task],
    retry="per_task",
    task_attempts=3,
    on_failure=collect_failures
)

pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)

# Analyze failures
print(f"Succeeded: {len(results)}")
print(f"Failed: {len(failed_items)}")
for failed in failed_items:
    print(f"  Item {failed['id']}: {failed['error']}")

Retry Strategies

AntFlow provides two retry strategies for pipelines:

Per-Task Retry

Each task retries independently using tenacity:

from antflow import Stage

stage = Stage(
    name="RobustStage",
    workers=5,
    tasks=[api_call],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=2.0
)

If a task fails after all retries, the stage fails for that item.

Per-Stage Retry

The entire stage retries on any task failure:

from antflow import Stage

stage = Stage(
    name="TransactionalStage",
    workers=2,
    tasks=[begin_tx, update_data, commit_tx],
    retry="per_stage",
    stage_attempts=3
)

Failed items are re-queued at the beginning of the stage.

Extracting Original Exceptions

AntFlow automatically extracts original exceptions from retry wrappers:

from antflow.utils import extract_exception
from tenacity import RetryError

# In callbacks, errors are already extracted
async def on_failure(payload):
    error = payload['error']  # Already the original exception message
    print(f"Original error: {error}")

# Manual extraction if needed
try:
    # ... operation with retry
    pass
except RetryError as e:
    original = extract_exception(e)
    print(f"Original exception: {original}")

Best Practices

1. Use Specific Exceptions

Catch specific exceptions rather than broad Exception types:

from antflow import AsyncExecutor, ExecutorShutdownError
import logging

logger = logging.getLogger(__name__)

try:
    result = await executor.submit(task, arg)
except ExecutorShutdownError:
    # Handle shutdown specifically
    logger.warning("Executor was shut down")
except ValueError:
    # Handle validation errors
    logger.error("Invalid input")
except Exception as e:
    # Catch-all for unexpected errors
    logger.exception("Unexpected error", exc_info=e)

2. Always Set Failure Callbacks

In production, always configure failure callbacks:

from antflow import Stage
import logging

logger = logging.getLogger(__name__)

async def log_failure(payload):
    logger.error(
        "Stage failure",
        extra={
            'item_id': payload['id'],
            'stage': payload['stage'],
            'error': payload['error']
        }
    )

stage = Stage(
    name="ProductionStage",
    workers=10,
    tasks=[process],
    on_failure=log_failure
)

3. Configure Appropriate Retries

Choose retry strategies based on your use case:

  • Idempotent operations: Use per-task retry with high attempts
  • Transactional operations: Use per-stage retry
  • External APIs: Use longer wait times between retries
from antflow import Stage

# For external API calls
api_stage = Stage(
    name="API",
    workers=5,
    tasks=[call_api],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=3.0
)

# For transactional database operations
db_stage = Stage(
    name="Database",
    workers=2,
    tasks=[begin_tx, insert, update, commit],
    retry="per_stage",
    stage_attempts=3
)

4. Monitor Failure Rates

Track failures to identify issues:

stats = pipeline.get_stats()
failure_rate = stats.items_failed / (stats.items_processed + stats.items_failed)

if failure_rate > 0.1:  # More than 10% failures
    logger.warning(f"High failure rate: {failure_rate:.2%}")
    # Alert, adjust retry settings, etc.

5. Graceful Degradation

Handle failures gracefully without stopping the entire pipeline:

from antflow import Pipeline

async def on_failure(payload):
    # Log the failure
    logger.error(f"Item {payload['id']} failed: {payload['error']}")

    # Store for later retry
    await failed_queue.put(payload)

    # Update metrics
    metrics.increment('pipeline.failures')

# Pipeline continues processing other items
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)

# Process failures separately
await retry_failed_items(failed_queue)

Example: Robust ETL Pipeline

Here's a complete example with comprehensive error handling:

import asyncio
import logging
from antflow import Pipeline, Stage

logger = logging.getLogger(__name__)

# Track failures
failures = []

async def on_stage_failure(payload):
    failures.append(payload)
    logger.error(
        f"Stage {payload['stage']} failed for item {payload['id']}: "
        f"{payload['error']}"
    )

async def on_task_retry(task_name, item_id, error):
    logger.warning(f"Retrying {task_name} for item {item_id}: {error}")

async def fetch_data(item_id):
    # May fail due to network issues
    ...

async def validate_data(data):
    # May fail due to invalid data
    if not data.get('required_field'):
        raise ValueError("Missing required field")
    return data

async def save_data(data):
    # May fail due to database issues
    ...

async def main():
    # Fetch stage: retry on network errors
    fetch_stage = Stage(
        name="Fetch",
        workers=10,
        tasks=[fetch_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=2.0,
        on_failure=on_stage_failure,
        on_task_retry=on_task_retry
    )

    # Validate stage: don't retry validation errors
    validate_stage = Stage(
        name="Validate",
        workers=5,
        tasks=[validate_data],
        retry="per_task",
        task_attempts=1,
        on_failure=on_stage_failure
    )

    # Save stage: retry on transient database errors
    save_stage = Stage(
        name="Save",
        workers=3,
        tasks=[save_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=3.0,
        on_failure=on_stage_failure,
        on_task_retry=on_task_retry
    )

    pipeline = Pipeline(
        stages=[fetch_stage, validate_stage, save_stage]
    )

    items = range(100)
    results = await pipeline.run(items)

    # Report results
    stats = pipeline.get_stats()
    logger.info(f"Processed: {stats.items_processed}")
    logger.info(f"Failed: {stats.items_failed}")
    logger.info(f"Success rate: {stats.items_processed/len(items)*100:.1f}%")

    # Handle failures
    if failures:
        logger.warning(f"{len(failures)} items require manual intervention")
        for failure in failures:
            # Store in dead letter queue, send alert, etc.
            await handle_permanent_failure(failure)

asyncio.run(main())

Debugging Tips

Enable Debug Logging

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('antflow')
logger.setLevel(logging.DEBUG)

Use Task-Level Callbacks

Get detailed information about task execution:

import logging
from antflow import Stage

logger = logging.getLogger(__name__)

async def on_task_start(task_name, item_id, value):
    logger.debug(f"Starting {task_name} for item {item_id}")

async def on_task_failure(task_name, item_id, error):
    logger.error(f"Task {task_name} failed for item {item_id}: {error}")
    # Include stack trace in logs
    logger.exception("Full traceback:", exc_info=error)

stage = Stage(
    name="Debug",
    workers=1,
    tasks=[task],
    on_task_start=on_task_start,
    on_task_failure=on_task_failure
)

Check Pipeline Stats

Monitor pipeline health during execution:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def monitor_pipeline(pipeline):
    while True:
        stats = pipeline.get_stats()
        logger.info(
            f"Progress: {stats.items_processed} processed, "
            f"{stats.items_failed} failed, "
            f"{stats.items_in_flight} in-flight"
        )
        await asyncio.sleep(5.0)

# Run monitoring concurrently with pipeline
async with asyncio.TaskGroup() as tg:
    tg.create_task(monitor_pipeline(pipeline))
    results = await pipeline.run(items)