StatusTracker API¶
The antflow.tracker module provides real-time monitoring and event tracking for pipelines.
Overview¶
The StatusTracker is the observability layer of AntFlow. It allows you to:

1. Monitor Items: Track the lifecycle of data items as they move through stages.
2. Monitor Tasks: Get granular events for every task execution (start, success, retry, failure).
3. Build Dashboards: Use the event stream to power real-time UIs.
Usage Example¶
from antflow import Pipeline, Stage, StatusTracker
# 1. Define a custom handler
async def on_event(event):
    if event.status == "failed":
        print(f"🚨 Alert: Item {event.item_id} failed in {event.stage}")
# 2. Initialize tracker
tracker = StatusTracker(on_status_change=on_event)
# 3. Attach to pipeline
pipeline = Pipeline(stages=[...], status_tracker=tracker)
# 4. Run
await pipeline.run(items)
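The handler from step 1 can be exercised without a running pipeline. The following is a minimal sketch, assuming only the three attributes the handler reads (`item_id`, `stage`, `status`). `FakeEvent` is a hypothetical stand-in for illustration, not AntFlow's `StatusEvent`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for StatusEvent: only the fields the handler reads."""
    item_id: int
    stage: str
    status: str


alerts: list[str] = []


async def on_event(event: FakeEvent) -> None:
    # Same check as the handler above, but alerts are collected instead of printed.
    if event.status == "failed":
        alerts.append(f"Item {event.item_id} failed in {event.stage}")


async def main() -> None:
    # Feed the handler by hand; a real tracker would call it for us.
    await on_event(FakeEvent(item_id=1, stage="fetch", status="completed"))
    await on_event(FakeEvent(item_id=2, stage="fetch", status="failed"))


asyncio.run(main())
```

Collecting alerts in a list rather than printing them makes the handler easy to assert on in a unit test.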
Customizing Behavior¶
You can customize how status changes are handled by providing callbacks or subclassing StatusTracker.
Using Callbacks¶
The easiest way to react to events is by passing async callbacks during initialization:
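async def on_fail(event: TaskEvent):
    # Send alert to Slack/Discord (send_alert is your own coroutine)
    await send_alert(f"Item {event.item_id} failed: {event.error}")

# Callbacks are async, so use a coroutine function rather than a plain lambda
async def on_change(event: StatusEvent):
    print(f"Status: {event.status}")

tracker = StatusTracker(
    on_task_fail=on_fail,
    on_status_change=on_change,
)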
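The callback parameters are typed as returning an awaitable, so an existing synchronous function needs a thin async wrapper before it can be passed in. The following is a minimal sketch of such a wrapper. `as_async` is a hypothetical helper written for this example, not part of AntFlow.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def as_async(fn: Callable[[Any], None]) -> Callable[[Any], Awaitable[None]]:
    """Wrap a plain function so it can be awaited like an async callback."""

    @functools.wraps(fn)
    async def wrapper(event: Any) -> None:
        # Runs the synchronous function inline; keep it fast and non-blocking.
        fn(event)

    return wrapper


seen: list[Any] = []
log_status = as_async(seen.append)

# Awaiting the wrapper behaves like awaiting any other async callback.
asyncio.run(log_status("in_progress"))
```

The wrapped function still runs on the event loop thread, so this only suits quick operations such as logging or appending to a list.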
Accessing Worker & Job Status¶
The StatusTracker focuses on Item progress. To monitor Workers, use the Pipeline methods:
# Item Status (via Tracker)
item_status = tracker.get_status(item_id=123)
# Worker Status (via Pipeline)
worker_states = pipeline.get_worker_states()
for name, state in worker_states.items():
    print(f"Worker {name}: {state.status}")
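For a dashboard, a per-status count is often more useful than one line per worker. The following is a rough sketch, assuming only what the loop above relies on: a mapping from worker name to an object with a `status` attribute. The worker names and the status values `"busy"` and `"idle"` are made up for illustration.

```python
from collections import Counter
from types import SimpleNamespace


def summarize_workers(worker_states) -> Counter:
    """Count workers per status from a name -> state mapping."""
    return Counter(state.status for state in worker_states.values())


# Hypothetical stand-in for the result of pipeline.get_worker_states().
fake_states = {
    "fetch-0": SimpleNamespace(status="busy"),
    "fetch-1": SimpleNamespace(status="idle"),
    "parse-0": SimpleNamespace(status="busy"),
}

summary = summarize_workers(fake_states)
```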
Available Callbacks¶
You can provide these async callbacks to the StatusTracker constructor to react to specific events:
| Callback | Signature | Description |
|---|---|---|
| on_status_change | async def fn(event: StatusEvent) | Triggered when an item's status changes (queued, in_progress, retrying, completed, failed). |
| on_task_start | async def fn(event: TaskEvent) | Triggered when a specific task function starts execution. |
| on_task_complete | async def fn(event: TaskEvent) | Triggered when a task function completes successfully. |
| on_task_retry | async def fn(event: TaskEvent) | Triggered when a task fails but will be retried. |
| on_task_fail | async def fn(event: TaskEvent) | Triggered when a task fails permanently (after all retries). |
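Several callbacks often share state, for example to count retries per item before a permanent failure. One way to do this is with bound async methods, sketched below. `RetryLog` is a hypothetical class, and the events are simple stand-ins carrying only `item_id`.

```python
import asyncio
from collections import defaultdict
from types import SimpleNamespace


class RetryLog:
    """Hypothetical shared state for retry and failure callbacks."""

    def __init__(self) -> None:
        self.retries: dict = defaultdict(int)
        self.failed: list = []

    async def on_retry(self, event) -> None:
        # Intended for the on_task_retry slot.
        self.retries[event.item_id] += 1

    async def on_fail(self, event) -> None:
        # Intended for the on_task_fail slot.
        self.failed.append(event.item_id)


log = RetryLog()
# With AntFlow this would be wired up roughly as:
#   StatusTracker(on_task_retry=log.on_retry, on_task_fail=log.on_fail)


async def demo() -> None:
    # Simulate two retries followed by a permanent failure for item 7.
    await log.on_retry(SimpleNamespace(item_id=7))
    await log.on_retry(SimpleNamespace(item_id=7))
    await log.on_fail(SimpleNamespace(item_id=7))


asyncio.run(demo())
```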
Event Objects¶
For detailed properties of StatusEvent and TaskEvent, please refer to the Types API documentation.
Event Types¶
For quick reference, here are the possible values for status and event types:
Item Status (StatusEvent.status)¶
| Value | Description |
|---|---|
| queued | Item has been added to a stage's input queue. |
| in_progress | Worker has picked up the item and started processing. |
| completed | All tasks in the stage finished successfully. |
| failed | Stage execution failed (after all retries). |
| retrying | Item failed processing and is waiting to be retried (per-stage retry). Metadata includes attempt and error. |
Task Events (TaskEvent.event_type)¶
| Value | Description |
|---|---|
| start | A specific task function started running. |
| complete | Task function returned successfully. |
| retry | Task failed but has retries remaining. |
| fail | Task failed and has no retries left. |
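Each `event_type` value lines up with one of the task callbacks listed under Available Callbacks. The sketch below spells out that correspondence and uses it to route events by hand. It only illustrates the mapping. How AntFlow dispatches internally is not described here, and `dispatch` is a hypothetical helper.

```python
import asyncio
from types import SimpleNamespace

# event_type value -> name of the matching constructor callback
CALLBACK_FOR_EVENT = {
    "start": "on_task_start",
    "complete": "on_task_complete",
    "retry": "on_task_retry",
    "fail": "on_task_fail",
}

calls: list = []


async def dispatch(event, callbacks: dict) -> None:
    """Await the callback registered for this event type, if any."""
    name = CALLBACK_FOR_EVENT[event.event_type]
    callback = callbacks.get(name)
    if callback is not None:
        await callback(event)


async def record_retry(event) -> None:
    calls.append(("retry", event.item_id))


async def demo() -> None:
    callbacks = {"on_task_retry": record_retry}
    await dispatch(SimpleNamespace(event_type="retry", item_id=3), callbacks)
    # No callback is registered for "start", so this call does nothing.
    await dispatch(SimpleNamespace(event_type="start", item_id=3), callbacks)


asyncio.run(demo())
```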
Class Reference¶
StatusTracker¶
StatusTracker(on_status_change: Optional[Callable[[StatusEvent], Awaitable[None]]] = None, on_task_start: Optional[Callable[[TaskEvent], Awaitable[None]]] = None, on_task_complete: Optional[Callable[[TaskEvent], Awaitable[None]]] = None, on_task_retry: Optional[Callable[[TaskEvent], Awaitable[None]]] = None, on_task_fail: Optional[Callable[[TaskEvent], Awaitable[None]]] = None)
Tracks status changes for items flowing through a pipeline.
Provides methods to query current status, filter by status, get statistics, and retrieve event history.
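To make the semantics of the four query methods concrete, here is a toy in-memory model that stores every event per item and answers each query from that history. It is a sketch for intuition only, not AntFlow's implementation. `MiniTracker`, `Event`, and `record` are hypothetical names.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Event:
    """Hypothetical stand-in for StatusEvent."""
    item_id: Any
    stage: str
    status: str


class MiniTracker:
    """Toy model of the query methods documented below."""

    def __init__(self) -> None:
        self._history: Dict[Any, List[Event]] = defaultdict(list)

    def record(self, event: Event) -> None:
        # Hypothetical ingestion hook; a real tracker is fed by the pipeline.
        self._history[event.item_id].append(event)

    def get_status(self, item_id: Any) -> Optional[Event]:
        # Most recent event for the item, or None if it was never seen.
        events = self._history.get(item_id)
        return events[-1] if events else None

    def get_by_status(self, status: str) -> List[Event]:
        # Latest event of every item whose current status matches.
        return [e[-1] for e in self._history.values() if e and e[-1].status == status]

    def get_stats(self) -> Dict[str, int]:
        # Count items by their current (latest) status.
        return dict(Counter(e[-1].status for e in self._history.values() if e))

    def get_history(self, item_id: Any) -> List[Event]:
        # All events for the item, oldest first.
        return list(self._history.get(item_id, []))


tracker = MiniTracker()
tracker.record(Event(1, "fetch", "queued"))
tracker.record(Event(1, "fetch", "in_progress"))
tracker.record(Event(2, "fetch", "queued"))
tracker.record(Event(1, "fetch", "completed"))
```

After these four events, item 1 is `completed`, item 2 is still `queued`, and an unknown id returns `None` from `get_status`.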
Initialize the status tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| on_status_change | Optional[Callable[[StatusEvent], Awaitable[None]]] | Optional callback invoked on each item StatusEvent | None |
| on_task_start | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task starts executing (TaskEvent) | None |
| on_task_complete | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task completes successfully (TaskEvent) | None |
| on_task_retry | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task is retrying after failure (TaskEvent) | None |
| on_task_fail | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task fails after all retries (TaskEvent) | None |
get_status¶
Get the current status of an item.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| item_id | Any | The item identifier | required |

Returns:
| Type | Description |
|---|---|
| StatusEvent \| None | The most recent StatusEvent for the item, or None if not found |
get_by_status¶
Get all items currently in a given status.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | StatusType | The status to filter by | required |

Returns:
| Type | Description |
|---|---|
| List[StatusEvent] | List of StatusEvents for items with the given status |
get_stats¶
Get aggregate statistics by status.
Returns:
| Type | Description |
|---|---|
| Dict[str, int] | Dictionary mapping status names to counts |
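Because the result is a plain mapping of status names to counts, it can be turned into a progress line with a few lines of code. The sketch below assumes the keys are the item status values listed under Item Status, and that statuses with no items may be missing from the dictionary. Whether `completed` refers to a single stage or the whole pipeline is not assumed here. `progress_line` is a hypothetical helper.

```python
from typing import Dict


def progress_line(stats: Dict[str, int]) -> str:
    """Summarize a status -> count mapping as one human-readable line."""
    total = sum(stats.values())
    if total == 0:
        return "no items tracked"
    failed = stats.get("failed", 0)
    settled = stats.get("completed", 0) + failed
    percent = 100 * settled // total  # integer percentage, rounded down
    return f"{settled}/{total} settled ({percent}%), {failed} failed"


line = progress_line({"queued": 2, "completed": 5, "failed": 1})
```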
get_history¶
Get the full event history for an item.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| item_id | Any | The item identifier | required |

Returns:
| Type | Description |
|---|---|
| List[StatusEvent] | List of all StatusEvents for the item, in chronological order |
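Since the history is returned in chronological order, it can be rendered directly as a trail for debugging. The sketch below is illustrative and relies only on the `stage` and `status` attributes used in the earlier examples. `format_history` is a hypothetical helper, and the events are stand-ins rather than real `StatusEvent` objects.

```python
from types import SimpleNamespace


def format_history(events) -> str:
    """Render a chronological event list as 'stage:status -> stage:status'."""
    return " -> ".join(f"{event.stage}:{event.status}" for event in events)


# Hypothetical stand-in for the result of tracker.get_history(item_id).
history = [
    SimpleNamespace(stage="fetch", status="queued"),
    SimpleNamespace(stage="fetch", status="in_progress"),
    SimpleNamespace(stage="fetch", status="completed"),
]

trail = format_history(history)
```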