Error Handling¶
AntFlow provides a robust error handling system with custom exceptions and flexible retry strategies.
Exception Hierarchy¶
AntFlow defines a clear exception hierarchy for different error scenarios:
AntFlowError                  # Base exception
├── ExecutorShutdownError     # Executor has been shut down
├── PipelineError             # Pipeline-specific errors
│   └── StageValidationError  # Invalid stage configuration
└── TaskFailedError           # Task execution failure
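Because these are ordinary Python exception subclasses, catching a base class also catches every subtype. A plain-Python sketch mirroring the shape of the hierarchy (illustrative only, not AntFlow's actual definitions):

```python
# Hypothetical mirror of the hierarchy above, for illustration only
class AntFlowError(Exception):
    """Base exception."""

class PipelineError(AntFlowError):
    """Pipeline-specific errors."""

class StageValidationError(PipelineError):
    """Invalid stage configuration."""

# Catching the base class handles any subtype
try:
    raise StageValidationError("workers must be >= 1")
except AntFlowError as e:
    caught = e

print(type(caught).__name__)  # StageValidationError
```

This is why a single `except AntFlowError` handler at the top of your application can act as a safety net for every AntFlow-specific failure.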
Exception Types¶
AntFlowError¶
Base exception for all AntFlow errors:
from antflow import AntFlowError
try:
    # AntFlow operations
    pass
except AntFlowError as e:
    print(f"AntFlow error: {e}")
ExecutorShutdownError¶
Raised when attempting to use an executor that has already been shut down:
from antflow import AsyncExecutor, ExecutorShutdownError
executor = AsyncExecutor(max_workers=5)
await executor.shutdown()
try:
    executor.submit(some_task, arg)
except ExecutorShutdownError:
    print("Cannot submit to shutdown executor")
PipelineError¶
Base exception for pipeline-specific errors:
from antflow import Pipeline, PipelineError
try:
    # Example: attempting an invalid operation
    raise PipelineError("Cannot modify pipeline while running")
except PipelineError as e:
    print(f"Pipeline error: {e}")
StageValidationError¶
Raised when stage configuration is invalid:
from antflow import Stage, StageValidationError
try:
    stage = Stage(
        name="Invalid",
        workers=0,  # Invalid: must be >= 1
        tasks=[my_task]
    )
    stage.validate()
except StageValidationError as e:
    print(f"Invalid stage: {e}")
TaskFailedError¶
Wrapper for task failures that preserves the original exception:
from antflow import TaskFailedError
# The original exception is available
try:
    # ... task execution
    pass
except TaskFailedError as e:
    print(f"Task {e.task_name} failed")
    print(f"Original error: {e.original_exception}")
Handling Task Failures¶
In AsyncExecutor¶
Exceptions in executor tasks are propagated to the caller:
from antflow import AsyncExecutor
async def failing_task(x):
    if x < 0:
        raise ValueError("Negative value not allowed")
    return x * 2

async with AsyncExecutor(max_workers=3) as executor:
    future = executor.submit(failing_task, -5)
    try:
        result = await future.result()
    except ValueError as e:
        print(f"Task failed: {e}")
        # Handle the error appropriately
Automatic Retries¶
You can also configure automatic retries for AsyncExecutor tasks:
# Retry 3 times with 0.1s delay
future = executor.submit(failing_task, -5, retries=3, retry_delay=0.1)
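The general pattern behind such retries is a loop that sleeps between attempts and re-raises the last error once attempts are exhausted. A stdlib-only sketch of that pattern (not AntFlow's implementation; the helper name is illustrative):

```python
import asyncio

async def submit_with_retries(coro_fn, *args, retries=3, retry_delay=0.1):
    """Run coro_fn(*args), retrying up to `retries` extra times on failure."""
    last_exc = None
    for attempt in range(retries + 1):
        try:
            return await coro_fn(*args)
        except Exception as exc:
            last_exc = exc
            if attempt < retries:
                await asyncio.sleep(retry_delay)
    raise last_exc

# Example: a task that fails twice, then succeeds on the third attempt
calls = {"n": 0}

async def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return x * 2

result = asyncio.run(submit_with_retries(flaky, 5, retries=3, retry_delay=0.01))
print(result)  # 10
```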
In Pipeline¶
Pipeline provides multiple ways to handle failures:
Stage-Level Callbacks¶
from antflow import Stage

async def on_failure(item_id, error, metadata):
    print(f"Stage failed for item {item_id}: {error}")
    # Log to monitoring system, send alert, etc.

stage = Stage(
    name="ProcessStage",
    workers=3,
    tasks=[risky_task],
    on_failure=on_failure
)
Task-Level Callbacks (via StatusTracker)¶
For granular tracking, use task-level callbacks on the StatusTracker:
from antflow import StatusTracker, TaskEvent
async def on_task_fail(event: TaskEvent):
    print(f"Task {event.task_name} failed for item {event.item_id}")
    print(f"Error: {event.error}")

tracker = StatusTracker(
    on_task_fail=on_task_fail
)
Collecting Failed Items¶
from antflow import Pipeline, Stage
failed_items = []
async def collect_failures(item_id, error, metadata):
    failed_items.append({
        'id': item_id,
        'error': error
    })

stage = Stage(
    name="MyStage",
    workers=3,
    tasks=[my_task],
    retry="per_task",
    task_attempts=3,
    on_failure=collect_failures
)
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)
# Analyze failures
print(f"Succeeded: {len(results)}")
print(f"Failed: {len(failed_items)}")
for failed in failed_items:
    print(f"  Item {failed['id']}: {failed['error']}")
Error Summary API¶
As of version 0.6.0, AntFlow provides a high-level error summary API. If a StatusTracker is configured, it provides a detailed report of all failures.
results = await pipeline.run(items)
summary = pipeline.get_error_summary()
print(f"Total failed: {summary.total_failed}")
# Grouped by stage
for stage, count in summary.errors_by_stage.items():
    print(f"Stage {stage}: {count} errors")

# Grouped by error type
for err_type, count in summary.errors_by_type.items():
    print(f"Error {err_type}: {count} occurrences")

# List of all failed items with details
for item in summary.failed_items:
    print(f"Item {item.item_id} failed in {item.stage} "
          f"after {item.attempts} attempts. Error: {item.error}")
Retry Strategies¶
AntFlow provides two retry strategies for pipelines:
Per-Task Retry¶
Each task retries independently using tenacity:
from antflow import Stage
stage = Stage(
    name="RobustStage",
    workers=5,
    tasks=[api_call],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=2.0
)
If a task fails after all retries, the stage fails for that item.
Per-Stage Retry¶
The entire stage retries on any task failure:
from antflow import Stage
stage = Stage(
    name="TransactionalStage",
    workers=2,
    tasks=[begin_tx, update_data, commit_tx],
    retry="per_stage",
    stage_attempts=3
)
Failed items are re-queued at the beginning of the stage.
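Conceptually, per-stage retry restarts the whole task chain for a failed item rather than retrying only the task that raised. A simplified stdlib sketch of that control flow (illustrative, not AntFlow internals):

```python
import asyncio

async def run_stage(item, tasks, stage_attempts=3):
    """Run each task in order, feeding each result to the next.
    On any failure, restart the whole chain, up to stage_attempts tries."""
    last_exc = None
    for _ in range(stage_attempts):
        value = item
        try:
            for task in tasks:
                value = await task(value)
            return value
        except Exception as exc:
            last_exc = exc  # restart from the first task
    raise last_exc

# Example: the middle task fails once, so the whole chain reruns
attempts = []

async def begin_tx(x):
    attempts.append("begin")
    return x

async def update_data(x):
    if attempts.count("begin") < 2:
        raise RuntimeError("deadlock, retry the transaction")
    return x + 1

async def commit_tx(x):
    return x

result = asyncio.run(run_stage(10, [begin_tx, update_data, commit_tx]))
print(attempts)  # ['begin', 'begin'] — begin_tx ran twice
```

This is why per-stage retry suits transactional chains: the restart guarantees that begin/commit pairs always run together.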
Extracting Original Exceptions¶
AntFlow automatically extracts original exceptions from retry wrappers:
from antflow.utils import extract_exception
from tenacity import RetryError
# In callbacks, errors are already extracted
async def on_failure(item_id, error, metadata):
    print(f"Original error: {error}")

# Manual extraction if needed
try:
    # ... operation with retry
    pass
except RetryError as e:
    original = extract_exception(e)
    print(f"Original exception: {original}")
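Outside AntFlow, the same idea can be expressed with Python's standard exception chaining: a generic unwrap walks the `__cause__`/`__context__` links to the innermost error. A stdlib-only sketch (the `unwrap` helper is hypothetical, not an AntFlow API):

```python
def unwrap(exc):
    """Follow __cause__/__context__ links to the innermost exception."""
    while exc.__cause__ is not None or exc.__context__ is not None:
        exc = exc.__cause__ if exc.__cause__ is not None else exc.__context__
    return exc

# Example: a wrapper raised with `raise ... from original`
original = ValueError("bad input")
try:
    try:
        raise original
    except ValueError as e:
        raise RuntimeError("task failed after retries") from e
except RuntimeError as wrapper:
    inner = unwrap(wrapper)

print(inner)  # bad input
```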
Best Practices¶
1. Use Specific Exceptions¶
Catch specific exceptions rather than broad Exception types:
from antflow import AsyncExecutor, ExecutorShutdownError
import logging
logger = logging.getLogger(__name__)
try:
    future = executor.submit(task, arg)
    result = await future.result()
except ExecutorShutdownError:
    # Handle shutdown specifically
    logger.warning("Executor was shut down")
except ValueError:
    # Handle validation errors
    logger.error("Invalid input")
except Exception:
    # Catch-all for unexpected errors
    logger.exception("Unexpected error")
2. Always Set Failure Callbacks¶
In production, always configure failure callbacks:
from antflow import Stage
import logging
logger = logging.getLogger(__name__)
async def log_failure(item_id, error, metadata):
    logger.error(
        f"Stage failure for item {item_id}: {error}"
    )

stage = Stage(
    name="ProductionStage",
    workers=10,
    tasks=[process],
    on_failure=log_failure
)
3. Configure Appropriate Retries¶
Choose retry strategies based on your use case:
- Idempotent operations: Use per-task retry with high attempts
- Transactional operations: Use per-stage retry
- External APIs: Use longer wait times between retries
from antflow import Stage
# For external API calls
api_stage = Stage(
    name="API",
    workers=5,
    tasks=[call_api],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=3.0
)

# For transactional database operations
db_stage = Stage(
    name="Database",
    workers=2,
    tasks=[begin_tx, insert, update, commit],
    retry="per_stage",
    stage_attempts=3
)
4. Monitor Failure Rates¶
Track failures to identify issues:
stats = pipeline.get_stats()
failure_rate = stats.items_failed / (stats.items_processed + stats.items_failed)
if failure_rate > 0.1:  # More than 10% failures
    logger.warning(f"High failure rate: {failure_rate:.2%}")
    # Alert, adjust retry settings, etc.
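Note that the division above assumes at least one item has been seen; early in a run both counters may be zero. A guarded helper (the function name is illustrative, mirroring the stats fields above):

```python
def failure_rate(processed, failed):
    """Fraction of items that failed; 0.0 when nothing has run yet."""
    total = processed + failed
    return failed / total if total else 0.0

print(f"{failure_rate(90, 10):.2%}")  # 10.00%
print(f"{failure_rate(0, 0):.2%}")    # 0.00%, no ZeroDivisionError
```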
5. Graceful Degradation¶
Handle failures gracefully without stopping the entire pipeline:
from antflow import Pipeline
async def on_failure(item_id, error, metadata):
    # Log the failure
    logger.error(f"Item {item_id} failed: {error}")
    # Store for later retry
    await failed_queue.put({"id": item_id, "error": error})

# Pipeline continues processing other items
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(items)

# Process failures separately
await retry_failed_items(failed_queue)
Example: Robust ETL Pipeline¶
Here's a complete example with comprehensive error handling:
import asyncio
import logging

from antflow import Pipeline, Stage, StatusTracker

logger = logging.getLogger(__name__)

async def on_stage_failure(item_id, error, metadata):
    logger.error(f"Stage failure for item {item_id}: {error}")

async def on_task_retry(event):
    logger.warning(f"Retrying {event.task_name} for item {event.item_id}: {event.error}")

async def fetch_data(item_id):
    # May fail due to network issues
    ...

async def validate_data(data):
    # May fail due to invalid data
    if not data.get('required_field'):
        raise ValueError("Missing required field")
    return data

async def save_data(data):
    # May fail due to database issues
    ...

async def main():
    # Tracker handles task-level events
    tracker = StatusTracker(on_task_retry=on_task_retry)

    # Fetch stage: retry on network errors
    fetch_stage = Stage(
        name="Fetch",
        workers=10,
        tasks=[fetch_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=2.0,
        on_failure=on_stage_failure
    )

    # Validate stage: don't retry validation errors
    validate_stage = Stage(
        name="Validate",
        workers=5,
        tasks=[validate_data],
        retry="per_task",
        task_attempts=1,
        on_failure=on_stage_failure
    )

    # Save stage: retry on transient database errors
    save_stage = Stage(
        name="Save",
        workers=3,
        tasks=[save_data],
        retry="per_task",
        task_attempts=5,
        task_wait_seconds=3.0,
        on_failure=on_stage_failure
    )

    pipeline = Pipeline(
        stages=[fetch_stage, validate_stage, save_stage],
        status_tracker=tracker
    )

    items = range(100)
    results = await pipeline.run(items)

    # Report results using the Error Summary API
    summary = pipeline.get_error_summary()
    logger.info(f"Processed: {len(results)}")
    logger.info(f"Failed: {summary.total_failed}")
    if summary.total_failed > 0:
        logger.warning(f"Errors by stage: {summary.errors_by_stage}")

asyncio.run(main())
Debugging Tips¶
Enable Debug Logging¶
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('antflow')
logger.setLevel(logging.DEBUG)
Use Task-Level Callbacks (via StatusTracker)¶
Get detailed information about task execution:
import logging
from antflow import Pipeline, StatusTracker, TaskEvent

logger = logging.getLogger(__name__)

async def on_task_start(event: TaskEvent):
    logger.debug(f"Starting {event.task_name} for item {event.item_id}")

async def on_task_fail(event: TaskEvent):
    logger.error(f"Task {event.task_name} failed for item {event.item_id}: {event.error}")

tracker = StatusTracker(
    on_task_start=on_task_start,
    on_task_fail=on_task_fail
)

pipeline = Pipeline(stages=[stage], status_tracker=tracker)
Check Pipeline Stats¶
Monitor pipeline health during execution:
import asyncio
import logging
logger = logging.getLogger(__name__)
async def monitor_pipeline(pipeline):
    while True:
        stats = pipeline.get_stats()
        logger.info(
            f"Progress: {stats.items_processed} processed, "
            f"{stats.items_failed} failed, "
            f"{stats.items_in_flight} in-flight"
        )
        await asyncio.sleep(5.0)

# Run monitoring concurrently with the pipeline, cancelling the
# monitor once the run completes so the TaskGroup can exit
async with asyncio.TaskGroup() as tg:
    monitor = tg.create_task(monitor_pipeline(pipeline))
    results = await pipeline.run(items)
    monitor.cancel()