Dashboard and Real-Time Monitoring¶
AntFlow provides comprehensive tools for monitoring pipeline execution in real time, including worker states, performance metrics, and dashboard helpers.
Overview¶
The dashboard functionality provides:
- Worker State Tracking: Know what each worker is doing at any moment
- Performance Metrics: Track items processed, failures, and average processing time per worker
- Snapshot API: Query current state for dashboards and UIs
- Event Streaming: Subscribe to real-time status changes
- PipelineDashboard Helper: Combines queries and events for efficient monitoring
Quick Start¶
from antflow import Pipeline, PipelineDashboard, Stage, StatusTracker

async def my_task(x):
    ...  # your per-item processing logic

tracker = StatusTracker()
stage = Stage(name="Process", workers=5, tasks=[my_task])
pipeline = Pipeline(stages=[stage], status_tracker=tracker)
dashboard = PipelineDashboard(pipeline, tracker)

snapshot = dashboard.get_snapshot()
print(f"Active workers: {len(dashboard.get_active_workers())}")
print(f"Items processed: {snapshot.pipeline_stats.items_processed}")
Worker States¶
Querying Worker States¶
Get the current state of all workers:
states = pipeline.get_worker_states()
for worker_name, state in states.items():
    print(f"{worker_name}:")
    print(f"  Status: {state.status}")  # "idle" or "busy"
    print(f"  Current item: {state.current_item_id}")
    print(f"  Processing since: {state.processing_since}")
Worker State Fields¶
Each WorkerState contains:
- worker_name: Full worker name (e.g., "Fetch-W0")
- stage: Stage name the worker belongs to
- status: Either "idle" or "busy"
- current_item_id: ID of the item being processed (None if idle)
- processing_since: Timestamp when the worker started processing the current item
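Since every state records its stage, a per-stage activity summary falls out of a single query; a minimal sketch using only the fields listed above:

from collections import defaultdict

states = pipeline.get_worker_states()

# Tally busy/idle workers per stage via the `stage` and `status` fields.
per_stage = defaultdict(lambda: {"busy": 0, "idle": 0})
for state in states.values():
    per_stage[state.stage][state.status] += 1

for stage_name, counts in per_stage.items():
    print(f"{stage_name}: {counts['busy']} busy, {counts['idle']} idle")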
Example: Finding Busy Workers¶
states = pipeline.get_worker_states()
busy_workers = [
    (name, state.current_item_id)
    for name, state in states.items()
    if state.status == "busy"
]
print(f"Busy workers: {len(busy_workers)}")
for worker, item_id in busy_workers:
    print(f"  {worker} processing item {item_id}")
Worker Metrics¶
Querying Performance Metrics¶
Get performance metrics for all workers:
metrics = pipeline.get_worker_metrics()
for worker_name, metric in metrics.items():
    print(f"{worker_name}:")
    print(f"  Items processed: {metric.items_processed}")
    print(f"  Items failed: {metric.items_failed}")
    print(f"  Avg processing time: {metric.avg_processing_time:.3f}s")
    print(f"  Last active: {metric.last_active}")
Worker Metrics Fields¶
Each WorkerMetrics contains:
- worker_name: Full worker name
- stage: Stage name
- items_processed: Count of successfully processed items
- items_failed: Count of failed items
- total_processing_time: Total time spent processing (seconds)
- last_active: Timestamp of last activity
- avg_processing_time (property): Average time per item
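The avg_processing_time property is presumably derived from the two counters above it; a sketch of the equivalent computation (not AntFlow's actual code), guarding against division by zero:

def avg_processing_time(metric) -> float:
    # Equivalent derivation: total time over successfully processed items.
    if metric.items_processed == 0:
        return 0.0
    return metric.total_processing_time / metric.items_processed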
Example: Top Performers¶
metrics = pipeline.get_worker_metrics()
top_workers = sorted(
    metrics.items(),
    key=lambda x: x[1].items_processed,
    reverse=True
)[:5]

print("Top 5 workers:")
for worker, metric in top_workers:
    print(f"  {worker}: {metric.items_processed} items, "
          f"avg {metric.avg_processing_time:.3f}s")
Dashboard Snapshots¶
Getting Complete Snapshot¶
Get a complete snapshot of the current state:
snapshot = pipeline.get_dashboard_snapshot()
print(f"Timestamp: {snapshot.timestamp}")
print(f"Active workers: {sum(1 for s in snapshot.worker_states.values() if s.status == 'busy')}")
print(f"Total processed: {snapshot.pipeline_stats.items_processed}")
print(f"Total failed: {snapshot.pipeline_stats.items_failed}")
print(f"Queue sizes: {snapshot.pipeline_stats.queue_sizes}")
for worker, metrics in snapshot.worker_metrics.items():
    if metrics.items_processed > 0:
        print(f"  {worker}: {metrics.items_processed} items")
Snapshot Fields¶
A DashboardSnapshot contains:
- worker_states: Dict of all WorkerState objects
- worker_metrics: Dict of all WorkerMetrics objects
- pipeline_stats: PipelineStats (items processed/failed, queue sizes)
- timestamp: When the snapshot was taken
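Because a snapshot is a plain value object, it flattens naturally into JSON for a web client; a sketch assuming only the field names listed above:

import json

def snapshot_to_json(snapshot) -> str:
    """Flatten a DashboardSnapshot into a JSON payload for a UI."""
    return json.dumps({
        "timestamp": snapshot.timestamp,
        "workers": {
            name: {"status": s.status, "current_item": s.current_item_id}
            for name, s in snapshot.worker_states.items()
        },
        "stats": {
            "processed": snapshot.pipeline_stats.items_processed,
            "failed": snapshot.pipeline_stats.items_failed,
            "queues": snapshot.pipeline_stats.queue_sizes,
        },
    })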
PipelineDashboard Helper¶
The PipelineDashboard class combines queries and events for efficient monitoring.
Basic Usage¶
from antflow import PipelineDashboard
dashboard = PipelineDashboard(
    pipeline=pipeline,
    tracker=tracker,
    on_update=my_update_callback,  # optional snapshot callback
    update_interval=1.0            # seconds between updates
)
snapshot = dashboard.get_snapshot()
active = dashboard.get_active_workers()
idle = dashboard.get_idle_workers()
utilization = dashboard.get_worker_utilization()
Subscribing to Events¶
Subscribe to status change events:
async def on_item_failed(event):
    if event.status == "failed":
        error = event.metadata.get("error")
        print(f"Item {event.item_id} failed: {error}")
dashboard.subscribe(on_item_failed)
await pipeline.run(items)
dashboard.unsubscribe(on_item_failed)
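Subscribers see every status change, so they can aggregate rather than just react; a sketch that tallies failures per worker, using the same event fields as above:

from collections import Counter

failures_by_worker = Counter()

async def count_failures(event):
    # Tally failed items by the worker that reported them.
    if event.status == "failed":
        failures_by_worker[event.worker] += 1

dashboard.subscribe(count_failures)
await pipeline.run(items)
dashboard.unsubscribe(count_failures)

print(failures_by_worker.most_common(3))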
Periodic Updates¶
Use automatic periodic updates for real-time dashboards:
async def print_status(snapshot):
    active = sum(1 for s in snapshot.worker_states.values() if s.status == "busy")
    print(f"Active: {active}, Processed: {snapshot.pipeline_stats.items_processed}")

dashboard = PipelineDashboard(
    pipeline,
    tracker,
    on_update=print_status,
    update_interval=2.0
)

async with dashboard:
    await pipeline.run(items)
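Entering the dashboard with async with ties the update loop's lifetime to the block: periodic callbacks start on entry and stop on exit, even if pipeline.run raises, so the loop cannot outlive the run.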
Worker Utilization¶
Calculate success rate for each worker:
utilization = dashboard.get_worker_utilization()
for worker, util in sorted(utilization.items()):
    print(f"{worker}: {util*100:.1f}% success rate")
Real-Time Dashboard Example¶
WebSocket Dashboard¶
from fastapi import FastAPI, WebSocket
from antflow import Pipeline, PipelineDashboard, Stage, StatusTracker

app = FastAPI()

@app.websocket("/ws/dashboard")
async def dashboard_endpoint(websocket: WebSocket):
    await websocket.accept()

    # `stage` and `items` are assumed to be defined elsewhere in the app.
    tracker = StatusTracker()
    pipeline = Pipeline(stages=[stage], status_tracker=tracker)
    dashboard = PipelineDashboard(pipeline, tracker)

    # Send a full snapshot first so the client can render immediately.
    initial = dashboard.get_snapshot()
    await websocket.send_json({
        "type": "init",
        "workers": {
            name: {
                "status": state.status,
                "current_item": state.current_item_id
            }
            for name, state in initial.worker_states.items()
        },
        "stats": {
            "processed": initial.pipeline_stats.items_processed,
            "failed": initial.pipeline_stats.items_failed,
            "queues": initial.pipeline_stats.queue_sizes
        }
    })

    # Stream incremental updates as status events arrive.
    async def on_event(event):
        await websocket.send_json({
            "type": "event",
            "item_id": event.item_id,
            "status": event.status,
            "worker": event.worker,
            "timestamp": event.timestamp
        })

    dashboard.subscribe(on_event)
    try:
        # Keep the handler alive until the run finishes; returning here
        # would close the socket while events were still being produced.
        await pipeline.run(items)
    finally:
        dashboard.unsubscribe(on_event)
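Any WebSocket client can consume this stream. A minimal sketch using the third-party websockets package; the host, port, and path are assumptions matching the server above:

import asyncio
import json

import websockets  # third-party: pip install websockets

async def watch_dashboard():
    # Host/port are assumptions; adjust to wherever the FastAPI app runs.
    async with websockets.connect("ws://localhost:8000/ws/dashboard") as ws:
        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "init":
                print(f"Connected: {len(msg['workers'])} workers")
            elif msg["type"] == "event":
                print(f"{msg['worker']}: item {msg['item_id']} -> {msg['status']}")

asyncio.run(watch_dashboard())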
Monitoring Loop¶
async def monitor_pipeline(pipeline):
    while True:
        await asyncio.sleep(1.0)

        states = pipeline.get_worker_states()
        stats = pipeline.get_stats()

        busy_count = sum(1 for s in states.values() if s.status == "busy")
        print(f"\rActive: {busy_count}/{len(states)} | "
              f"Processed: {stats.items_processed} | "
              f"Failed: {stats.items_failed}",
              end="", flush=True)

        # Stop once nothing is in flight and every worker is idle.
        if stats.items_in_flight == 0 and busy_count == 0:
            break

# Hold a reference so the task is not garbage-collected mid-run,
# and await it after the run so the final status line is printed.
monitor_task = asyncio.create_task(monitor_pipeline(pipeline))
await pipeline.run(items)
await monitor_task
Complete Example¶
import asyncio
from antflow import Pipeline, PipelineDashboard, Stage, StatusTracker

async def process_item(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * 2

async def print_updates(snapshot):
    active = sum(1 for s in snapshot.worker_states.values() if s.status == "busy")
    print(f"[{snapshot.timestamp:.1f}] Active: {active}, "
          f"Processed: {snapshot.pipeline_stats.items_processed}")

async def main():
    tracker = StatusTracker()
    stage = Stage(name="Process", workers=5, tasks=[process_item])
    pipeline = Pipeline(stages=[stage], status_tracker=tracker)

    dashboard = PipelineDashboard(
        pipeline,
        tracker,
        on_update=print_updates,
        update_interval=2.0
    )

    async with dashboard:
        print("Starting processing...")
        results = await pipeline.run(range(50))
        print(f"\nCompleted! Processed {len(results)} items")

    print("\nFinal metrics:")
    for worker, metrics in dashboard.get_worker_metrics().items():
        if metrics.items_processed > 0:
            print(f"  {worker}: {metrics.items_processed} items, "
                  f"avg {metrics.avg_processing_time:.3f}s")

    utilization = dashboard.get_worker_utilization()
    avg_util = sum(utilization.values()) / len(utilization)
    print(f"\nAverage utilization: {avg_util*100:.1f}%")

asyncio.run(main())
Best Practices¶
For Live Dashboards¶
- Use PipelineDashboard for combining queries and events
- Send initial snapshot when client connects
- Stream incremental updates via events
- Poll infrequently (1-2 seconds) for non-critical metrics
- Limit event subscribers to avoid performance impact
For Logging/Monitoring¶
- Query metrics at end of pipeline run for summaries
- Use StatusTracker events for real-time alerts
- Track worker utilization to identify bottlenecks
- Monitor queue sizes for backpressure detection, as sketched below
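As a sketch of the last point, assuming queue_sizes maps stage names to current queue depths (the warning limit is an arbitrary assumption):

QUEUE_WARN_LIMIT = 100  # assumption: alert when a queue backs up past this depth

snapshot = pipeline.get_dashboard_snapshot()
for stage_name, depth in snapshot.pipeline_stats.queue_sizes.items():
    if depth > QUEUE_WARN_LIMIT:
        print(f"Backpressure: queue for {stage_name} at depth {depth}")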
Performance Considerations¶
- Queries are cheap: O(1) dict lookups
- Snapshots copy data: Use sparingly for large worker counts
- Events are async: Won't slow down pipeline
- Automatic monitoring: Minimal overhead (<1% typically)
Examples¶
See the examples/ directory for complete dashboard implementations:
- rich_polling_dashboard.py: Polling-based terminal dashboard (Recommended)
- rich_callback_dashboard.py: Event-driven terminal dashboard
- dashboard_websocket.py: WebSocket server for web UIs
API Reference¶
See the API documentation for complete details on:
- Pipeline.get_worker_states()
- Pipeline.get_worker_metrics()
- Pipeline.get_dashboard_snapshot()
- PipelineDashboard
- WorkerState
- WorkerMetrics
- DashboardSnapshot