Skip to content

Types API

The antflow.types module defines the core data structures used for type hinting and data exchange between components.

Overview

This module contains Data Classes that represent the state of the system, including events, metrics, and snapshots.

Enumerated Types

StatusType

Represents the current state of an item in the pipeline.

Value Description
queued Item is waiting in a stage's input queue.
in_progress Item is currently being processed by a worker.
completed Item has successfully finished processing in the stage.
failed Item failed processing (after exhausting retries).
retrying Item failed but is queued for a retry (per-stage retry strategy).

WorkerStatus

Represents the current activity state of a worker.

Value Description
idle Worker is waiting for new items.
busy Worker is currently processing an item.

TaskEventType

Represents the type of event occurring at the task level.

Value Description
start A task function has started execution.
complete A task function has completed successfully.
retry A task failed and is being retried.
fail A task failed permanently (after exhausting retries).

Data Classes

PipelineResult dataclass

PipelineResult(id: Any, value: Any, sequence_id: int, metadata: Dict[str, Any] = dict(), error: Optional[Exception] = None)

Result of a pipeline execution for a single item.

Attributes:

Name Type Description
id Any

Unique identifier of the item

value Any

The final processed value

sequence_id int

Internal sequence number for ordering

metadata Dict[str, Any]

Additional metadata from the input item

error Optional[Exception]

Exception if processing failed (usually None for successful results)

is_success property

is_success: bool

Check if processing was successful.

StageStats dataclass

StageStats(stage_name: str, pending_items: int, in_progress_items: int, completed_items: int, failed_items: int)

Statistics for a single stage.

Attributes:

Name Type Description
stage_name str

Name of the stage

pending_items int

Number of items in the queue

in_progress_items int

Number of items currently being processed (busy workers)

completed_items int

Total items successfully processed by this stage

failed_items int

Total items failed in this stage

PipelineStats dataclass

PipelineStats(items_processed: int, items_failed: int, items_in_flight: int, queue_sizes: Dict[str, int], stage_stats: Dict[str, StageStats] = dict())

Aggregate statistics for the pipeline.

Attributes:

Name Type Description
items_processed int

Total number of items successfully processed (final output)

items_failed int

Total number of items that failed

items_in_flight int

Number of items currently being processed anywhere

queue_sizes Dict[str, int]

Dictionary mapping stage names to current queue sizes

stage_stats Dict[str, StageStats]

Detailed statistics per stage

WorkerState dataclass

WorkerState(worker_name: str, stage: str, status: WorkerStatus, current_item_id: Optional[Any] = None, processing_since: Optional[float] = None)

Current state of a worker.

Attributes:

Name Type Description
worker_name str

Unique name of the worker (e.g., 'Fetch-W0')

stage str

Name of the stage this worker belongs to

status WorkerStatus

Current status ('idle' or 'busy')

current_item_id Optional[Any]

ID of the item currently being processed (if busy)

processing_since Optional[float]

Timestamp when current processing started

WorkerMetrics dataclass

WorkerMetrics(worker_name: str, stage: str, items_processed: int = 0, items_failed: int = 0, total_processing_time: float = 0.0, last_active: Optional[float] = None)

Performance metrics for a single worker.

Attributes:

Name Type Description
worker_name str

Unique name of the worker

stage str

Name of the stage

items_processed int

Count of successfully processed items

items_failed int

Count of failed items

total_processing_time float

Cumulative processing time in seconds

last_active Optional[float]

Timestamp of last activity

avg_processing_time property

avg_processing_time: float

Calculate average processing time per item.

TaskEvent dataclass

TaskEvent(item_id: Any, stage: str, task_name: str, worker: str, event_type: TaskEventType, attempt: int, timestamp: float, error: Optional[Exception] = None, duration: Optional[float] = None)

Event emitted for task-level operations within a stage.

Provides granular visibility into individual task execution, including retries and failures at the task level.

Attributes:

Name Type Description
item_id Any

Item being processed

stage str

Stage name

task_name str

Name of the specific task function

worker str

Worker name processing the task

event_type TaskEventType

Type of event ([TaskEventType][antflow.types.TaskEventType])

attempt int

Current attempt number (1-indexed)

timestamp float

Unix timestamp when event occurred

error Optional[Exception]

Exception if task failed or is retrying (None otherwise)

duration Optional[float]

Time taken to execute task in seconds (None for start events)

DashboardSnapshot dataclass

DashboardSnapshot(worker_states: Dict[str, WorkerState], worker_metrics: Dict[str, WorkerMetrics], pipeline_stats: PipelineStats, timestamp: float)

Snapshot of the entire pipeline state for monitoring.

Attributes:

Name Type Description
worker_states Dict[str, WorkerState]

Dictionary of all WorkerState

worker_metrics Dict[str, WorkerMetrics]

Dictionary of all WorkerMetrics

pipeline_stats PipelineStats

Aggregate PipelineStats

timestamp float

Timestamp when snapshot was taken

StatusEvent dataclass

StatusEvent(item_id: Any, stage: str | None, status: StatusType, worker: str | None, timestamp: float, metadata: Dict[str, Any] = dict())

Represents a status change event for an item in the pipeline.

Attributes:

Name Type Description
item_id Any

Unique identifier for the item

stage str | None

Name of the stage (None if not stage-specific)

status StatusType

Current status of the item

worker str | None

Name of the worker processing the item (if applicable)

timestamp float

Unix timestamp when the event occurred

metadata Dict[str, Any]

Additional metadata about the event

worker_id property

worker_id: int | None

Extract worker ID from worker name.

Examples:

"ProcessBatch-W5" -> 5 "Fetch-W0" -> 0 None -> None

Returns:

Type Description
int | None

Worker ID (0-indexed) or None if not available