Monitoring: Dashboard vs StatusTracker

AntFlow provides two primary mechanisms for monitoring pipeline execution. Understanding the differences between them will help you choose the right tool for your use case.

Overview

| Feature | Dashboard | StatusTracker |
|---|---|---|
| Mechanism | Polling (periodic updates) | Event-driven (callbacks) |
| Use Case | Visual monitoring, progress display | Logging, system integration, debugging |
| Performance | Low overhead, updates every N seconds | Immediate updates on every event |
| Complexity | Simple (built-in dashboards) | Flexible (custom callbacks) |
| Data Access | Current state snapshots | Complete event history |

Dashboard: Polling-based Monitoring

How It Works

Dashboard uses a polling mechanism where a background task periodically queries the pipeline state:

  1. A background task calls get_dashboard_snapshot() every N seconds (default 0.5).
  2. The snapshot contains current worker states, metrics, and pipeline statistics.
  3. The dashboard renders the snapshot to the terminal or a custom UI.
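The polling steps above can be sketched with the standard library alone. This is a minimal illustration of the pattern, not AntFlow's implementation: `poll_snapshots`, the `state` dictionary, and the `rendered` list are hypothetical stand-ins for the background task, the pipeline state, and the terminal.

```python
import asyncio


async def poll_snapshots(get_snapshot, render, interval=0.5):
    """Call get_snapshot() every `interval` seconds and hand the result to render()."""
    while True:
        render(get_snapshot())
        await asyncio.sleep(interval)


async def demo():
    state = {"processed": 0}  # stand-in for pipeline state
    rendered = []             # stand-in for terminal output

    poller = asyncio.create_task(
        poll_snapshots(lambda: dict(state), rendered.append, interval=0.01)
    )

    # Simulated work: the state keeps changing between renders.
    for _ in range(50):
        state["processed"] += 1
        await asyncio.sleep(0.002)

    # Stop the background task, then render once more so the final state is shown.
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass
    rendered.append(dict(state))
    return rendered


snapshots = asyncio.run(demo())
print(f"{len(snapshots)} renders for 50 state changes")
```

The renderer only ever sees whatever the state happens to be at each tick, which is why a polling dashboard is cheap but can lag behind the pipeline by up to one interval.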

Built-in Dashboards

AntFlow includes three built-in dashboards (compact, detailed, and full), plus a minimal progress bar:

import asyncio

from antflow import Pipeline, Stage

async def task(x):
    await asyncio.sleep(0.1)
    return x * 2

pipeline = Pipeline(stages=[Stage(name="Process", workers=5, tasks=[task])])

# Option 1: Minimal progress bar
await pipeline.run(items, progress=True)

# Option 2: Compact dashboard
await pipeline.run(items, dashboard="compact")

# Option 3: Detailed dashboard (shows per-stage progress)
await pipeline.run(items, dashboard="detailed")

# Option 4: Full dashboard (includes error summary)
await pipeline.run(items, dashboard="full")

When to Use Dashboard

Perfect for:

  • Interactive development and debugging
  • Real-time progress visualization
  • Identifying bottlenecks in multi-stage pipelines
  • Monitoring during production runs

Not ideal for:

  • Storing events in external systems (databases, logs)
  • Complex business logic based on specific events
  • High-frequency event processing (>10 events/second)

Performance Considerations

The polling interval is configurable:

# Fast updates (more responsive, higher CPU)
await pipeline.run(items, dashboard="detailed", dashboard_update_interval=0.1)

# Slow updates (less responsive, lower CPU)
await pipeline.run(items, dashboard="detailed", dashboard_update_interval=1.0)

Recommended range: 0.1 to 1.0 seconds.
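To make the trade-off concrete, the following back-of-the-envelope helper estimates how many snapshot renders a run produces at a given interval. `polling_cost` is a hypothetical helper written for this illustration, not part of AntFlow, and it ignores the time each render takes.

```python
def polling_cost(run_seconds, interval):
    """Rough estimate of render count and worst-case display lag for a polling dashboard."""
    return {
        "renders": round(run_seconds / interval),
        "max_staleness_s": interval,
    }


for interval in (0.1, 0.5, 1.0):
    print(interval, polling_cost(60, interval))
```

For a 60-second run, an interval of 0.1 means roughly 600 renders against roughly 60 at 1.0, in exchange for a display that is at most 0.1 seconds stale instead of 1 second.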

Custom Dashboards

You can create custom dashboards using the DashboardProtocol:

from antflow import Pipeline, PipelineResult, DashboardSnapshot, DashboardProtocol

class MyDashboard(DashboardProtocol):
    def on_start(self, pipeline, total_items):
        self.total = total_items
        print(f"Starting to process {total_items} items...")

    def on_update(self, snapshot: DashboardSnapshot):
        stats = snapshot.pipeline_stats
        # Guard against division by zero when the run has no items
        progress = (stats.items_processed / self.total) * 100 if self.total else 100.0
        print(f"Progress: {progress:.1f}% ({stats.items_processed}/{self.total})")

    def on_finish(self, results: list[PipelineResult], summary):
        print(f"Done! Processed {len(results)} items")
        if summary.total_failed > 0:
            print(f"Failed: {summary.total_failed}")

# Use custom dashboard
pipeline = Pipeline(stages=[...])
await pipeline.run(items, custom_dashboard=MyDashboard())
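A custom dashboard can be exercised without running a pipeline by calling its hooks by hand. The sketch below assumes only the three hook names and the attributes used in the example above (`pipeline_stats.items_processed`, `total_failed`). `ProgressDashboard` and `fake_snapshot` are hypothetical names, and the `SimpleNamespace` objects are stand-ins for the real snapshot and summary types.

```python
from types import SimpleNamespace


class ProgressDashboard:
    """Implements the same three hooks, collecting lines instead of printing them."""

    def __init__(self):
        self.total = 0
        self.lines = []

    def on_start(self, pipeline, total_items):
        self.total = total_items
        self.lines.append(f"start: {total_items} items")

    def on_update(self, snapshot):
        done = snapshot.pipeline_stats.items_processed
        # Guard against an empty run, where total is 0.
        pct = (done / self.total * 100) if self.total else 100.0
        self.lines.append(f"progress: {pct:.1f}% ({done}/{self.total})")

    def on_finish(self, results, summary):
        self.lines.append(f"done: {len(results)} results, {summary.total_failed} failed")


def fake_snapshot(items_processed):
    # Hypothetical stand-in exposing only the attribute the dashboard reads.
    return SimpleNamespace(
        pipeline_stats=SimpleNamespace(items_processed=items_processed)
    )


dash = ProgressDashboard()
dash.on_start(pipeline=None, total_items=4)
dash.on_update(fake_snapshot(1))
dash.on_update(fake_snapshot(4))
dash.on_finish(results=[object()] * 4, summary=SimpleNamespace(total_failed=0))
print("\n".join(dash.lines))
```

Collecting output in a list rather than printing directly keeps the rendering logic easy to assert against in unit tests.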

StatusTracker: Event-driven Monitoring

How It Works

StatusTracker uses async callbacks that are invoked immediately when events occur:

  1. The pipeline emits events (queued, in_progress, completed, failed, etc.).
  2. StatusTracker invokes the registered callbacks.
  3. Callbacks can log, store, or process events immediately.
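The event-driven flow above can be illustrated with a small standard-library sketch. `MiniTracker` and `Event` are hypothetical stand-ins written for this example. They are not AntFlow's classes and only mirror the general shape of recording an event, then awaiting the registered callback.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class Event:  # hypothetical stand-in for StatusEvent
    item_id: int
    status: str
    stage: str
    timestamp: float = field(default_factory=time.time)


class MiniTracker:
    """Records every event and awaits the callback as soon as the event is emitted."""

    def __init__(self, on_status_change=None):
        self.on_status_change = on_status_change
        self.history = {}  # item_id -> list of events, in emission order

    async def emit(self, event):
        self.history.setdefault(event.item_id, []).append(event)
        if self.on_status_change is not None:
            await self.on_status_change(event)


async def demo():
    seen = []

    async def record(event):
        seen.append((event.item_id, event.status))

    tracker = MiniTracker(on_status_change=record)
    for status in ("queued", "in_progress", "completed"):
        await tracker.emit(Event(item_id=1, status=status, stage="Process"))
    return seen, tracker


seen, tracker = asyncio.run(demo())
print(seen)
```

Because the callback is awaited inline in this sketch, a slow callback delays the emitter. Keep that in mind when deciding how much work a callback should do.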

Basic Usage

from antflow import Pipeline, StatusTracker, StatusEvent

async def on_status_change(event: StatusEvent):
    print(f"Item {event.item_id}: {event.status} @ {event.stage}")

tracker = StatusTracker(on_status_change=on_status_change)

pipeline = Pipeline(stages=[...], status_tracker=tracker)
await pipeline.run(items)

Event Types

StatusTracker can track multiple types of events:

async def on_queued(event: StatusEvent):
    print(f"Queued: {event.item_id}")

async def on_start(event: StatusEvent):
    print(f"Started: {event.item_id} on worker {event.worker}")

async def on_completed(event: StatusEvent):
    print(f"Completed: {event.item_id}")

async def on_failed(event: StatusEvent):
    error = event.metadata.get("error", "Unknown")
    print(f"Failed: {event.item_id} - {error}")

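async def on_status_change(event: StatusEvent):
    # on_status_change is called for ALL status changes, so route on event.status
    # (assumes event.status compares equal to these strings)
    handlers = {
        "queued": on_queued,
        "in_progress": on_start,
        "completed": on_completed,
        "failed": on_failed,
    }
    handler = handlers.get(event.status)
    if handler is not None:
        await handler(event)

tracker = StatusTracker(on_status_change=on_status_change)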
# Or use specific callbacks:
# tracker = StatusTracker(
#     on_status_change=lambda e: print(f"Status: {e.status}"),
#     on_task_start=lambda e: print(f"Task started: {e.task_name}"),
#     on_task_complete=lambda e: print(f"Task done: {e.task_name}"),
#     on_task_retry=lambda e: print(f"Retrying: {e.task_name}"),
#     on_task_fail=lambda e: print(f"Task failed: {e.task_name}"),
# )

Querying StatusTracker

After execution, you can query the tracker for information:

# Get current status of an item
status = tracker.get_status(item_id=42)
print(f"Current status: {status.status}")

# Get all items in a specific status
failed_items = tracker.get_by_status("failed")
for item in failed_items:
    print(f"Failed item: {item.item_id}")

# Get aggregate statistics
stats = tracker.get_stats()
print(f"Completed: {stats['completed']}, Failed: {stats['failed']}")

# Get complete event history for an item
history = tracker.get_history(item_id=42)
for event in history:
    print(f"{event.timestamp}: {event.status} @ {event.stage}")
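Conceptually, all of these queries can be derived from one ordered event log. The sketch below shows that derivation on made-up data. It illustrates the idea only and makes no claim about how StatusTracker stores its state internally.

```python
from collections import Counter

# (item_id, status) pairs in the order they were emitted; illustrative data only.
events = [
    (1, "queued"), (2, "queued"), (1, "in_progress"),
    (1, "completed"), (2, "in_progress"), (2, "failed"), (3, "queued"),
]

# Full history per item.
history = {}
for item_id, status in events:
    history.setdefault(item_id, []).append(status)

# Current status is the last entry in each item's history.
current = {item_id: statuses[-1] for item_id, statuses in history.items()}

# Aggregate statistics and a "by status" lookup fall out of the current view.
stats = Counter(current.values())
failed_ids = [item_id for item_id, status in current.items() if status == "failed"]

print(current)
print(dict(stats))
print(failed_ids)
```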

Error Summary

StatusTracker provides aggregated error information:

error_summary = tracker.get_error_summary()
print(f"Total failed: {error_summary.total_failed}")

# Group by error type
for error_type, count in error_summary.errors_by_type.items():
    print(f"  {error_type}: {count}")

# Group by stage
for stage, count in error_summary.errors_by_stage.items():
    print(f"  {stage}: {count}")

# Get individual failed items
for failed_item in error_summary.failed_items:
    print(f"Item {failed_item.item_id}: {failed_item.error}")
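The grouping an error summary performs can be sketched with `collections.Counter`. `FailedItem` here is a hypothetical record defined for the example, and the failures are invented. The real summary object may be structured differently.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class FailedItem:  # hypothetical record, for illustration only
    item_id: int
    stage: str
    error: Exception


failures = [
    FailedItem(3, "Fetch", TimeoutError("upstream timed out")),
    FailedItem(7, "Fetch", TimeoutError("upstream timed out")),
    FailedItem(9, "Parse", ValueError("bad payload")),
]

# Group by exception class name and by the stage where the failure occurred.
errors_by_type = Counter(type(f.error).__name__ for f in failures)
errors_by_stage = Counter(f.stage for f in failures)

print(f"Total failed: {len(failures)}")
for error_type, count in errors_by_type.most_common():
    print(f"  {error_type}: {count}")
for stage, count in errors_by_stage.most_common():
    print(f"  {stage}: {count}")
```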

When to Use StatusTracker

Perfect for:

  • Logging events to external systems (databases, log files)
  • Integrating with monitoring tools (Prometheus, DataDog, etc.)
  • Implementing custom business logic based on events
  • Debugging complex failure scenarios
  • Storing complete event history for audit trails

Not ideal for:

  • Simple progress visualization (use Dashboard instead)
  • Real-time terminal output (use built-in dashboards)
  • Cases where you only need aggregate statistics (use get_stats())
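As an example of the logging use case, a status callback might serialize each event to a JSON line and pass it to Python's standard `logging` module. This is a sketch under assumptions: `event_to_json` and `log_event` are hypothetical names, and the event is assumed to expose `item_id`, `status`, and `stage` as in the Basic Usage example. A `SimpleNamespace` stands in for a real event.

```python
import asyncio
import json
import logging
from types import SimpleNamespace

logger = logging.getLogger("pipeline.events")


def event_to_json(event):
    """Serialize the fields we care about as one stable JSON line."""
    return json.dumps(
        {"item_id": event.item_id, "status": event.status, "stage": event.stage},
        sort_keys=True,
    )


async def log_event(event):
    # Failures are logged at ERROR so they stand out in aggregated logs.
    level = logging.ERROR if event.status == "failed" else logging.INFO
    logger.log(level, event_to_json(event))


# Stand-in event for demonstration; a real callback would receive a StatusEvent.
sample = SimpleNamespace(item_id=42, status="failed", stage="Process")
line = event_to_json(sample)
asyncio.run(log_event(sample))
```

A callback shaped like `log_event` could then be passed as `on_status_change`, with handlers and formatters configured through the usual `logging` setup.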

Task-Level Events

StatusTracker also tracks events at the task level (within a stage):

async def on_task_start(event):
    print(f"Task '{event.task_name}' started on {event.worker}")

async def on_task_complete(event):
    duration = event.duration
    print(f"Task '{event.task_name}' completed in {duration:.2f}s")

async def on_task_retry(event):
    print(f"Task '{event.task_name}' retrying (attempt {event.attempt})")

async def on_task_fail(event):
    print(f"Task '{event.task_name}' failed: {event.error}")

tracker = StatusTracker(
    on_task_start=on_task_start,
    on_task_complete=on_task_complete,
    on_task_retry=on_task_retry,
    on_task_fail=on_task_fail,
)
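Task-level callbacks are a natural place to collect timing data. The sketch below accumulates durations and retry counts per task name. It assumes the events carry `task_name` and `duration` as in the callbacks above. `TaskTimings` is a hypothetical helper, and the events are `SimpleNamespace` stand-ins.

```python
import asyncio
from collections import defaultdict
from types import SimpleNamespace


class TaskTimings:
    """Accumulates per-task durations and retry counts from task-level events."""

    def __init__(self):
        self.durations = defaultdict(list)
        self.retries = defaultdict(int)

    async def on_task_complete(self, event):
        self.durations[event.task_name].append(event.duration)

    async def on_task_retry(self, event):
        self.retries[event.task_name] += 1

    def mean_duration(self, task_name):
        values = self.durations.get(task_name, [])
        return sum(values) / len(values) if values else 0.0


async def demo():
    timings = TaskTimings()
    # Simulated events; in a real run the tracker would invoke these callbacks.
    await timings.on_task_complete(SimpleNamespace(task_name="fetch", duration=0.25))
    await timings.on_task_retry(SimpleNamespace(task_name="fetch", attempt=2))
    await timings.on_task_complete(SimpleNamespace(task_name="fetch", duration=0.75))
    return timings


timings = asyncio.run(demo())
print(f"fetch: mean {timings.mean_duration('fetch'):.2f}s, retries {timings.retries['fetch']}")
```

The bound methods `timings.on_task_complete` and `timings.on_task_retry` have the same one-argument async shape as the callbacks registered above.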

Combining Both Mechanisms

You can use Dashboard and StatusTracker together:

from antflow import Pipeline, StatusTracker

async def log_to_db(event):
    # Store event in database (db is a placeholder for your own async database client)
    await db.insert(event)

tracker = StatusTracker(on_status_change=log_to_db)

pipeline = Pipeline(
    stages=[...],
    status_tracker=tracker,
)

# Use dashboard for visual monitoring + tracker for logging
await pipeline.run(items, dashboard="detailed")

This gives you:

  • Visual feedback in the terminal (Dashboard)
  • Persistent logging to external systems (StatusTracker)
  • Complete event history for debugging (StatusTracker)

Performance Comparison

| Metric | Dashboard | StatusTracker |
|---|---|---|
| CPU overhead | Low (one snapshot every N seconds) | Medium (callback per event) |
| Memory usage | Low (only current state) | Medium (stores event history) |
| Latency | Up to N seconds | Immediate |
| Scalability | Good for any scale | Depends on callback complexity |
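Because StatusTracker scalability depends on callback complexity, one common way to keep callbacks cheap is to have them enqueue events and let a separate consumer task do the slow write. The following is a general asyncio pattern, not an AntFlow feature. `BufferedSink` and `slow_write` are hypothetical names, and the dictionaries stand in for real events.

```python
import asyncio


class BufferedSink:
    """The callback only enqueues; a consumer task performs the slow write."""

    def __init__(self, write, maxsize=1000):
        self._write = write
        self._queue = asyncio.Queue(maxsize=maxsize)
        self._consumer = None

    def start(self):
        self._consumer = asyncio.create_task(self._drain())

    async def on_status_change(self, event):
        await self._queue.put(event)  # cheap unless the queue is full

    async def _drain(self):
        while True:
            event = await self._queue.get()
            try:
                await self._write(event)
            finally:
                self._queue.task_done()

    async def close(self):
        await self._queue.join()  # wait until every queued event is written
        self._consumer.cancel()
        try:
            await self._consumer
        except asyncio.CancelledError:
            pass


async def demo():
    written = []

    async def slow_write(event):
        await asyncio.sleep(0.001)  # stand-in for a database insert
        written.append(event)

    sink = BufferedSink(slow_write)
    sink.start()
    for i in range(20):
        await sink.on_status_change({"item_id": i, "status": "completed"})
    await sink.close()
    return written


written = asyncio.run(demo())
print(f"wrote {len(written)} events")
```

The bounded queue applies backpressure: if the writer falls far behind, `put` waits instead of letting memory grow without limit.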

Choosing the Right Tool

Use Dashboard when:

  • You need visual progress feedback
  • You're debugging in an interactive terminal
  • You want to identify bottlenecks quickly
  • You don't need to store event history

Use StatusTracker when:

  • You need to log events to external systems
  • You want complete event history for debugging
  • You need to implement custom business logic
  • You're integrating with monitoring tools

Use Both when:

  • You want visual feedback AND persistent logging
  • You're debugging in production and need both views
  • You need to show progress to users while logging internally

Examples

See the examples directory for complete working examples: