StatusTracker API

The antflow.tracker module provides real-time monitoring and event tracking for pipelines.

Overview

The StatusTracker is the observability layer of AntFlow. It allows you to:

1. Monitor Items: Track the lifecycle of data items as they move through stages.
2. Monitor Tasks: Get granular events for every task execution (start, success, retry, failure).
3. Build Dashboards: Use the event stream to power real-time UIs.

Usage Example

from antflow import Pipeline, Stage, StatusTracker

# 1. Define a custom handler
async def on_event(event):
    if event.status == "failed":
        print(f"🚨 Alert: Item {event.item_id} failed in {event.stage}")

# 2. Initialize tracker
tracker = StatusTracker(on_status_change=on_event)

# 3. Attach to pipeline
pipeline = Pipeline(stages=[...], status_tracker=tracker)

# 4. Run
await pipeline.run(items)

Customizing Behavior

You can customize how status changes are handled by providing callbacks or subclassing StatusTracker.

Using Callbacks

The easiest way to react to events is by passing async callbacks during initialization:

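# TaskEvent and StatusEvent are described in the Types API documentation.
async def on_fail(event: TaskEvent):
    # Send alert to Slack/Discord (send_alert is a coroutine you supply)
    await send_alert(f"Item {event.item_id} failed: {event.error}")

# Callbacks must return an awaitable, so use an async function, not a plain lambda
async def log_status(event: StatusEvent):
    print(f"Status: {event.status}")

tracker = StatusTracker(
    on_task_fail=on_fail,
    on_status_change=log_status,
)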

Accessing Worker & Job Status

The StatusTracker focuses on Item progress. To monitor Workers, use the Pipeline methods:

# Item Status (via Tracker)
item_status = tracker.get_status(item_id=123)

# Worker Status (via Pipeline)
worker_states = pipeline.get_worker_states()
for name, state in worker_states.items():
    print(f"Worker {name}: {state.status}")

Available Callbacks

You can provide these async callbacks to the StatusTracker constructor to react to specific events:

| Callback | Signature | Description |
| --- | --- | --- |
| on_status_change | async def fn(event: StatusEvent) | Triggered when an item's status changes (queued, in_progress, completed, failed, retrying). |
| on_task_start | async def fn(event: TaskEvent) | Triggered when a specific task function starts execution. |
| on_task_complete | async def fn(event: TaskEvent) | Triggered when a task function completes successfully. |
| on_task_retry | async def fn(event: TaskEvent) | Triggered when a task fails but will be retried. |
| on_task_fail | async def fn(event: TaskEvent) | Triggered when a task fails permanently (after all retries). |
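
One way to feed a real-time UI from these callbacks is to forward each event into an asyncio.Queue and let a separate consumer render it. The following is a minimal sketch under stated assumptions: FakeStatusEvent is a hypothetical stand-in for StatusEvent that carries only the item_id, stage, and status fields used elsewhere on this page, and make_queue_callback and drain are illustrative helpers, not part of AntFlow.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class FakeStatusEvent:
    """Hypothetical stand-in for StatusEvent; only these fields are assumed."""
    item_id: Any
    stage: str
    status: str


def make_queue_callback(queue: asyncio.Queue):
    """Return an async callback that forwards every event to `queue`."""
    async def on_status_change(event: FakeStatusEvent) -> None:
        await queue.put(event)
    return on_status_change


async def drain(queue: asyncio.Queue, count: int) -> List[str]:
    """Consume `count` events and render one dashboard line per event."""
    lines = []
    for _ in range(count):
        event = await queue.get()
        lines.append(f"{event.item_id} [{event.stage}] {event.status}")
    return lines


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    callback = make_queue_callback(queue)
    # With the real library, `callback` would be passed as
    # StatusTracker(on_status_change=callback); here it is called by hand.
    await callback(FakeStatusEvent(1, "fetch", "queued"))
    await callback(FakeStatusEvent(1, "fetch", "in_progress"))
    return await drain(queue, 2)
```

Keeping the callback to a single queue.put call means slow rendering work happens in the consumer rather than inside the callback itself.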

Event Objects

For detailed properties of StatusEvent and TaskEvent, please refer to the Types API documentation.

Event Types

For quick reference, here are the possible values for status and event types:

Item Status (StatusEvent.status)

| Value | Description |
| --- | --- |
| queued | Item has been added to a stage's input queue. |
| in_progress | Worker has picked up the item and started processing. |
| completed | All tasks in the stage finished successfully. |
| failed | Stage execution failed (after all retries). |
| retrying | Item failed processing and is waiting to be retried (per-stage retry). Metadata includes attempt and error. |

Task Events (TaskEvent.event_type)

| Value | Description |
| --- | --- |
| start | A specific task function started running. |
| complete | Task function returned successfully. |
| retry | Task failed but has retries remaining. |
| fail | Task failed and has no retries left. |
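
Because every task callback receives an event with an event_type, a single handler can be shared across all four task callbacks. The sketch below counts events by type and retries per item. FakeTaskEvent is a hypothetical stand-in for TaskEvent with only item_id and event_type assumed, and TaskEventCounter is an illustrative helper, not part of AntFlow.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FakeTaskEvent:
    """Hypothetical stand-in for TaskEvent; only these fields are assumed."""
    item_id: Any
    event_type: str  # one of "start", "complete", "retry", "fail"


class TaskEventCounter:
    """Counts task events by type and tracks retries per item."""

    def __init__(self) -> None:
        self.by_type: Counter = Counter()
        self.retries_by_item: Counter = Counter()

    async def record(self, event: FakeTaskEvent) -> None:
        self.by_type[event.event_type] += 1
        if event.event_type == "retry":
            self.retries_by_item[event.item_id] += 1


async def demo() -> Dict[str, int]:
    counter = TaskEventCounter()
    # With the real library, counter.record could be passed as on_task_start,
    # on_task_complete, on_task_retry, and on_task_fail alike.
    for kind in ("start", "retry", "start", "complete"):
        await counter.record(FakeTaskEvent(item_id=7, event_type=kind))
    return dict(counter.by_type)
```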

Class Reference

StatusTracker

StatusTracker(
    on_status_change: Optional[Callable[[StatusEvent], Awaitable[None]]] = None,
    on_task_start: Optional[Callable[[TaskEvent], Awaitable[None]]] = None,
    on_task_complete: Optional[Callable[[TaskEvent], Awaitable[None]]] = None,
    on_task_retry: Optional[Callable[[TaskEvent], Awaitable[None]]] = None,
    on_task_fail: Optional[Callable[[TaskEvent], Awaitable[None]]] = None,
)

Tracks status changes for items flowing through a pipeline.

Provides methods to query current status, filter by status, get statistics, and retrieve event history.

Example
tracker = StatusTracker()

async def on_change(event: StatusEvent):
    print(f"Item {event.item_id}: {event.status}")

tracker.on_status_change = on_change
pipeline = Pipeline(stages=[...], status_tracker=tracker)

results = await pipeline.run(items)
print(tracker.get_stats())

Initialize the status tracker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| on_status_change | Optional[Callable[[StatusEvent], Awaitable[None]]] | Optional callback invoked on each item StatusEvent | None |
| on_task_start | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task starts executing (TaskEvent) | None |
| on_task_complete | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task completes successfully (TaskEvent) | None |
| on_task_retry | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task is retrying after failure (TaskEvent) | None |
| on_task_fail | Optional[Callable[[TaskEvent], Awaitable[None]]] | Optional callback when a task fails after all retries (TaskEvent) | None |

get_status

get_status(item_id: Any) -> StatusEvent | None

Get the current status of an item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item_id | Any | The item identifier | required |

Returns:

| Type | Description |
| --- | --- |
| StatusEvent \| None | The most recent StatusEvent for the item, or None if not found |

get_by_status

get_by_status(status: StatusType) -> List[StatusEvent]

Get all items currently in a given status.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| status | StatusType | The status to filter by | required |

Returns:

| Type | Description |
| --- | --- |
| List[StatusEvent] | List of StatusEvents for items with the given status |

get_stats

get_stats() -> Dict[str, int]

Get aggregate statistics by status.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, int] | Dictionary mapping status names to counts |
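
The dictionary returned by get_stats() can be condensed into a one-line progress summary. This is a rough sketch: it assumes the keys match the item status values listed under Event Types, and it treats completed and failed as finished states, which is a simplification because completed is reported per stage. The summarize helper is hypothetical and not part of AntFlow.

```python
from typing import Dict

# Assumption: these two statuses mean no further work is pending for the item.
FINISHED_STATUSES = ("completed", "failed")


def summarize(stats: Dict[str, int]) -> str:
    """Render a short progress line from a status -> count mapping."""
    total = sum(stats.values())
    if total == 0:
        return "no items tracked"
    done = sum(stats.get(status, 0) for status in FINISHED_STATUSES)
    failed = stats.get("failed", 0)
    percent = 100 * done // total  # integer percentage, rounded down
    return f"{done}/{total} finished ({percent}%), {failed} failed"
```

With a real tracker this would be called as summarize(tracker.get_stats()).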

get_history

get_history(item_id: Any) -> List[StatusEvent]

Get the full event history for an item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item_id | Any | The item identifier | required |

Returns:

| Type | Description |
| --- | --- |
| List[StatusEvent] | List of all StatusEvents for the item, in chronological order |
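
Since the history is chronological, simple diagnostics can be derived from it after a run, such as how often an item was retried and which stages it passed through. The sketch below uses a hypothetical FakeStatusEvent stand-in with only the item_id, stage, and status fields assumed. count_retries and stage_path are illustrative helpers, not part of AntFlow.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class FakeStatusEvent:
    """Hypothetical stand-in for StatusEvent; only these fields are assumed."""
    item_id: Any
    stage: str
    status: str


def count_retries(history: List[FakeStatusEvent]) -> int:
    """Number of times the item entered the 'retrying' status."""
    return sum(1 for event in history if event.status == "retrying")


def stage_path(history: List[FakeStatusEvent]) -> List[str]:
    """Stages in the order the item reached them, without consecutive repeats."""
    path: List[str] = []
    for event in history:
        if not path or path[-1] != event.stage:
            path.append(event.stage)
    return path


# Hand-written history standing in for tracker.get_history(item_id=1)
sample_history = [
    FakeStatusEvent(1, "fetch", "queued"),
    FakeStatusEvent(1, "fetch", "in_progress"),
    FakeStatusEvent(1, "fetch", "retrying"),
    FakeStatusEvent(1, "fetch", "in_progress"),
    FakeStatusEvent(1, "fetch", "completed"),
    FakeStatusEvent(1, "parse", "queued"),
]
```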