Display Module API¶
The antflow.display module provides progress bars and dashboards for pipeline monitoring.
ProgressDisplay¶
Minimal terminal progress bar without external dependencies.
Constructor¶
Parameters:
total: Total number of items to processwidth: Width of the progress bar in charactersfill_char: Character for completed portionempty_char: Character for remaining portion
Methods¶
on_start(pipeline, total_items): Initialize progress trackingon_update(snapshot): Update display with current stateon_finish(results, summary): Show completion message
Usage¶
# Automatic via pipeline
results = await pipeline.run(items, progress=True)
# Manual usage
progress = ProgressDisplay(total=100)
progress.on_start(pipeline, 100)
# ... during processing ...
progress.on_update(snapshot)
progress.on_finish(results, summary)
BaseDashboard¶
Abstract base class for custom dashboards.
Methods¶
on_start(pipeline, total_items): Called when pipeline startson_update(snapshot): Called periodically (callsrender())on_finish(results, summary): Called when pipeline completesrender(snapshot): Abstract method - implement display logic
Example¶
class MyDashboard(BaseDashboard):
def render(self, snapshot):
stats = snapshot.pipeline_stats
print(f"Progress: {stats.items_processed}")
CompactDashboard¶
Rich-based compact dashboard showing essential metrics.
Constructor¶
Parameters:
refresh_rate: Display refresh rate in Hz
Display¶
Shows a single panel with:
- Progress bar with percentage
- Current stage activity
- Processing rate and ETA
- Success/failure counts
Usage¶
DetailedDashboard¶
Rich-based dashboard with stage-level metrics.
Constructor¶
Display¶
Shows:
- Overall progress bar with rate and ETA
- Per-stage table with worker counts
- Worker performance metrics table
Usage¶
FullDashboard¶
Rich-based comprehensive dashboard with full monitoring.
Constructor¶
Parameters:
refresh_rate: Display refresh rate in Hzmax_items_shown: Maximum items to show in item tracker
Display¶
Shows:
- Overview panel with statistics
- Stage metrics table
- Worker monitoring table
- Item tracking table (requires
StatusTracker)
Usage¶
tracker = StatusTracker()
pipeline = Pipeline(stages=[...], status_tracker=tracker)
results = await pipeline.run(items, dashboard="full")
DashboardProtocol¶
Protocol for custom dashboard implementations.
Methods¶
def on_start(self, pipeline: Pipeline, total_items: int) -> None:
"""Called when pipeline execution starts."""
...
def on_update(self, snapshot: DashboardSnapshot) -> None:
"""Called periodically with current pipeline state."""
...
def on_finish(
self,
results: List[PipelineResult],
summary: ErrorSummary
) -> None:
"""Called when pipeline execution completes."""
...
Usage¶
class MyDashboard:
def on_start(self, pipeline, total_items):
print(f"Starting {total_items} items")
def on_update(self, snapshot):
print(f"Processed: {snapshot.pipeline_stats.items_processed}")
def on_finish(self, results, summary):
print(f"Done: {len(results)} results")
results = await pipeline.run(items, custom_dashboard=MyDashboard())
Types¶
DashboardSnapshot¶
@dataclass
class DashboardSnapshot:
worker_states: Dict[str, WorkerState]
worker_metrics: Dict[str, WorkerMetrics]
pipeline_stats: PipelineStats
timestamp: float
ErrorSummary¶
@dataclass
class ErrorSummary:
total_failed: int
errors_by_type: Dict[str, int]
errors_by_stage: Dict[str, int]
failed_items: List[FailedItem]