Types API
The antflow.types module defines the core data structures used for type hinting and data exchange between components.
Overview
This module contains the enumerated types and dataclasses that represent the state of the system, including events, metrics, and snapshots.
Enumerated Types
StatusType
Represents the current state of an item in the pipeline.
| Value | Description |
|---|---|
| `queued` | Item is waiting in a stage's input queue. |
| `in_progress` | Item is currently being processed by a worker. |
| `completed` | Item has successfully finished processing in the stage. |
| `failed` | Item failed processing (after exhausting retries). |
| `retrying` | Item failed but is queued for a retry (per-stage retry strategy). |
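For code that branches on these values, a minimal stand-in for the enum might look like the sketch below. Only the five string values come from the table above; the member names, the `str` mixin, and the `is_terminal` helper are assumptions made for illustration and may not match the library's own definition.

```python
from enum import Enum


class StatusType(str, Enum):
    """Local stand-in mirroring the documented values (not the library class)."""

    QUEUED = "queued"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


# Hypothetical helper: statuses after which no further status change is
# expected for an item within the same stage.
TERMINAL = {StatusType.COMPLETED, StatusType.FAILED}


def is_terminal(status: StatusType) -> bool:
    """Return True if the item is done with the stage, one way or the other."""
    return status in TERMINAL
```

Treating `retrying` as non-terminal follows from its description: the item is queued again rather than abandoned.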
WorkerStatus
Represents the current activity state of a worker.
| Value | Description |
|---|---|
| `idle` | Worker is waiting for new items. |
| `busy` | Worker is currently processing an item. |
TaskEventType
Represents the type of event occurring at the task level.
| Value | Description |
|---|---|
| `start` | A task function has started execution. |
| `complete` | A task function has completed successfully. |
| `retry` | A task failed and is being retried. |
| `fail` | A task failed permanently (after exhausting retries). |
Data Classes
PipelineResult (dataclass)
PipelineResult(id: Any, value: Any, sequence_id: int, metadata: Dict[str, Any] = dict(), error: Optional[Exception] = None)
Result of a pipeline execution for a single item.
Attributes:
| Name | Type | Description |
|---|---|---|
| `id` | `Any` | Unique identifier of the item |
| `value` | `Any` | The final processed value |
| `sequence_id` | `int` | Internal sequence number for ordering |
| `metadata` | `Dict[str, Any]` | Additional metadata from the input item |
| `error` | `Optional[Exception]` | Exception if processing failed (usually None for successful results) |
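As an illustration of how these fields fit together, the sketch below separates successful results from failed ones and restores input order using `sequence_id`. The `PipelineResult` class here is a local stand-in with the documented fields, and `split_results` is a hypothetical helper, not part of the library.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class PipelineResult:
    """Local stand-in with the documented fields (not the library class)."""

    id: Any
    value: Any
    sequence_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)
    error: Optional[Exception] = None


def split_results(
    results: List[PipelineResult],
) -> Tuple[List[PipelineResult], List[PipelineResult]]:
    """Return (successes, failures), each ordered by sequence_id."""
    ordered = sorted(results, key=lambda r: r.sequence_id)
    ok = [r for r in ordered if r.error is None]
    bad = [r for r in ordered if r.error is not None]
    return ok, bad


results = [
    PipelineResult(id="b", value=20, sequence_id=1),
    PipelineResult(id="a", value=10, sequence_id=0),
    PipelineResult(id="c", value=None, sequence_id=2, error=ValueError("boom")),
]
ok, bad = split_results(results)
```

The stand-in uses `field(default_factory=dict)` so that each instance gets its own `metadata` dictionary rather than sharing one mutable default.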
StageStats (dataclass)
StageStats(stage_name: str, pending_items: int, in_progress_items: int, completed_items: int, failed_items: int)
Statistics for a single stage.
Attributes:
| Name | Type | Description |
|---|---|---|
| `stage_name` | `str` | Name of the stage |
| `pending_items` | `int` | Number of items in the queue |
| `in_progress_items` | `int` | Number of items currently being processed (busy workers) |
| `completed_items` | `int` | Total items successfully processed by this stage |
| `failed_items` | `int` | Total items failed in this stage |
PipelineStats (dataclass)
PipelineStats(items_processed: int, items_failed: int, items_in_flight: int, queue_sizes: Dict[str, int], stage_stats: Dict[str, StageStats] = dict())
Aggregate statistics for the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `items_processed` | `int` | Total number of items successfully processed (final output) |
| `items_failed` | `int` | Total number of items that failed |
| `items_in_flight` | `int` | Number of items currently being processed anywhere |
| `queue_sizes` | `Dict[str, int]` | Dictionary mapping stage names to current queue sizes |
| `stage_stats` | `Dict[str, StageStats]` | Detailed statistics per stage |
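A short sketch of how these aggregate fields might be consumed, for example to derive a failure rate or find the stage with the deepest queue. Both dataclasses below are local stand-ins with the documented fields; `failure_rate`, `longest_queue`, and the stage names are hypothetical and not part of the library.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class StageStats:
    """Local stand-in with the documented fields (not the library class)."""

    stage_name: str
    pending_items: int
    in_progress_items: int
    completed_items: int
    failed_items: int


@dataclass
class PipelineStats:
    """Local stand-in with the documented fields (not the library class)."""

    items_processed: int
    items_failed: int
    items_in_flight: int
    queue_sizes: Dict[str, int]
    stage_stats: Dict[str, StageStats] = field(default_factory=dict)


def failure_rate(stats: PipelineStats) -> float:
    """Fraction of finished items that failed; 0.0 if nothing has finished."""
    finished = stats.items_processed + stats.items_failed
    return stats.items_failed / finished if finished else 0.0


def longest_queue(stats: PipelineStats) -> Optional[str]:
    """Name of the stage with the largest input queue, or None if no queues."""
    if not stats.queue_sizes:
        return None
    return max(stats.queue_sizes, key=lambda name: stats.queue_sizes[name])


stats = PipelineStats(
    items_processed=90,
    items_failed=10,
    items_in_flight=5,
    queue_sizes={"Fetch": 3, "Process": 12},
)
```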
WorkerState (dataclass)
WorkerState(worker_name: str, stage: str, status: WorkerStatus, current_item_id: Optional[Any] = None, processing_since: Optional[float] = None)
Current state of a worker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_name` | `str` | Unique name of the worker (e.g., 'Fetch-W0') |
| `stage` | `str` | Name of the stage this worker belongs to |
| `status` | `WorkerStatus` | Current status ('idle' or 'busy') |
| `current_item_id` | `Optional[Any]` | ID of the item currently being processed (if busy) |
| `processing_since` | `Optional[float]` | Timestamp when current processing started |
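One use of `processing_since` is spotting workers that have been busy with a single item for too long. The sketch below assumes the timestamp is a Unix time comparable with `time.time()`, which this page does not state explicitly. `WorkerStatus` and `WorkerState` are local stand-ins, and `stalled_workers` is a hypothetical helper.

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


class WorkerStatus(str, Enum):
    """Local stand-in mirroring the documented values (not the library class)."""

    IDLE = "idle"
    BUSY = "busy"


@dataclass
class WorkerState:
    """Local stand-in with the documented fields (not the library class)."""

    worker_name: str
    stage: str
    status: WorkerStatus
    current_item_id: Optional[Any] = None
    processing_since: Optional[float] = None


def stalled_workers(
    states: Dict[str, WorkerState],
    threshold_s: float,
    now: Optional[float] = None,
) -> List[str]:
    """Names of busy workers whose current item has run longer than threshold_s."""
    # Assumption: processing_since is a Unix timestamp in seconds.
    now = time.time() if now is None else now
    return sorted(
        name
        for name, state in states.items()
        if state.status is WorkerStatus.BUSY
        and state.processing_since is not None
        and now - state.processing_since > threshold_s
    )
```

Passing `now` explicitly keeps the check deterministic in tests; in live monitoring it can be left as `None`.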
WorkerMetrics (dataclass)
WorkerMetrics(worker_name: str, stage: str, items_processed: int = 0, items_failed: int = 0, total_processing_time: float = 0.0, last_active: Optional[float] = None)
Performance metrics for a single worker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_name` | `str` | Unique name of the worker |
| `stage` | `str` | Name of the stage |
| `items_processed` | `int` | Count of successfully processed items |
| `items_failed` | `int` | Count of failed items |
| `total_processing_time` | `float` | Cumulative processing time in seconds |
| `last_active` | `Optional[float]` | Timestamp of last activity |
avg_processing_time (property)
Calculate average processing time per item.
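The exact formula is not spelled out here. One plausible reading, shown as a sketch on a local stand-in class, divides `total_processing_time` by `items_processed` and returns 0.0 when nothing has been processed yet. Both the choice of denominator and the zero-item fallback are assumptions; the library's property may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkerMetrics:
    """Local stand-in with the documented fields (not the library class)."""

    worker_name: str
    stage: str
    items_processed: int = 0
    items_failed: int = 0
    total_processing_time: float = 0.0
    last_active: Optional[float] = None

    @property
    def avg_processing_time(self) -> float:
        """Average seconds per successfully processed item (assumed formula)."""
        if self.items_processed == 0:
            # Assumed fallback that avoids a ZeroDivisionError.
            return 0.0
        return self.total_processing_time / self.items_processed


metrics = WorkerMetrics("Fetch-W0", "Fetch", items_processed=4, total_processing_time=2.0)
```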
TaskEvent (dataclass)
TaskEvent(item_id: Any, stage: str, task_name: str, worker: str, event_type: TaskEventType, attempt: int, timestamp: float, error: Optional[Exception] = None, duration: Optional[float] = None)
Event emitted for task-level operations within a stage.
Provides granular visibility into individual task execution, including retries and failures at the task level.
Attributes:
| Name | Type | Description |
|---|---|---|
| `item_id` | `Any` | Item being processed |
| `stage` | `str` | Stage name |
| `task_name` | `str` | Name of the specific task function |
| `worker` | `str` | Name of the worker processing the task |
| `event_type` | `TaskEventType` | Type of event (see TaskEventType) |
| `attempt` | `int` | Current attempt number (1-indexed) |
| `timestamp` | `float` | Unix timestamp when event occurred |
| `error` | `Optional[Exception]` | Exception if task failed or is retrying (None otherwise) |
| `duration` | `Optional[float]` | Time taken to execute task in seconds (None for start events) |
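To make the event stream concrete, the sketch below tallies retries per task function from a list of events. `TaskEventType` and `TaskEvent` are local stand-ins with the documented values and fields. The `retry_counts` helper, the task name `download`, and the sample timestamps are hypothetical, and how events are actually delivered to user code is not covered here.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable, Optional


class TaskEventType(str, Enum):
    """Local stand-in mirroring the documented values (not the library class)."""

    START = "start"
    COMPLETE = "complete"
    RETRY = "retry"
    FAIL = "fail"


@dataclass
class TaskEvent:
    """Local stand-in with the documented fields (not the library class)."""

    item_id: Any
    stage: str
    task_name: str
    worker: str
    event_type: TaskEventType
    attempt: int
    timestamp: float
    error: Optional[Exception] = None
    duration: Optional[float] = None


def retry_counts(events: Iterable[TaskEvent]) -> Counter:
    """Count retry events per task function name."""
    return Counter(
        e.task_name for e in events if e.event_type is TaskEventType.RETRY
    )


# Sample stream: one item fails its first attempt, then succeeds on the second.
events = [
    TaskEvent("item-1", "Fetch", "download", "Fetch-W0", TaskEventType.START, 1, 100.0),
    TaskEvent("item-1", "Fetch", "download", "Fetch-W0", TaskEventType.RETRY, 1, 100.5,
              error=TimeoutError("slow"), duration=0.5),
    TaskEvent("item-1", "Fetch", "download", "Fetch-W0", TaskEventType.START, 2, 101.0),
    TaskEvent("item-1", "Fetch", "download", "Fetch-W0", TaskEventType.COMPLETE, 2, 101.2,
              duration=0.2),
]
counts = retry_counts(events)
```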
DashboardSnapshot (dataclass)
DashboardSnapshot(worker_states: Dict[str, WorkerState], worker_metrics: Dict[str, WorkerMetrics], pipeline_stats: PipelineStats, timestamp: float)
Snapshot of the entire pipeline state for monitoring.
Attributes:
| Name | Type | Description |
|---|---|---|
| `worker_states` | `Dict[str, WorkerState]` | Dictionary of all WorkerState |
| `worker_metrics` | `Dict[str, WorkerMetrics]` | Dictionary of all WorkerMetrics |
| `pipeline_stats` | `PipelineStats` | Aggregate PipelineStats |
| `timestamp` | `float` | Timestamp when snapshot was taken |
StatusEvent (dataclass)
StatusEvent(item_id: Any, stage: str | None, status: StatusType, worker: str | None, timestamp: float, metadata: Dict[str, Any] = dict())
Represents a status change event for an item in the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `item_id` | `Any` | Unique identifier for the item |
| `stage` | `str \| None` | Name of the stage (None if not stage-specific) |
| `status` | `StatusType` | Current status of the item |
| `worker` | `str \| None` | Name of the worker processing the item (if applicable) |
| `timestamp` | `float` | Unix timestamp when the event occurred |
| `metadata` | `Dict[str, Any]` | Additional metadata about the event |
worker_id (property)
Extract worker ID from worker name.
Examples:
"ProcessBatch-W5" -> 5
"Fetch-W0" -> 0
None -> None
Returns:
| Type | Description |
|---|---|
| `int \| None` | Worker ID (0-indexed) or None if not available |
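The examples above suggest that the ID is the integer following a trailing `-W` in the worker name. A minimal sketch of that parsing rule is shown below as a standalone function. The regular expression and the choice to return None for names that do not match are assumptions, not the library's confirmed behavior.

```python
import re
from typing import Optional

# Assumed naming convention: "<StageName>-W<index>", e.g. "Fetch-W0".
_WORKER_SUFFIX = re.compile(r"-W(\d+)$")


def worker_id_from_name(worker: Optional[str]) -> Optional[int]:
    """Return the numeric worker index, or None if it cannot be determined."""
    if worker is None:
        return None
    match = _WORKER_SUFFIX.search(worker)
    return int(match.group(1)) if match else None
```

Anchoring the pattern at the end of the string means a stage name that itself contains `-W` does not affect the result.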