# Basic Examples

Simple examples to get started with AntFlow.
## AsyncExecutor Examples

### Using map()

The `map()` method applies an async function to multiple inputs:
```python
import asyncio

from antflow import AsyncExecutor


async def square(x: int) -> int:
    """Square a number with a small delay."""
    await asyncio.sleep(0.1)
    return x * x


async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        # Map with order preservation (default)
        results = []
        async for result in executor.map(square, range(10)):
            results.append(result)
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


asyncio.run(main())
```
Output:

```
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```
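To see the concurrency at work, you can time the run. With `max_workers=5`, the ten 0.1-second calls should finish in roughly two batches (about 0.2 s) rather than the 1 s a sequential loop would take. A minimal sketch using only the `map()` call shown above:

```python
import asyncio
import time

from antflow import AsyncExecutor


async def square(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * x


async def main():
    start = time.perf_counter()
    async with AsyncExecutor(max_workers=5) as executor:
        results = [r async for r in executor.map(square, range(10))]
    elapsed = time.perf_counter() - start
    # 10 tasks / 5 workers * 0.1 s per task -> roughly 0.2 s total
    print(f"{len(results)} results in {elapsed:.2f}s")


asyncio.run(main())
```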
### Using submit()

Submit individual tasks and collect their results:
```python
import asyncio

from antflow import AsyncExecutor


async def cube(x: int) -> int:
    """Cube a number with a small delay."""
    await asyncio.sleep(0.15)
    return x * x * x


async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        # Submit individual tasks
        futures = [executor.submit(cube, i) for i in range(5)]
        # Collect results
        results = [await f.result() for f in futures]
        print(results)  # [0, 1, 8, 27, 64]


asyncio.run(main())
```
Output:

```
[0, 1, 8, 27, 64]
```
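If a submitted task raises, the exception typically surfaces when you await its result, so you can handle failures per task. A minimal sketch, assuming `result()` re-raises the task's exception in the same way `concurrent.futures.Future.result()` does:

```python
import asyncio

from antflow import AsyncExecutor


async def cube(x: int) -> int:
    await asyncio.sleep(0.15)
    return x * x * x


async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        futures = [executor.submit(cube, i) for i in range(5)]
        for f in futures:
            try:
                # Assumption: result() re-raises the task's exception
                print(await f.result())
            except Exception as exc:
                # Handle or log the failure; the other futures still run
                print(f"task failed: {exc}")


asyncio.run(main())
```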
### Using as_completed()

Process results as they complete:
```python
import asyncio

from antflow import AsyncExecutor


async def square(x: int) -> int:
    """Square a number with a small delay (as in the map() example)."""
    await asyncio.sleep(0.1)
    return x * x


async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        futures = [executor.submit(square, i) for i in range(5, 10)]
        async for future in executor.as_completed(futures):
            result = await future.result()
            print(f"Completed: {result}")


asyncio.run(main())
```
Output (completion order may vary):

```
Completed: 25
Completed: 36
Completed: 49
Completed: 64
Completed: 81
```
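`as_completed()` pairs naturally with progress reporting, since each result is handed to you the moment it lands. A small sketch that counts completions as they arrive, using only the calls shown above:

```python
import asyncio

from antflow import AsyncExecutor


async def square(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * x


async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        futures = [executor.submit(square, i) for i in range(5, 10)]
        done = 0
        async for future in executor.as_completed(futures):
            done += 1
            result = await future.result()
            print(f"[{done}/{len(futures)}] got {result}")


asyncio.run(main())
```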
## Pipeline Examples

### Two-Stage ETL Pipeline

Extract, transform, and load data through a pipeline:
```python
import asyncio

from antflow import Pipeline, Stage


async def extract(x: int) -> str:
    """Extract/fetch data."""
    await asyncio.sleep(0.05)
    return f"data_{x}"


async def transform(x: str) -> str:
    """Transform data."""
    await asyncio.sleep(0.05)
    return x.upper()


async def load(x: str) -> str:
    """Load/save data."""
    await asyncio.sleep(0.05)
    return f"saved_{x}"


async def main():
    # Define stages
    extract_stage = Stage(
        name="Extract",
        workers=3,
        tasks=[extract],
        retry="per_task",
        task_attempts=3,
    )
    transform_stage = Stage(
        name="Transform",
        workers=2,
        tasks=[transform, load],  # Multiple tasks run in sequence
        retry="per_task",
        task_attempts=3,
    )

    # Create pipeline
    pipeline = Pipeline(
        stages=[extract_stage, transform_stage],
        collect_results=True,
    )

    # Process items
    items = list(range(20))
    results = await pipeline.run(items)

    print(f"Processed {len(results)} items")
    for result in results[:5]:
        print(f" ID {result['id']}: {result['value']}")


asyncio.run(main())
```
Output:

```
Processed 20 items
 ID 0: saved_DATA_0
 ID 1: saved_DATA_1
 ID 2: saved_DATA_2
 ID 3: saved_DATA_3
 ID 4: saved_DATA_4
```
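In the Transform stage above, `transform` and `load` run in sequence on each item, since a stage's `tasks` list is applied back-to-back. If you would rather let loading overlap with transforming across items, a sketch of the same ETL split into three stages, reusing the coroutines and `extract_stage` defined above:

```python
# Same extract/transform/load coroutines and extract_stage as above
transform_stage = Stage(name="Transform", workers=2, tasks=[transform])
load_stage = Stage(name="Load", workers=2, tasks=[load])

pipeline = Pipeline(
    stages=[extract_stage, transform_stage, load_stage],
    collect_results=True,
)
```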
### Pipeline with Status Tracking

Monitor pipeline execution with a `StatusTracker`. Here `process_data` stands in for your own processing task:
```python
import asyncio

from antflow import Pipeline, Stage, StatusTracker


async def process_data(x: int) -> int:
    """Stand-in task; substitute your own processing function."""
    await asyncio.sleep(0.05)
    return x * 2


async def on_status_change(event):
    if event.status == "completed":
        print(f"✅ Completed item {event.item_id}")
    elif event.status == "failed":
        print(f"❌ Failed item {event.item_id}: {event.metadata.get('error')}")


async def main():
    tracker = StatusTracker(on_status_change=on_status_change)
    stage = Stage(
        name="ProcessStage",
        workers=3,
        tasks=[process_data],
        retry="per_task",
        task_attempts=3,
    )
    pipeline = Pipeline(stages=[stage], status_tracker=tracker)
    results = await pipeline.run(range(10))  # items to process

    # Query statistics
    stats = tracker.get_stats()
    print(f"Completed: {stats['completed']}, Failed: {stats['failed']}")


asyncio.run(main())
```
### Getting Pipeline Statistics

Monitor pipeline progress:
```python
import asyncio

from antflow import Pipeline

# extract_stage and transform_stage as defined in the ETL example above


async def main():
    pipeline = Pipeline(stages=[extract_stage, transform_stage])
    results = await pipeline.run(range(100))

    # Get statistics
    stats = pipeline.get_stats()
    print(f"Items processed: {stats.items_processed}")
    print(f"Items failed: {stats.items_failed}")
    print(f"Items in-flight: {stats.items_in_flight}")
    print(f"Queue sizes: {stats.queue_sizes}")


asyncio.run(main())
```
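Because `run()` does not return until the pipeline drains, the numbers printed above are final totals. To watch progress while work is still in flight, one option is to poll `get_stats()` from a side task. A sketch under the assumption that `get_stats()` is safe to call mid-run, again reusing the stages from the ETL example:

```python
import asyncio

from antflow import Pipeline

# extract_stage and transform_stage as defined in the ETL example above


async def monitor(pipeline: Pipeline, interval: float = 0.5):
    """Print a progress line every `interval` seconds until cancelled."""
    while True:
        stats = pipeline.get_stats()
        print(f"processed={stats.items_processed} in_flight={stats.items_in_flight}")
        await asyncio.sleep(interval)


async def main():
    pipeline = Pipeline(stages=[extract_stage, transform_stage])
    monitor_task = asyncio.create_task(monitor(pipeline))
    try:
        await pipeline.run(range(100))
    finally:
        monitor_task.cancel()


asyncio.run(main())
```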
## Complete Basic Example

Here's a complete working example combining the executor and a pipeline:
```python
import asyncio

from antflow import AsyncExecutor, Pipeline, Stage


# Executor example
async def double(x):
    await asyncio.sleep(0.1)
    return x * 2


# Pipeline example
async def fetch(x):
    return f"item_{x}"


async def process(x):
    return x.upper()


async def main():
    # Use the executor for quick parallel processing
    print("=== AsyncExecutor ===")
    async with AsyncExecutor(max_workers=5) as executor:
        results = []
        async for result in executor.map(double, range(5)):
            results.append(result)
        print(f"Executor results: {results}")

    # Use a pipeline for multi-stage processing
    print("\n=== Pipeline ===")
    stage1 = Stage(name="Fetch", workers=3, tasks=[fetch])
    stage2 = Stage(name="Process", workers=2, tasks=[process])
    pipeline = Pipeline(stages=[stage1, stage2])

    results = await pipeline.run(range(5))
    print("Pipeline results:")
    for r in results:
        print(f" {r['id']}: {r['value']}")


if __name__ == "__main__":
    asyncio.run(main())
```
Output:

```
=== AsyncExecutor ===
Executor results: [0, 2, 4, 6, 8]

=== Pipeline ===
Pipeline results:
 0: ITEM_0
 1: ITEM_1
 2: ITEM_2
 3: ITEM_3
 4: ITEM_4
```
## Next Steps
- Explore Advanced Examples for more complex use cases
- Read the AsyncExecutor Guide for detailed documentation
- Learn about Pipeline features and patterns
- Check the API Reference for complete documentation