Custom Dashboard Guide
AntFlow allows you to create custom dashboards by implementing the DashboardProtocol.
Built-in vs Custom Dashboards
AntFlow provides built-in dashboards for common use cases:
# Simple progress bar (no Rich dependency)
results = await pipeline.run(items, progress=True)
# Built-in Rich dashboards
results = await pipeline.run(items, dashboard="compact") # Single panel
results = await pipeline.run(items, dashboard="detailed") # Per-stage table
results = await pipeline.run(items, dashboard="full") # Full monitoring
For custom visualization needs, you can implement your own dashboard.
Two Approaches: Polling vs Callbacks
| Approach | Use Case | How it Works |
|---|---|---|
| DashboardProtocol (polling) | Terminal UIs, web dashboards | on_update() called periodically with full state |
| StatusTracker callbacks (event-driven) | Logging, real-time events | Callbacks fired on each status change |
When to Use Each
- DashboardProtocol: Best for rendering UIs that need complete state snapshots
- StatusTracker callbacks: Best for logging, event streaming, or reacting to specific events
[!TIP] See Complete Examples:
- Polling: examples/custom_dashboard.py
- Callbacks: examples/custom_dashboard_callbacks.py
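The difference between the two approaches can be illustrated without AntFlow at all. The sketch below uses a hypothetical `ToyTracker` class (a stand-in invented for this illustration, not part of AntFlow's API): a callback listener sees every individual change, while a poller only sees the state at the moments it happens to look.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ToyTracker:
    """Hypothetical stand-in for a pipeline's status store (not AntFlow's API)."""

    statuses: Dict[int, str] = field(default_factory=dict)
    listeners: List[Callable[[int, str], None]] = field(default_factory=list)

    def set_status(self, item_id: int, status: str) -> None:
        self.statuses[item_id] = status
        # Event-driven: every change is delivered to every listener.
        for listener in self.listeners:
            listener(item_id, status)

    def snapshot(self) -> Dict[int, str]:
        # Polling: the caller gets the full current state, not the history.
        return dict(self.statuses)


def run_demo():
    tracker = ToyTracker()
    events = []
    tracker.listeners.append(lambda item_id, status: events.append((item_id, status)))

    snapshots = []
    changes = [(1, "queued"), (1, "in_progress"), (2, "queued"), (1, "completed")]
    for step, (item_id, status) in enumerate(changes, start=1):
        tracker.set_status(item_id, status)
        if step % 2 == 0:  # pretend the poll timer fires after every second change
            snapshots.append(tracker.snapshot())
    return events, snapshots


events, snapshots = run_demo()
print(len(events), "events,", len(snapshots), "snapshots")
```

The poller never observes item 1 in the "queued" state, which is fine for rendering a UI but not for an audit log.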
How Polling Works (DashboardProtocol)
When you use DashboardProtocol (via custom_dashboard, dashboard, or progress parameters), AntFlow starts a background task that:
- Calls get_dashboard_snapshot(): reads current pipeline state (worker states, metrics, stats)
- Calls display.on_update(snapshot): updates your dashboard with the snapshot
- Sleeps for dashboard_update_interval seconds (default: 0.5s)
- Repeats until the pipeline finishes
# Internally, this is what happens:
async def _monitor_progress(display, total_items, update_interval=0.5):
    while True:
        snapshot = pipeline.get_dashboard_snapshot()  # Read current state
        display.on_update(snapshot)                   # Update UI
        await asyncio.sleep(update_interval)          # Wait
Key Points:
- ✅ Efficient: Snapshots just read existing state - no new objects created
- ✅ No Empty Events: Every update has current state to display
- ✅ Configurable: Adjust dashboard_update_interval to balance responsiveness vs CPU usage
- ✅ Automatic Cleanup: Task is cancelled immediately when pipeline finishes
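To make the loop and its cleanup concrete, here is a self-contained sketch that runs without AntFlow. `FakePipeline` and `RecordingDisplay` are hypothetical stand-ins, and the extra final `on_update` call after cancellation is an assumption of this sketch, not documented AntFlow behavior.

```python
import asyncio
import contextlib


class FakePipeline:
    """Hypothetical stand-in exposing only what the monitor loop needs."""

    def __init__(self):
        self.processed = 0

    def get_dashboard_snapshot(self):
        return {"processed": self.processed}

    async def work(self, n_items):
        for _ in range(n_items):
            await asyncio.sleep(0.01)  # simulate one item of work
            self.processed += 1


class RecordingDisplay:
    def __init__(self):
        self.updates = []

    def on_update(self, snapshot):
        self.updates.append(snapshot)


async def monitor(pipeline, display, update_interval):
    while True:
        display.on_update(pipeline.get_dashboard_snapshot())
        await asyncio.sleep(update_interval)


async def run(n_items=5, update_interval=0.005):
    pipeline, display = FakePipeline(), RecordingDisplay()
    task = asyncio.create_task(monitor(pipeline, display, update_interval))
    try:
        await pipeline.work(n_items)
    finally:
        task.cancel()  # stop polling as soon as the work is done
        with contextlib.suppress(asyncio.CancelledError):
            await task
    # Assumption: emit one last snapshot so the UI shows the final state.
    display.on_update(pipeline.get_dashboard_snapshot())
    return pipeline, display, task


pipeline, display, task = asyncio.run(run())
print(f"{len(display.updates)} updates, last: {display.updates[-1]}")
```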
Configuring Update Interval
You can control how often the dashboard updates:
# Default: 0.5 seconds (2 updates per second)
results = await pipeline.run(items, dashboard="detailed")
# Faster updates (5 updates per second) - more responsive but higher CPU
results = await pipeline.run(items, dashboard="detailed", dashboard_update_interval=0.2)
# Slower updates (1 update per second) - lower CPU usage
results = await pipeline.run(items, dashboard="detailed", dashboard_update_interval=1.0)
# Very fast updates (10 updates per second) - only for debugging
results = await pipeline.run(items, dashboard="detailed", dashboard_update_interval=0.1)
Recommended Values:
- 0.1s - Very responsive, good for debugging (10 updates/sec)
- 0.2s - Fast updates, good for development (5 updates/sec)
- 0.5s - Default, balanced (2 updates/sec) ⭐
- 1.0s - Slower, lower CPU usage (1 update/sec)
When to Use Lower Intervals:
- Debugging fast-running pipelines
- When you need to see every state change
- Development and testing
When to Use Higher Intervals:
- Long-running production pipelines
- When CPU usage is a concern
- When you only need periodic status checks
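The trade-off is simple arithmetic, sketched below. Both helper functions are hypothetical (written for this guide, not part of AntFlow), and the estimate assumes one update at start plus one per full interval, ignoring the time `on_update` itself takes.

```python
import math


def updates_per_second(interval: float) -> float:
    """Update rate implied by a given dashboard_update_interval."""
    return 1.0 / interval


def expected_updates(run_seconds: float, interval: float) -> int:
    """Rough number of on_update calls over a run of the given length."""
    return 1 + math.floor(run_seconds / interval)


for interval in (0.5, 1.0):
    print(interval, updates_per_second(interval), expected_updates(60, interval))
```

For a one-hour pipeline, moving from 0.5s to 1.0s roughly halves the number of renders, which matters most when each render is expensive.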
DashboardProtocol
The protocol defines three methods:
from antflow import DashboardProtocol

class MyDashboard:
    def on_start(self, pipeline, total_items):
        """Called when pipeline execution starts."""
        pass

    def on_update(self, snapshot):
        """Called periodically with current pipeline state."""
        pass

    def on_finish(self, results, summary):
        """Called when pipeline execution completes."""
        pass
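Because the contract is structural, a class does not need to inherit from anything to satisfy it. The sketch below shows the general Python mechanism using `typing.Protocol`; `DashboardLike` is a hypothetical mirror of the three methods above, written for illustration, and is not claimed to be how AntFlow defines `DashboardProtocol`.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DashboardLike(Protocol):
    """Hypothetical mirror of the three methods listed above."""

    def on_start(self, pipeline: Any, total_items: int) -> None: ...
    def on_update(self, snapshot: Any) -> None: ...
    def on_finish(self, results: Any, summary: Any) -> None: ...


class Complete:
    def on_start(self, pipeline, total_items):
        pass

    def on_update(self, snapshot):
        pass

    def on_finish(self, results, summary):
        pass


class MissingFinish:
    def on_start(self, pipeline, total_items):
        pass

    def on_update(self, snapshot):
        pass


# runtime_checkable protocols only check that the methods exist,
# not their signatures.
print(isinstance(Complete(), DashboardLike))
print(isinstance(MissingFinish(), DashboardLike))
```

A check like this can be useful in tests to catch a misspelled method name before a long pipeline run.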
Simple Example
Here's a minimal custom dashboard that logs to the console:
from antflow import Pipeline, Stage

class LoggingDashboard:
    def on_start(self, pipeline, total_items):
        self.total = total_items  # Remember the total for progress output
        print(f"Starting pipeline with {total_items} items")

    def on_update(self, snapshot):
        stats = snapshot.pipeline_stats
        # Use the known total; processed + in-flight would ignore queued items
        print(f"Progress: {stats.items_processed}/{self.total}")

    def on_finish(self, results, summary):
        print(f"Done! {len(results)} succeeded, {summary.total_failed} failed")

# Use it
pipeline = Pipeline(stages=[...])
results = await pipeline.run(items, custom_dashboard=LoggingDashboard())
Using DashboardSnapshot
The on_update method receives a DashboardSnapshot with:
- worker_states: Dict of worker name to WorkerState
- worker_metrics: Dict of worker name to WorkerMetrics
- pipeline_stats: PipelineStats object
- timestamp: Unix timestamp
def on_update(self, snapshot):
    # Access pipeline statistics
    stats = snapshot.pipeline_stats
    print(f"Processed: {stats.items_processed}")
    print(f"Failed: {stats.items_failed}")
    print(f"In flight: {stats.items_in_flight}")

    # Access per-stage statistics
    for stage_name, stage_stat in stats.stage_stats.items():
        print(f"Stage {stage_name}:")
        print(f"  Pending: {stage_stat.pending_items}")
        print(f"  In progress: {stage_stat.in_progress_items}")
        print(f"  Completed: {stage_stat.completed_items}")

    # Access worker states
    busy_workers = [
        name for name, state in snapshot.worker_states.items()
        if state.status == "busy"
    ]
    print(f"Busy workers: {busy_workers}")

    # Access worker metrics
    for name, metrics in snapshot.worker_metrics.items():
        print(f"{name}: {metrics.items_processed} items, avg {metrics.avg_processing_time:.2f}s")
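Since each snapshot carries a timestamp, a dashboard can derive throughput and a rough ETA by comparing two consecutive snapshots. The sketch below uses hypothetical stand-in dataclasses (`Stats`, `Snap`) that model only the fields named above; the helper functions are illustrative and not part of AntFlow.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Stats:
    """Stand-in for PipelineStats (only the fields used here)."""

    items_processed: int
    items_failed: int


@dataclass
class Snap:
    """Stand-in for DashboardSnapshot (only the fields used here)."""

    pipeline_stats: Stats
    timestamp: float


def throughput(prev: Snap, curr: Snap) -> float:
    """Items completed per second between two snapshots."""
    dt = curr.timestamp - prev.timestamp
    if dt <= 0:
        return 0.0
    delta = curr.pipeline_stats.items_processed - prev.pipeline_stats.items_processed
    return delta / dt


def eta_seconds(curr: Snap, total: int, rate: float) -> Optional[float]:
    """Estimated seconds remaining, or None when the rate is unknown."""
    done = curr.pipeline_stats.items_processed + curr.pipeline_stats.items_failed
    remaining = max(total - done, 0)
    if rate <= 0:
        return None
    return remaining / rate


prev = Snap(Stats(items_processed=10, items_failed=0), timestamp=100.0)
curr = Snap(Stats(items_processed=30, items_failed=2), timestamp=104.0)
rate = throughput(prev, curr)
print(rate, eta_seconds(curr, total=100, rate=rate))
```

In a real dashboard you would keep the previous snapshot on `self` inside `on_update` and probably smooth the rate over several intervals.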
Using ErrorSummary
The on_finish method receives an ErrorSummary:
def on_finish(self, results, summary):
    if summary.total_failed > 0:
        print("Errors by type:")
        for error_type, count in summary.errors_by_type.items():
            print(f"  {error_type}: {count}")
        print("Errors by stage:")
        for stage, count in summary.errors_by_stage.items():
            print(f"  {stage}: {count}")
        print("Failed items:")
        for item in summary.failed_items:
            print(f"  ID {item.item_id}: {item.error}")
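For large runs, printing every error type can be noisy. A small helper can rank the most frequent ones instead. This sketch assumes `errors_by_type` behaves like a plain dict of error name to count, as the loop above suggests; `top_errors` and `format_error_report` are hypothetical helpers.

```python
from collections import Counter
from typing import Dict, List, Tuple


def top_errors(errors_by_type: Dict[str, int], limit: int = 3) -> List[Tuple[str, int]]:
    """Return the most frequent error types, highest count first."""
    return Counter(errors_by_type).most_common(limit)


def format_error_report(errors_by_type: Dict[str, int], total_failed: int) -> str:
    """Render a short report with each error type's share of all failures."""
    lines = [f"{total_failed} failed items"]
    for name, count in top_errors(errors_by_type):
        share = 100.0 * count / total_failed if total_failed else 0.0
        lines.append(f"  {name}: {count} ({share:.0f}%)")
    return "\n".join(lines)


sample = {"TimeoutError": 5, "ValueError": 2, "KeyError": 9}
print(format_error_report(sample, total_failed=16))
```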
Subclassing BaseDashboard
For more complex dashboards, subclass BaseDashboard:
from antflow.display import BaseDashboard

class MyRichDashboard(BaseDashboard):
    def __init__(self):
        self.total = 0

    def on_start(self, pipeline, total_items):
        self.total = total_items
        print("Starting...")

    def render(self, snapshot):
        # This is called by on_update
        stats = snapshot.pipeline_stats
        pct = (stats.items_processed / self.total * 100) if self.total else 0
        print(f"\rProgress: {pct:.1f}%", end="")

    def on_finish(self, results, summary):
        print(f"\nDone: {len(results)} results")
Integration with Rich
For terminal UIs, use the Rich library:
from rich.console import Console
from rich.live import Live
from rich.table import Table
from antflow.display import BaseDashboard

class RichTableDashboard(BaseDashboard):
    def __init__(self):
        self.console = Console()
        self.live = None
        self.total = 0

    def on_start(self, pipeline, total_items):
        self.total = total_items
        self.live = Live(self._make_table(None), console=self.console)
        self.live.start()

    def render(self, snapshot):
        if self.live:
            self.live.update(self._make_table(snapshot))

    def on_finish(self, results, summary):
        if self.live:
            self.live.stop()
        self.console.print(f"[green]Done![/green] {len(results)} results")

    def _make_table(self, snapshot):
        table = Table(title="Pipeline Status")
        table.add_column("Stage")
        table.add_column("Progress")
        if snapshot:
            for name, stat in snapshot.pipeline_stats.stage_stats.items():
                table.add_row(name, f"{stat.completed_items} done")
        return table
Web Dashboard Integration
AntFlow provides all the data you need to build custom web dashboards. Here's how to integrate with your frontend.
Available Data
The DashboardSnapshot contains everything you need:
snapshot = pipeline.get_dashboard_snapshot()

# Overall stats
snapshot.pipeline_stats.items_processed  # Items completed (all stages)
snapshot.pipeline_stats.items_failed     # Items that failed
snapshot.pipeline_stats.items_in_flight  # Items currently being processed
snapshot.pipeline_stats.queue_sizes      # Dict[stage_name, queue_size]

# Per-stage stats
for stage_name, stage_stat in snapshot.pipeline_stats.stage_stats.items():
    stage_stat.pending_items      # Waiting in queue
    stage_stat.in_progress_items  # Being processed
    stage_stat.completed_items    # Finished this stage
    stage_stat.failed_items       # Failed at this stage

# Worker states
for worker_name, state in snapshot.worker_states.items():
    state.stage             # Which stage this worker belongs to
    state.status            # "idle" or "busy"
    state.current_item_id   # Item being processed (or None)
    state.processing_since  # Timestamp when started (or None)

# Worker metrics
for worker_name, metrics in snapshot.worker_metrics.items():
    metrics.items_processed      # Total items processed by this worker
    metrics.items_failed         # Total items failed by this worker
    metrics.avg_processing_time  # Average time per item (seconds)
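Web endpoints usually need this data as a JSON-safe dict, so it can help to put the conversion in one function and reuse it across REST and WebSocket handlers. The sketch below uses hypothetical stand-in dataclasses that model only the fields listed above; `snapshot_to_payload` and the `percent` field are inventions of this example, not AntFlow API.

```python
import json
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class StageStat:
    """Stand-in for a per-stage stats object."""

    pending_items: int
    in_progress_items: int
    completed_items: int
    failed_items: int


@dataclass
class PipelineStats:
    """Stand-in for PipelineStats (subset of fields)."""

    items_processed: int
    items_failed: int
    items_in_flight: int
    stage_stats: Dict[str, StageStat] = field(default_factory=dict)


@dataclass
class Snapshot:
    """Stand-in for DashboardSnapshot (subset of fields)."""

    pipeline_stats: PipelineStats
    timestamp: float


def snapshot_to_payload(snapshot: Snapshot, total_items: int) -> dict:
    """Convert a snapshot into plain dicts, ints, and floats for JSON."""
    stats = snapshot.pipeline_stats
    percent = round(100.0 * stats.items_processed / total_items, 1) if total_items else 0.0
    return {
        "timestamp": snapshot.timestamp,
        "progress": {
            "processed": stats.items_processed,
            "failed": stats.items_failed,
            "in_flight": stats.items_in_flight,
            "total": total_items,
            "percent": percent,
        },
        "stages": {
            name: {
                "pending": s.pending_items,
                "active": s.in_progress_items,
                "completed": s.completed_items,
                "failed": s.failed_items,
            }
            for name, s in stats.stage_stats.items()
        },
    }


snapshot = Snapshot(
    pipeline_stats=PipelineStats(
        items_processed=10,
        items_failed=1,
        items_in_flight=4,
        stage_stats={"Fetch": StageStat(0, 3, 97, 0)},
    ),
    timestamp=1700000000.0,
)
payload = snapshot_to_payload(snapshot, total_items=100)
encoded = json.dumps(payload)
print(encoded)
```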
FastAPI REST Endpoint Example
import asyncio

from fastapi import FastAPI
from antflow import Pipeline, Stage

app = FastAPI()
pipeline = None   # Will be set when pipeline starts
total_items = 0   # Number of items submitted to the current run
run_task = None   # Reference to the background task so it is not garbage-collected

@app.get("/api/pipeline/status")
async def get_status():
    """REST endpoint that returns current pipeline state."""
    if not pipeline:
        return {"status": "not_running"}
    snapshot = pipeline.get_dashboard_snapshot()
    stats = snapshot.pipeline_stats
    return {
        "status": "running",
        "progress": {
            "processed": stats.items_processed,
            "failed": stats.items_failed,
            "in_flight": stats.items_in_flight,
            "total": total_items,
        },
        "stages": {
            name: {
                "pending": s.pending_items,
                "active": s.in_progress_items,
                "completed": s.completed_items,
                "failed": s.failed_items,
            }
            for name, s in stats.stage_stats.items()
        },
        "workers": {
            name: {
                "stage": state.stage,
                "status": state.status,
                "current_item": state.current_item_id,
            }
            for name, state in snapshot.worker_states.items()
        },
    }

@app.post("/api/pipeline/start")
async def start_pipeline(items: list):
    """Start pipeline processing."""
    global pipeline, total_items, run_task
    total_items = len(items)
    pipeline = Pipeline(stages=[
        Stage("Process", workers=5, tasks=[my_task])  # my_task: your own task function
    ])
    # Run in background
    run_task = asyncio.create_task(pipeline.run(items))
    return {"status": "started", "total": total_items}
FastAPI WebSocket Example (Real-Time)
import asyncio

from fastapi import FastAPI, WebSocket
from antflow import Pipeline, Stage

app = FastAPI()
pipeline = None  # Set when a pipeline starts, as in the REST example

@app.websocket("/ws/pipeline")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time updates."""
    await websocket.accept()
    try:
        while True:
            if pipeline:
                snapshot = pipeline.get_dashboard_snapshot()
                stats = snapshot.pipeline_stats
                await websocket.send_json({
                    "processed": stats.items_processed,
                    "failed": stats.items_failed,
                    "in_flight": stats.items_in_flight,
                    "stages": {
                        name: {
                            "pending": s.pending_items,
                            "active": s.in_progress_items,
                            "completed": s.completed_items,
                        }
                        for name, s in stats.stage_stats.items()
                    },
                })
            await asyncio.sleep(0.1)  # Update rate: 10 Hz
    except Exception:
        pass  # Client disconnected
Using DashboardProtocol with WebSocket
To have the pipeline push updates to the client through the dashboard hooks (instead of the endpoint running its own polling loop), use custom_dashboard:
import asyncio

from fastapi import WebSocket

class WebSocketDashboard:
    def __init__(self, websocket):
        self.ws = websocket

    def on_start(self, pipeline, total_items):
        # The hooks are synchronous, so each send is scheduled as a task
        asyncio.create_task(self.ws.send_json({
            "type": "start",
            "total": total_items,
            "stages": [s.name for s in pipeline.stages]
        }))

    def on_update(self, snapshot):
        stats = snapshot.pipeline_stats
        asyncio.create_task(self.ws.send_json({
            "type": "update",
            "processed": stats.items_processed,
            "failed": stats.items_failed,
            "in_flight": stats.items_in_flight,
            "stages": {
                name: {"pending": s.pending_items, "active": s.in_progress_items, "done": s.completed_items}
                for name, s in stats.stage_stats.items()
            }
        }))

    def on_finish(self, results, summary):
        asyncio.create_task(self.ws.send_json({
            "type": "finish",
            "results": len(results),
            "failed": summary.total_failed,
            "errors": summary.errors_by_type
        }))

# Usage with FastAPI
@app.websocket("/ws/pipeline/run")
async def run_with_websocket(websocket: WebSocket):
    await websocket.accept()
    # Assumption: the client sends the item list as its first JSON message
    # (WebSocket routes cannot receive a request body parameter)
    items = await websocket.receive_json()
    pipeline = Pipeline(stages=[...])
    dashboard = WebSocketDashboard(websocket)
    results = await pipeline.run(items, custom_dashboard=dashboard)
    # Dashboard automatically sends start, updates, and finish events
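One caveat with scheduling a separate task per message is that nothing guarantees delivery order, and tasks still pending when the handler returns may never run. A common alternative is a single sender task draining a queue. The sketch below is self-contained and uses a fake `send` coroutine in place of `websocket.send_json`; `QueuedSender` is a hypothetical helper, not part of AntFlow or FastAPI.

```python
import asyncio


class QueuedSender:
    """Bridges synchronous dashboard hooks to one ordered async sender."""

    def __init__(self, send, maxsize: int = 100):
        self._send = send  # any `async def send(message)` callable
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._task = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._drain())

    def push(self, message) -> bool:
        """Enqueue without blocking; returns False if the queue is full."""
        try:
            self._queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            return False  # drop the update rather than stall the pipeline

    async def _drain(self) -> None:
        while True:
            message = await self._queue.get()
            if message is None:  # sentinel: no more messages
                break
            await self._send(message)

    async def close(self) -> None:
        """Flush everything queued so far, then stop the sender task."""
        await self._queue.put(None)
        await self._task


async def demo():
    sent = []

    async def fake_send(message):  # stands in for websocket.send_json
        await asyncio.sleep(0)
        sent.append(message)

    sender = QueuedSender(fake_send)
    sender.start()
    sender.push({"type": "start"})
    for n in range(3):
        sender.push({"type": "update", "processed": n})
    sender.push({"type": "finish"})
    await sender.close()
    return sent


sent = asyncio.run(demo())
print([m["type"] for m in sent])
```

In a dashboard, the hooks would call `push(...)` and the endpoint would `await sender.close()` after `pipeline.run(...)` returns, so the finish message is flushed before the socket closes.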
Frontend JavaScript Example
// Connect to WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/pipeline');
// Placeholder: set this from the "total" returned by POST /api/pipeline/start
let totalItems = 100;

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  // Update progress bar
  const pct = totalItems ? (data.processed / totalItems) * 100 : 0;
  document.getElementById('progress').style.width = `${pct}%`;
  // Update stage table (/ws/pipeline sends pending, active, completed)
  for (const [stage, stats] of Object.entries(data.stages)) {
    document.getElementById(`stage-${stage}-pending`).textContent = stats.pending;
    document.getElementById(`stage-${stage}-active`).textContent = stats.active;
    document.getElementById(`stage-${stage}-done`).textContent = stats.completed;
  }
  // Update counters
  document.getElementById('processed').textContent = data.processed;
  document.getElementById('failed').textContent = data.failed;
};
Flask Example (Synchronous)
from flask import Flask, jsonify
import asyncio
from antflow import Pipeline, Stage

app = Flask(__name__)
pipeline = None
loop = asyncio.new_event_loop()

@app.route('/api/status')
def get_status():
    if not pipeline:
        return jsonify({"status": "not_running"})
    snapshot = pipeline.get_dashboard_snapshot()
    stats = snapshot.pipeline_stats
    return jsonify({
        "processed": stats.items_processed,
        "failed": stats.items_failed,
        "stages": {
            name: {"done": s.completed_items, "active": s.in_progress_items}
            for name, s in stats.stage_stats.items()
        }
    })
Multi-Stage Progress Visualization
When running pipelines with multiple stages, understanding progress requires per-stage visibility.
The Problem
The items_processed counter only increments when items complete all stages:
Pipeline:  Fetch → Process → Save
Items:      100      50       10
items_processed = 10 (only counts items that finished ALL stages)
Solution: Use stage_stats
The DashboardSnapshot provides stage_stats for per-stage visibility:
class MultiStageDashboard:
    def on_start(self, pipeline, total_items):
        self.total = total_items  # Needed for the overall progress line

    def on_update(self, snapshot):
        stats = snapshot.pipeline_stats
        print(f"Overall: {stats.items_processed}/{self.total} completed end-to-end")
        print()
        # Per-stage breakdown
        for stage_name, stage_stat in stats.stage_stats.items():
            total_in_stage = (
                stage_stat.pending_items +
                stage_stat.in_progress_items +
                stage_stat.completed_items +
                stage_stat.failed_items
            )
            print(f"{stage_name} ({total_in_stage} items reached this stage):")
            print(f"  Pending: {stage_stat.pending_items}")
            print(f"  In Progress: {stage_stat.in_progress_items}")
            print(f"  Completed: {stage_stat.completed_items}")
            print(f"  Failed: {stage_stat.failed_items}")

    def on_finish(self, results, summary):
        print(f"Done: {len(results)} results")
Visual Example
╭─────────────────────── Multi-Stage Pipeline ───────────────────────╮
│                                                                    │
│  Overall: [███░░░░░░░░░░░░░░░░░░░░░░░░░░░] 10% (10/100 end-to-end) │
│                                                                    │
│  Stage      │ Pending │ In Progress │ Completed │ Failed           │
│  ───────────┼─────────┼─────────────┼───────────┼───────           │
│  Fetch      │ 0       │ 3           │ 97        │ 0                │
│  Process    │ 47      │ 5           │ 45        │ 0                │
│  Save       │ 33      │ 2           │ 10        │ 0                │
│                                                                    │
╰────────────────────────────────────────────────────────────────────╯
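The same per-stage numbers can also drive derived indicators, such as a completion percentage per stage or a guess at which stage is the bottleneck. The sketch below uses a hypothetical `StageStat` stand-in with the field names shown earlier; "the stage with the most pending items" is only one possible heuristic for a bottleneck, chosen here for simplicity.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class StageStat:
    """Stand-in for a per-stage stats object (fields as used above)."""

    pending_items: int
    in_progress_items: int
    completed_items: int
    failed_items: int = 0


def stage_progress(stage_stats: Dict[str, StageStat], total_items: int) -> Dict[str, float]:
    """Percentage of all submitted items that each stage has completed."""
    if not total_items:
        return {name: 0.0 for name in stage_stats}
    return {
        name: 100.0 * s.completed_items / total_items
        for name, s in stage_stats.items()
    }


def bottleneck(stage_stats: Dict[str, StageStat]) -> str:
    """Heuristic: the stage with the largest backlog of pending items."""
    return max(stage_stats, key=lambda name: stage_stats[name].pending_items)


stages = {
    "Fetch": StageStat(pending_items=0, in_progress_items=3, completed_items=97),
    "Process": StageStat(pending_items=47, in_progress_items=5, completed_items=45),
    "Save": StageStat(pending_items=33, in_progress_items=2, completed_items=10),
}
progress = stage_progress(stages, total_items=100)
slowest = bottleneck(stages)
print(progress, slowest)
```

A growing backlog in one stage is often a hint to give that stage more workers.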
Alternative: StatusTracker Callbacks (Event-Driven)
[!TIP] See a complete working example: examples/monitoring_status_tracker.py
This example demonstrates:
- on_status_change callback for item-level events
- on_task_* callbacks for task-level monitoring (start, complete, retry, fail)
- Real-time polling with get_stats()
- Query methods: get_status(), get_by_status(), get_history()
For event-driven monitoring (instead of polling), use StatusTracker callbacks:
from antflow import Pipeline, Stage, StatusTracker, StatusEvent

async def on_status_change(event: StatusEvent):
    """Called on EVERY status change (queued, in_progress, completed, failed)."""
    print(f"[{event.timestamp}] Item {event.item_id}: {event.status} @ {event.stage}")

async def main():
    tracker = StatusTracker(on_status_change=on_status_change)
    pipeline = Pipeline(
        stages=[
            Stage("Fetch", workers=3, tasks=[fetch]),
            Stage("Process", workers=5, tasks=[process]),
        ],
        status_tracker=tracker,
    )
    results = await pipeline.run(items)  # Callbacks fire during execution
Task-Level Callbacks
For even finer granularity, use task-level callbacks:
tracker = StatusTracker(
    on_status_change=handle_item_status,    # Item-level events
    on_task_start=handle_task_start,        # Task started
    on_task_complete=handle_task_complete,  # Task succeeded
    on_task_retry=handle_task_retry,        # Task being retried
    on_task_fail=handle_task_fail,          # Task failed
)
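Callbacks become more useful when the handler keeps some state. The sketch below aggregates item-level events into a "latest status per item" table. It runs without AntFlow: `Event` is a hypothetical stand-in with the four `StatusEvent` fields used in the example above, and the events are fed in by hand rather than by a real tracker.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class Event:
    """Stand-in for StatusEvent (only the fields used in this guide)."""

    item_id: int
    stage: str
    status: str
    timestamp: float


class StatusCounter:
    """Keeps the most recent (stage, status) for every item seen."""

    def __init__(self):
        self.latest: Dict[int, Tuple[str, str]] = {}

    async def on_status_change(self, event: Event) -> None:
        self.latest[event.item_id] = (event.stage, event.status)

    def counts(self) -> Counter:
        """How many items are currently in each status."""
        return Counter(status for _, status in self.latest.values())


async def demo() -> StatusCounter:
    counter = StatusCounter()
    events = [
        Event(1, "Fetch", "queued", 0.0),
        Event(2, "Fetch", "queued", 0.1),
        Event(1, "Fetch", "in_progress", 0.2),
        Event(1, "Fetch", "completed", 0.3),
        Event(2, "Fetch", "failed", 0.4),
    ]
    for event in events:  # a real tracker would invoke the callback itself
        await counter.on_status_change(event)
    return counter


counter = asyncio.run(demo())
print(counter.counts())
```

With a real pipeline, the bound method `counter.on_status_change` would be passed as the `on_status_change` argument shown above.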
Combining Both Approaches
You can use both DashboardProtocol AND StatusTracker together:
import logging

async def log_failures(event: StatusEvent):
    if event.status == "failed":
        logging.error(f"Item {event.item_id} failed at {event.stage}: {event.metadata}")

tracker = StatusTracker(on_status_change=log_failures)
pipeline = Pipeline(stages=[...], status_tracker=tracker)
# Dashboard for UI + callbacks for logging
results = await pipeline.run(items, dashboard="compact")
Manual Polling (Without DashboardProtocol)
If you prefer complete control, you can poll manually:
import asyncio
from antflow import Pipeline, Stage

async def main():
    pipeline = Pipeline(stages=[...])
    # Start pipeline in background
    await pipeline.start()
    await pipeline.feed(items)
    # Manual polling loop
    while True:
        snapshot = pipeline.get_dashboard_snapshot()
        stats = snapshot.pipeline_stats
        print(f"\rProcessed: {stats.items_processed}", end="")
        if stats.items_processed + stats.items_failed >= len(items):
            break
        await asyncio.sleep(0.1)
    await pipeline.join()
    results = pipeline.results
This gives you full control over when and how to query pipeline state.