Error Handling

AntFlow provides a robust error handling system with custom exceptions and flexible retry strategies.

Exception Hierarchy

AntFlow defines a clear exception hierarchy for different error scenarios:

AntFlowError                      # Base exception
├── ExecutorShutdownError         # Executor has been shut down
├── PipelineError                 # Pipeline-specific errors
│   └── StageValidationError      # Invalid stage configuration
└── TaskFailedError               # Task execution failure
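
To make the consequences of this tree concrete, the sketch below defines stand-in classes with the same names and shows which `except` clause catches which error. The class bodies and the `classify` helper are illustrative assumptions, not AntFlow's source code.

```python
# Stand-in classes that mirror the hierarchy (assumed, not AntFlow's source).
class AntFlowError(Exception):
    """Base exception."""


class ExecutorShutdownError(AntFlowError):
    """Executor has been shut down."""


class PipelineError(AntFlowError):
    """Pipeline-specific errors."""


class StageValidationError(PipelineError):
    """Invalid stage configuration."""


class TaskFailedError(AntFlowError):
    """Task execution failure."""


def classify(exc: Exception) -> str:
    """Hypothetical helper: report which handler catches `exc`."""
    try:
        raise exc
    except StageValidationError:
        return "stage-validation"
    except PipelineError:
        return "pipeline"
    except AntFlowError:
        return "antflow"
    except Exception:
        return "other"


print(classify(StageValidationError("workers must be >= 1")))
print(classify(ExecutorShutdownError("executor is closed")))
```

Handlers are ordered from most to least specific; placing `except AntFlowError` first would shadow the narrower clauses.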

Exception Types

AntFlowError

Base exception for all AntFlow errors:

from antflow import AntFlowError

try:
    # AntFlow operations
    pass
except AntFlowError as e:
    print(f"AntFlow error: {e}")

ExecutorShutdownError

Raised when attempting to use a shut down executor:

from antflow import AsyncExecutor, ExecutorShutdownError

executor = AsyncExecutor(max_workers=5)
await executor.shutdown()

try:
    executor.submit(some_task, arg)
except ExecutorShutdownError:
    print("Cannot submit to shutdown executor")

PipelineError

Base exception for pipeline-specific errors:

from antflow import Pipeline, PipelineError

try:
    # Example: Attempting invalid operation
    raise PipelineError("Cannot modify pipeline while running")
except PipelineError as e:
    print(f"Pipeline error: {e}")

StageValidationError

Raised when stage configuration is invalid:

from antflow import Stage, StageValidationError

try:
    stage = Stage(
        name="Invalid",
        workers=0,  # Invalid: must be >= 1
        tasks=[my_task]
    )
    stage.validate()
except StageValidationError as e:
    print(f"Invalid stage: {e}")

TaskFailedError

Wrapper for task failures that preserves the original exception:

from antflow import TaskFailedError

# The original exception is available
try:
    # ... task execution
    pass
except TaskFailedError as e:
    print(f"Task {e.task_name} failed")
    print(f"Original error: {e.original_exception}")
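
As a rough illustration of how a wrapper can preserve the original exception, the sketch below uses a hypothetical `TaskFailedSketch` class and `run_wrapped` helper. Only the attribute names `task_name` and `original_exception` come from the example above; everything else is an assumption and may differ from AntFlow's implementation.

```python
import asyncio


class TaskFailedSketch(Exception):
    """Hypothetical stand-in for a wrapper exception (not AntFlow's class)."""

    def __init__(self, task_name: str, original_exception: BaseException):
        super().__init__(f"Task '{task_name}' failed: {original_exception}")
        self.task_name = task_name
        self.original_exception = original_exception


async def run_wrapped(task, *args):
    """Run `task` and re-raise any failure wrapped with the task's name."""
    try:
        return await task(*args)
    except Exception as exc:
        # `from exc` keeps the original traceback chained via __cause__.
        raise TaskFailedSketch(task.__name__, exc) from exc


async def parse_int(text: str) -> int:
    return int(text)


async def demo():
    try:
        await run_wrapped(parse_int, "abc")
    except TaskFailedSketch as e:
        return e.task_name, type(e.original_exception).__name__


print(asyncio.run(demo()))
```

Keeping the original exception as an attribute lets callers branch on its type without parsing the wrapper's message.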

Handling Task Failures

In AsyncExecutor

Exceptions in executor tasks are propagated to the caller:

from antflow import AsyncExecutor

async def failing_task(x):
    if x < 0:
        raise ValueError("Negative value not allowed")
    return x * 2

async with AsyncExecutor(max_workers=3) as executor:
    future = executor.submit(failing_task, -5)

    try:
        result = await future.result()
    except ValueError as e:
        print(f"Task failed: {e}")
        # Handle the error appropriately

### Automatic Retries

You can also configure automatic retries for `AsyncExecutor` tasks:

```python
# Retry 3 times with 0.1s delay
future = executor.submit(failing_task, -5, retries=3, retry_delay=0.1)
```

### In Pipeline

Pipeline provides multiple ways to handle failures:

#### Stage-Level Callbacks

```python
from antflow import Stage

async def on_failure(item_id, error, metadata):
    print(f"Stage failed for item {item_id}: {error}")
    # Log to monitoring system, send alert, etc.

stage = Stage(
    name="ProcessStage",
    workers=3,
    tasks=[risky_task],
    on_failure=on_failure
)
```

Task-Level Callbacks (via StatusTracker)

For granular tracking, use task-level callbacks on the StatusTracker:

from antflow import StatusTracker, TaskEvent

async def on_task_fail(event: TaskEvent):
    print(f"Task {event.task_name} failed for item {event.item_id}")
    print(f"Error: {event.error}")

tracker = StatusTracker(
    on_task_fail=on_task_fail
)

Collecting Failed Items

from antflow import Pipeline, Stage

failed_items = []

async def collect_failures(item_id, error, metadata):
    failed_items.append({
        'id': item_id,
        'error': error
    })

stage = Stage(
    name="MyStage",
    workers=3,
    tasks=[my_task],
    retry="per_task",
    task_attempts=3,
    on_failure=collect_failures
)

pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)

# Analyze failures
print(f"Succeeded: {len(results)}")
print(f"Failed: {len(failed_items)}")
for failed in failed_items:
    print(f"  Item {failed['id']}: {failed['error']}")

## Error Summary API

As of version 0.6.0, AntFlow provides a high-level error summary API. When a `StatusTracker` is configured, the summary gives a detailed report of all failures.

```python
results = await pipeline.run(items)
summary = pipeline.get_error_summary()

print(f"Total failed: {summary.total_failed}")

# Grouped by stage
for stage, count in summary.errors_by_stage.items():
    print(f"Stage {stage}: {count} errors")

# Grouped by error type
for err_type, count in summary.errors_by_type.items():
    print(f"Error {err_type}: {count} occurrences")

# List of all failed items with details
for item in summary.failed_items:
    print(f"Item {item.item_id} failed in {item.stage} "
          f"after {item.attempts} attempts. Error: {item.error}")
```
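
To show the kind of aggregation such a summary performs, here is a minimal sketch that groups failure records by stage and by exception type using only the standard library. The `FailedItem` dataclass and `summarize` function are hypothetical; the field names follow the example above, but this is not AntFlow's implementation.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class FailedItem:
    """Hypothetical failure record; field names follow the example above."""
    item_id: int
    stage: str
    error: Exception
    attempts: int


def summarize(failed_items):
    """Group failures by stage name and by exception class name."""
    return {
        "total_failed": len(failed_items),
        "errors_by_stage": dict(Counter(f.stage for f in failed_items)),
        "errors_by_type": dict(
            Counter(type(f.error).__name__ for f in failed_items)
        ),
    }


failures = [
    FailedItem(1, "Fetch", TimeoutError("slow upstream"), 5),
    FailedItem(2, "Validate", ValueError("missing field"), 1),
    FailedItem(3, "Fetch", ConnectionError("connection reset"), 5),
]
summary = summarize(failures)
print(summary["errors_by_stage"])
```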

Retry Strategies

AntFlow provides two retry strategies for pipelines:

Per-Task Retry

Each task retries independently using tenacity:

from antflow import Stage

stage = Stage(
    name="RobustStage",
    workers=5,
    tasks=[api_call],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=2.0
)

If a task fails after all retries, the stage fails for that item.
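
The per-task behaviour can be approximated with a hand-rolled loop. The sketch below deliberately uses plain `asyncio` instead of tenacity, and `retry_task` and `flaky` are hypothetical names. The parameters mirror `task_attempts` and `task_wait_seconds`, but the logic is an assumption about the general technique rather than AntFlow's code.

```python
import asyncio


async def retry_task(task, item, attempts: int, wait_seconds: float):
    """Call `task(item)` up to `attempts` times, sleeping between failures."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return await task(item)
        except Exception as exc:
            last_error = exc
            if attempt < attempts:
                # Fixed wait between attempts; no backoff in this sketch.
                await asyncio.sleep(wait_seconds)
    # All attempts exhausted: surface the last failure to the caller.
    raise last_error


calls = {"count": 0}


async def flaky(item):
    """Fails on the first two calls, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return item * 2


result = asyncio.run(retry_task(flaky, 21, attempts=5, wait_seconds=0.01))
print(result, calls["count"])
```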

Per-Stage Retry

The entire stage retries on any task failure:

from antflow import Stage

stage = Stage(
    name="TransactionalStage",
    workers=2,
    tasks=[begin_tx, update_data, commit_tx],
    retry="per_stage",
    stage_attempts=3
)

Failed items are re-queued at the beginning of the stage.
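
The difference from per-task retry is that a failure in any task restarts the whole sequence from the first task. The sketch below illustrates that idea in plain `asyncio`; `run_stage` is a hypothetical helper, and it assumes each task's return value is passed to the next task, which may not match AntFlow's internals.

```python
import asyncio


async def run_stage(tasks, item, stage_attempts: int):
    """Run `tasks` in order; on any failure restart from the first task."""
    if stage_attempts < 1:
        raise ValueError("stage_attempts must be >= 1")
    last_error = None
    for _ in range(stage_attempts):
        value = item  # Every attempt starts again from the original item.
        try:
            for task in tasks:
                value = await task(value)
            return value
        except Exception as exc:
            last_error = exc
    raise last_error


log = []


async def begin_tx(x):
    log.append("begin")
    return x


async def update_data(x):
    log.append("update")
    if log.count("update") == 1:
        raise RuntimeError("write conflict")  # Fails on the first attempt only.
    return x + 1


async def commit_tx(x):
    log.append("commit")
    return x


result = asyncio.run(run_stage([begin_tx, update_data, commit_tx], 1, stage_attempts=3))
print(result, log)
```

Note that `begin` appears twice in the log: the second attempt replays the tasks that had already succeeded, which is why this strategy suits transactional sequences.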

Extracting Original Exceptions

AntFlow automatically extracts original exceptions from retry wrappers:

from antflow.utils import extract_exception
from tenacity import RetryError

# In callbacks, errors are already extracted
async def on_failure(item_id, error, metadata):
    print(f"Original error: {error}")

# Manual extraction if needed
try:
    # ... operation with retry
    pass
except RetryError as e:
    original = extract_exception(e)
    print(f"Original exception: {original}")

Best Practices

1. Use Specific Exceptions

Catch specific exceptions rather than broad Exception types:

from antflow import AsyncExecutor, ExecutorShutdownError
import logging

logger = logging.getLogger(__name__)

try:
    result = await executor.submit(task, arg)
except ExecutorShutdownError:
    # Handle shutdown specifically
    logger.warning("Executor was shut down")
except ValueError:
    # Handle validation errors
    logger.error("Invalid input")
except Exception:
    # Catch-all for unexpected errors; logger.exception records the traceback
    logger.exception("Unexpected error")

2. Always Set Failure Callbacks

In production, always configure failure callbacks:

from antflow import Stage
import logging

logger = logging.getLogger(__name__)

async def log_failure(item_id, error, metadata):
    logger.error(
        f"Stage failure for item {item_id}: {error}"
    )

stage = Stage(
    name="ProductionStage",
    workers=10,
    tasks=[process],
    on_failure=log_failure
)

3. Configure Appropriate Retries

Choose retry strategies based on your use case:

  • Idempotent operations: Use per-task retry with a high attempt count
  • Transactional operations: Use per-stage retry
  • External APIs: Use longer wait times between retries

from antflow import Stage

# For external API calls
api_stage = Stage(
    name="API",
    workers=5,
    tasks=[call_api],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=3.0
)

# For transactional database operations
db_stage = Stage(
    name="Database",
    workers=2,
    tasks=[begin_tx, insert, update, commit],
    retry="per_stage",
    stage_attempts=3
)

4. Monitor Failure Rates

Track failures to identify issues:

stats = pipeline.get_stats()
total = stats.items_processed + stats.items_failed
# Guard against division by zero before any item has finished
failure_rate = stats.items_failed / total if total else 0.0

if failure_rate > 0.1:  # More than 10% failures
    logger.warning(f"High failure rate: {failure_rate:.2%}")
    # Alert, adjust retry settings, etc.

5. Graceful Degradation

Handle failures gracefully without stopping the entire pipeline:

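import asyncio
import logging
from antflow import Pipeline

logger = logging.getLogger(__name__)
failed_queue = asyncio.Queue()  # Holds failed items for a later retry pass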

async def on_failure(item_id, error, metadata):
    # Log the failure
    logger.error(f"Item {item_id} failed: {error}")

    # Store for later retry
    await failed_queue.put({"id": item_id, "error": error})

# Pipeline continues processing other items
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)

# Process failures separately (retry_failed_items is your own helper)
await retry_failed_items(failed_queue)
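
The underlying pattern, continuing with healthy items while recording the failed ones, can be tried without AntFlow. The sketch below swaps in `asyncio.gather(..., return_exceptions=True)` as a stand-in for the pipeline; `process` and `run_all` are hypothetical names used only for illustration.

```python
import asyncio


async def process(item: int) -> int:
    """Toy task: every fourth item fails."""
    if item % 4 == 0:
        raise ValueError(f"cannot process {item}")
    return item * 10


async def run_all(items):
    """Process every item; collect failures instead of aborting the batch."""
    outcomes = await asyncio.gather(
        *(process(i) for i in items), return_exceptions=True
    )
    results, failed = [], []
    for item, outcome in zip(items, outcomes):
        if isinstance(outcome, Exception):
            failed.append({"id": item, "error": outcome})
        else:
            results.append(outcome)
    return results, failed


results, failed = asyncio.run(run_all([1, 2, 3, 4, 5]))
print(results, [f["id"] for f in failed])
```

The failed records keep the exception object, so a later retry pass can decide per error type whether another attempt is worthwhile.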

Example: Robust ETL Pipeline

Here's a complete example with comprehensive error handling:

import asyncio
import logging
from antflow import Pipeline, Stage, StatusTracker

logger = logging.getLogger(__name__)

async def on_stage_failure(item_id, error, metadata):
    logger.error(f"Stage failure for item {item_id}: {error}")

async def on_task_retry(event):
    logger.warning(f"Retrying {event.task_name} for item {event.item_id}: {event.error}")

async def fetch_data(item_id):
    # May fail due to network issues
    ...

async def validate_data(data):
    # May fail due to invalid data
    if not data.get('required_field'):
        raise ValueError("Missing required field")
    return data

async def save_data(data):
    # May fail due to database issues
    ...

async def main():
    # Tracker handles task-level events
    tracker = StatusTracker(on_task_retry=on_task_retry)

    # Fetch stage: retry on network errors
    fetch_stage = Stage(
        name="Fetch",
        workers=10,
        tasks=[fetch_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=2.0,
        on_failure=on_stage_failure
    )

    # Validate stage: don't retry validation errors
    validate_stage = Stage(
        name="Validate",
        workers=5,
        tasks=[validate_data],
        retry="per_task",
        task_attempts=1,
        on_failure=on_stage_failure
    )

    # Save stage: retry on transient database errors
    save_stage = Stage(
        name="Save",
        workers=3,
        tasks=[save_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=3.0,
        on_failure=on_stage_failure
    )

    pipeline = Pipeline(
        stages=[fetch_stage, validate_stage, save_stage],
        status_tracker=tracker
    )

    items = range(100)
    results = await pipeline.run(items)

    # Report results using Error Summary API
    summary = pipeline.get_error_summary()
    logger.info(f"Processed: {len(results)}")
    logger.info(f"Failed: {summary.total_failed}")

    if summary.total_failed > 0:
        logger.warning(f"Errors by stage: {summary.errors_by_stage}")

asyncio.run(main())

Debugging Tips

Enable Debug Logging

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('antflow')
logger.setLevel(logging.DEBUG)

Use Task-Level Callbacks (via StatusTracker)

Get detailed information about task execution:

import logging
from antflow import Pipeline, StatusTracker, TaskEvent

logger = logging.getLogger(__name__)

async def on_task_start(event: TaskEvent):
    logger.debug(f"Starting {event.task_name} for item {event.item_id}")

async def on_task_fail(event: TaskEvent):
    logger.error(f"Task {event.task_name} failed for item {event.item_id}: {event.error}")

tracker = StatusTracker(
    on_task_start=on_task_start,
    on_task_fail=on_task_fail
)

pipeline = Pipeline(stages=[stage], status_tracker=tracker)

Check Pipeline Stats

Monitor pipeline health during execution:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def monitor_pipeline(pipeline):
    while True:
        stats = pipeline.get_stats()
        logger.info(
            f"Progress: {stats.items_processed} processed, "
            f"{stats.items_failed} failed, "
            f"{stats.items_in_flight} in-flight"
        )
        await asyncio.sleep(5.0)

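# Run monitoring concurrently with the pipeline (asyncio.TaskGroup needs Python 3.11+)
async with asyncio.TaskGroup() as tg:
    monitor = tg.create_task(monitor_pipeline(pipeline))
    results = await pipeline.run(items)
    # Stop the endless monitor loop so the TaskGroup can exit
    monitor.cancel()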