Types API
The `antflow.types` module defines the core data structures used for type hints and for exchanging data between components.
Overview
This module contains the data classes that represent the state of the system, including events, metrics, and snapshots.
Enumerated Types
StatusType
Represents the current state of an item in the pipeline.
| Value | Description |
|---|---|
| `queued` | Item is waiting in a stage's input queue. |
| `in_progress` | Item is currently being processed by a worker. |
| `completed` | Item has successfully finished processing in the stage. |
| `failed` | Item failed processing (after exhausting retries). |
| `retrying` | Item failed but is queued for a retry (per-stage retry strategy). |
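The values above can be compared as plain strings when filtering status events. The following is a minimal sketch that assumes a local stand-in enum; the real `antflow.types.StatusType` may be defined differently (for example as a `Literal` alias), and `is_terminal` is a hypothetical helper, not part of antflow.

```python
from enum import Enum


class StatusType(str, Enum):
    """Local stand-in mirroring the documented values (an assumption)."""

    QUEUED = "queued"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


def is_terminal(status: str) -> bool:
    """Return True once an item will not change state again in its stage."""
    return status in (StatusType.COMPLETED.value, StatusType.FAILED.value)
```

Because the stand-in mixes in `str`, the helper accepts either a raw string such as `"failed"` or an enum member such as `StatusType.FAILED`.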
WorkerStatus
Represents the current activity state of a worker.
| Value | Description |
|---|---|
| `idle` | Worker is waiting for new items. |
| `busy` | Worker is currently processing an item. |
TaskEventType
Represents the type of event occurring at the task level.
| Value | Description |
|---|---|
| `start` | A task function has started execution. |
| `complete` | A task function has completed successfully. |
| `retry` | A task failed and is being retried. |
| `fail` | A task failed permanently (after exhausting retries). |
WorkerState
| Field | Type | Description |
|---|---|---|
| `status` | `WorkerStatus` | `idle` or `busy`. |
| `current_item_id` | `Optional[Any]` | ID of the item currently being processed. |
| `current_task` | `Optional[str]` | Name of the task function currently being executed. |
| `processing_since` | `Optional[float]` | Timestamp when the current item started. |
| `stage` | `str` | Name of the stage the worker belongs to. |
Data Classes
PipelineResult (dataclass)
`PipelineResult(id: Any, value: Any, sequence_id: int, metadata: Dict[str, Any] = dict(), error: Optional[Exception] = None)`
Result of a pipeline execution for a single item.
Attributes:
| Name | Type | Description |
|---|---|---|
| `id` | `Any` | Unique identifier of the item |
| `value` | `Any` | The final processed value |
| `sequence_id` | `int` | Internal sequence number for ordering |
| `metadata` | `Dict[str, Any]` | Additional metadata from the input item |
| `error` | `Optional[Exception]` | Exception if processing failed (usually None for successful results) |
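As an illustration of how these fields might be consumed, the sketch below orders results by `sequence_id` and separates those that carry an `error`. The `PipelineResult` class here is a local stand-in built from the documented signature rather than an import from antflow, `split_results` is a hypothetical helper, and the sketch assumes that `sequence_id` reflects input order.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class PipelineResult:
    """Local stand-in with the documented fields (not imported from antflow)."""

    id: Any
    value: Any
    sequence_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)
    error: Optional[Exception] = None


def split_results(
    results: List[PipelineResult],
) -> Tuple[List[PipelineResult], List[PipelineResult]]:
    """Order results by sequence_id, then separate successes from errors."""
    ordered = sorted(results, key=lambda r: r.sequence_id)
    succeeded = [r for r in ordered if r.error is None]
    errored = [r for r in ordered if r.error is not None]
    return succeeded, errored


# Results may arrive out of order; sequence_id is used to restore the order.
results = [
    PipelineResult(id="b", value=20, sequence_id=1),
    PipelineResult(id="a", value=10, sequence_id=0),
    PipelineResult(id="c", value=None, sequence_id=2, error=ValueError("bad input")),
]
succeeded, errored = split_results(results)
```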
StageStats (dataclass)
`StageStats(stage_name: str, pending_items: int, in_progress_items: int, completed_items: int, failed_items: int)`
Statistics for a single stage.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_name` | `str` | Name of the stage |
| `pending_items` | `int` | Number of items in the queue |
| `in_progress_items` | `int` | Number of items currently being processed (busy workers) |
| `completed_items` | `int` | Total items successfully processed by this stage |
| `failed_items` | `int` | Total items failed in this stage |
PipelineStats (dataclass)
`PipelineStats(items_processed: int, items_failed: int, items_in_flight: int, queue_sizes: Dict[str, int], stage_stats: Dict[str, StageStats] = dict())`
Aggregate statistics for the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `items_processed` | `int` | Total number of items successfully processed (final output) |
| `items_failed` | `int` | Total number of items that failed |
| `items_in_flight` | `int` | Number of items currently being processed anywhere |
| `queue_sizes` | `Dict[str, int]` | Dictionary mapping stage names to current queue sizes |
| `stage_stats` | `Dict[str, StageStats]` | Detailed statistics per stage |
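The aggregate counters lend themselves to simple derived metrics. The sketch below uses local stand-ins for `StageStats` and `PipelineStats` built from the documented signatures; `failure_rate` and `busiest_queue` are hypothetical helpers, and the failure rate assumes that `items_processed` and `items_failed` count disjoint sets of items.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class StageStats:
    """Local stand-in with the documented fields."""

    stage_name: str
    pending_items: int
    in_progress_items: int
    completed_items: int
    failed_items: int


@dataclass
class PipelineStats:
    """Local stand-in with the documented fields."""

    items_processed: int
    items_failed: int
    items_in_flight: int
    queue_sizes: Dict[str, int]
    stage_stats: Dict[str, StageStats] = field(default_factory=dict)


def failure_rate(stats: PipelineStats) -> float:
    """Fraction of finished items that failed (0.0 when nothing finished)."""
    finished = stats.items_processed + stats.items_failed
    return stats.items_failed / finished if finished else 0.0


def busiest_queue(stats: PipelineStats) -> Optional[str]:
    """Name of the stage with the largest input queue, or None if empty."""
    if not stats.queue_sizes:
        return None
    return max(stats.queue_sizes, key=stats.queue_sizes.get)


stats = PipelineStats(
    items_processed=90,
    items_failed=10,
    items_in_flight=5,
    queue_sizes={"Fetch": 2, "Parse": 7},
)
```

A large value in `queue_sizes` for one stage relative to the others is one possible sign of a bottleneck at that stage.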
WorkerState (dataclass)
`WorkerState(worker_name: str, stage: str, status: WorkerStatus, current_item_id: Optional[Any] = None, current_task: Optional[str] = None, processing_since: Optional[float] = None)`
Current state of a worker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_name` | `str` | Unique name of the worker (e.g., 'Fetch-W0') |
| `stage` | `str` | Name of the stage this worker belongs to |
| `status` | `WorkerStatus` | Current status ('idle' or 'busy') |
| `current_item_id` | `Optional[Any]` | ID of the item currently being processed (if busy) |
| `current_task` | `Optional[str]` | Name of the task function currently being executed |
| `processing_since` | `Optional[float]` | Timestamp when current processing started |
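One use of `processing_since` is spotting workers that have been busy with the same item for a long time. The sketch below uses a local stand-in for `WorkerState` (with `status` as a plain string in place of `WorkerStatus`) and a hypothetical helper `busy_durations`; it assumes `processing_since` is a Unix timestamp comparable with `time.time()`, which the source does not state explicitly for this field.

```python
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class WorkerState:
    """Local stand-in with the documented fields."""

    worker_name: str
    stage: str
    status: str  # "idle" or "busy"; stands in for WorkerStatus
    current_item_id: Optional[Any] = None
    current_task: Optional[str] = None
    processing_since: Optional[float] = None


def busy_durations(
    states: Dict[str, WorkerState], now: Optional[float] = None
) -> Dict[str, float]:
    """Map each busy worker's name to the seconds spent on its current item."""
    now = time.time() if now is None else now
    return {
        name: now - state.processing_since
        for name, state in states.items()
        if state.status == "busy" and state.processing_since is not None
    }


states = {
    "Fetch-W0": WorkerState(
        "Fetch-W0", "Fetch", "busy",
        current_item_id=42, current_task="fetch", processing_since=100.0,
    ),
    "Fetch-W1": WorkerState("Fetch-W1", "Fetch", "idle"),
}
durations = busy_durations(states, now=102.5)
```

Passing `now` explicitly keeps the helper deterministic, which is convenient in tests.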
WorkerMetrics (dataclass)
`WorkerMetrics(worker_name: str, stage: str, items_processed: int = 0, items_failed: int = 0, total_processing_time: float = 0.0, last_active: Optional[float] = None)`
Performance metrics for a single worker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_name` | `str` | Unique name of the worker |
| `stage` | `str` | Name of the stage |
| `items_processed` | `int` | Count of successfully processed items |
| `items_failed` | `int` | Count of failed items |
| `total_processing_time` | `float` | Cumulative processing time in seconds |
| `last_active` | `Optional[float]` | Timestamp of last activity |

avg_processing_time (property)
Calculate average processing time per item.
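The source does not show how the average is computed. One plausible reading, sketched below on a local stand-in class, divides `total_processing_time` by `items_processed` and returns `0.0` before any item has been processed. Both the choice of denominator and the zero fallback are assumptions; the actual antflow property may differ (for example by including failed items).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkerMetrics:
    """Local stand-in with the documented fields."""

    worker_name: str
    stage: str
    items_processed: int = 0
    items_failed: int = 0
    total_processing_time: float = 0.0
    last_active: Optional[float] = None

    @property
    def avg_processing_time(self) -> float:
        """Mean seconds per successfully processed item (0.0 if none yet).

        Assumption: only items_processed is used as the denominator.
        """
        if self.items_processed == 0:
            return 0.0
        return self.total_processing_time / self.items_processed


metrics = WorkerMetrics("Fetch-W0", "Fetch", items_processed=4, total_processing_time=10.0)
```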
TaskEvent (dataclass)
`TaskEvent(item_id: Any, stage: str, task_name: str, worker: str, event_type: TaskEventType, attempt: int, timestamp: float, error: Optional[Exception] = None, duration: Optional[float] = None)`
Event emitted for task-level operations within a stage.
Provides granular visibility into individual task execution, including retries and failures at the task level.
Attributes:
| Name | Type | Description |
|---|---|---|
| `item_id` | `Any` | Item being processed |
| `stage` | `str` | Stage name |
| `task_name` | `str` | Name of the specific task function |
| `worker` | `str` | Name of the worker processing the task |
| `event_type` | `TaskEventType` | Type of event (see TaskEventType) |
| `attempt` | `int` | Current attempt number (1-indexed) |
| `timestamp` | `float` | Unix timestamp when the event occurred |
| `error` | `Optional[Exception]` | Exception if the task failed or is retrying (None otherwise) |
| `duration` | `Optional[float]` | Time taken to execute the task in seconds (None for start events) |
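Task events can be aggregated to find which task functions retry most often. The sketch below defines a local stand-in for `TaskEvent` (with `event_type` as a plain string in place of `TaskEventType`) and a hypothetical helper `retries_per_task`; how events are actually delivered to user code is not covered here.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass
class TaskEvent:
    """Local stand-in with the documented fields."""

    item_id: Any
    stage: str
    task_name: str
    worker: str
    event_type: str  # "start", "complete", "retry" or "fail"
    attempt: int
    timestamp: float
    error: Optional[Exception] = None
    duration: Optional[float] = None


def retries_per_task(events: Iterable[TaskEvent]) -> Counter:
    """Count "retry" events for each (stage, task_name) pair."""
    return Counter(
        (event.stage, event.task_name)
        for event in events
        if event.event_type == "retry"
    )


events = [
    TaskEvent(1, "Fetch", "download", "Fetch-W0", "start", 1, 100.0),
    TaskEvent(1, "Fetch", "download", "Fetch-W0", "retry", 1, 100.5,
              error=TimeoutError("timed out"), duration=0.5),
    TaskEvent(1, "Fetch", "download", "Fetch-W0", "retry", 2, 101.2,
              error=TimeoutError("timed out"), duration=0.5),
    TaskEvent(1, "Fetch", "download", "Fetch-W0", "complete", 3, 101.9, duration=0.4),
]
retry_counts = retries_per_task(events)
```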
DashboardSnapshot (dataclass)
`DashboardSnapshot(worker_states: Dict[str, WorkerState], worker_metrics: Dict[str, WorkerMetrics], pipeline_stats: PipelineStats, error_summary: ErrorSummary, timestamp: float)`
Snapshot of the entire pipeline state for monitoring.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_states` | `Dict[str, WorkerState]` | Dictionary of all WorkerState |
| `worker_metrics` | `Dict[str, WorkerMetrics]` | Dictionary of all WorkerMetrics |
| `pipeline_stats` | `PipelineStats` | Aggregate PipelineStats |
| `error_summary` | `ErrorSummary` | Aggregated error information (ErrorSummary) |
| `timestamp` | `float` | Timestamp when snapshot was taken |
StatusEvent (dataclass)
`StatusEvent(item_id: Any, stage: str | None, status: StatusType, worker: str | None, timestamp: float, metadata: Dict[str, Any] = dict())`
Represents a status change event for an item in the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `item_id` | `Any` | Unique identifier for the item |
| `stage` | `str \| None` | Name of the stage (None if not stage-specific) |
| `status` | `StatusType` | Current status of the item |
| `worker` | `str \| None` | Name of the worker processing the item (if applicable) |
| `timestamp` | `float` | Unix timestamp when the event occurred |
| `metadata` | `Dict[str, Any]` | Additional metadata about the event |

worker_id (property)
Extract worker ID from worker name.
Examples: `"ProcessBatch-W5"` -> `5`, `"Fetch-W0"` -> `0`, `None` -> `None`
Returns:
| Type | Description |
|---|---|
| `int \| None` | Worker ID (0-indexed) or None if not available |
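The examples above suggest that worker names end in `-W<index>`. A minimal sketch of that extraction follows; the regular expression and the function name `parse_worker_id` are assumptions inferred from the examples, not the library's actual implementation.

```python
import re
from typing import Optional

# Assumed naming pattern: "<stage name>-W<zero-based index>".
_WORKER_SUFFIX = re.compile(r"-W(\d+)$")


def parse_worker_id(worker: Optional[str]) -> Optional[int]:
    """Return the trailing numeric index of a worker name, if present."""
    if worker is None:
        return None
    match = _WORKER_SUFFIX.search(worker)
    return int(match.group(1)) if match else None
```

Names that do not follow the assumed pattern yield `None` in this sketch rather than raising an exception.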
FailedItem (dataclass)
Details of a single failed item.
Attributes:
| Name | Type | Description |
|---|---|---|
| `item_id` | `Any` | Unique identifier of the failed item |
| `error` | `str` | Error message string |
| `error_type` | `str` | Type name of the exception |
| `stage` | `str` | Stage where failure occurred |
| `attempts` | `int` | Number of attempts made before failure |
| `timestamp` | `float` | Unix timestamp when failure occurred |
ErrorSummary (dataclass)
`ErrorSummary(total_failed: int, errors_by_type: Dict[str, int], errors_by_stage: Dict[str, int], failed_items: List[FailedItem])`
Aggregated error information from pipeline execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| `total_failed` | `int` | Total count of failed items |
| `errors_by_type` | `Dict[str, int]` | Count of errors grouped by exception type |
| `errors_by_stage` | `Dict[str, int]` | Count of errors grouped by stage name |
| `failed_items` | `List[FailedItem]` | List of individual failed item details |
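To make the relationship between `FailedItem` and `ErrorSummary` concrete, the sketch below builds a summary from a list of failed items. Both classes are local stand-ins. The `FailedItem` field order follows the attribute table because the source gives no constructor signature, so the example uses keyword arguments. `summarize` is a hypothetical helper and not necessarily how antflow produces its summary.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FailedItem:
    """Local stand-in; field order is assumed from the attribute table."""

    item_id: Any
    error: str
    error_type: str
    stage: str
    attempts: int
    timestamp: float


@dataclass
class ErrorSummary:
    """Local stand-in with the documented fields."""

    total_failed: int
    errors_by_type: Dict[str, int]
    errors_by_stage: Dict[str, int]
    failed_items: List[FailedItem]


def summarize(failed: List[FailedItem]) -> ErrorSummary:
    """Aggregate failed items into counts by exception type and stage."""
    return ErrorSummary(
        total_failed=len(failed),
        errors_by_type=dict(Counter(item.error_type for item in failed)),
        errors_by_stage=dict(Counter(item.stage for item in failed)),
        failed_items=list(failed),
    )


failed = [
    FailedItem(item_id=1, error="timed out", error_type="TimeoutError",
               stage="Fetch", attempts=3, timestamp=100.0),
    FailedItem(item_id=2, error="timed out", error_type="TimeoutError",
               stage="Fetch", attempts=3, timestamp=101.0),
    FailedItem(item_id=3, error="bad value", error_type="ValueError",
               stage="Parse", attempts=1, timestamp=102.0),
]
summary = summarize(failed)
```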
DashboardProtocol
Bases: Protocol
Protocol for custom dashboard implementations.
Implement this protocol to create custom dashboards that integrate with Pipeline.run() through its custom_dashboard parameter.
Example:

    class MyDashboard:
        """Minimal dashboard that prints progress to stdout."""

        def on_start(self, pipeline, total_items):
            print(f"Starting {total_items} items")

        def on_update(self, snapshot):
            print(f"Progress: {snapshot.pipeline_stats.items_processed}")

        def on_finish(self, results, summary):
            print(f"Done! {len(results)} results, {summary.total_failed} failed")

    # Inside a coroutine, with `pipeline` and `items` already defined:
    results = await pipeline.run(items, custom_dashboard=MyDashboard())
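Because the protocol is structural, a dashboard class does not need to inherit from it. The sketch below shows the idea with a local stand-in for `DashboardProtocol`: the three method names come from the example above, while the parameter annotations, the `runtime_checkable` decorator, and the `CountingDashboard` class are assumptions made for illustration.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class DashboardProtocol(Protocol):
    """Local stand-in; method names follow the documented example."""

    def on_start(self, pipeline: Any, total_items: int) -> None: ...
    def on_update(self, snapshot: Any) -> None: ...
    def on_finish(self, results: List[Any], summary: Any) -> None: ...


class CountingDashboard:
    """Hypothetical dashboard that records how often it was updated."""

    def __init__(self) -> None:
        self.total_items = 0
        self.updates = 0
        self.finished = False

    def on_start(self, pipeline: Any, total_items: int) -> None:
        self.total_items = total_items

    def on_update(self, snapshot: Any) -> None:
        self.updates += 1

    def on_finish(self, results: List[Any], summary: Any) -> None:
        self.finished = True


dashboard = CountingDashboard()
# runtime_checkable only verifies that the methods exist, not their signatures.
assert isinstance(dashboard, DashboardProtocol)
```

A dashboard that keeps state like this, instead of printing, can be inspected after a run or driven directly in unit tests without a pipeline.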