# Quick Start
Get started with AntFlow in minutes!
## AsyncExecutor Example

`AsyncExecutor` provides a familiar `concurrent.futures`-style API for async operations:
```python
import asyncio
from antflow import AsyncExecutor

async def process_item(x):
    await asyncio.sleep(0.1)  # simulate I/O-bound work
    return x * 2

async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        # Map over items
        results = []
        async for result in executor.map(process_item, range(10)):
            results.append(result)
        print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

asyncio.run(main())
```
## Pipeline Example

Create multi-stage processing pipelines with ease:
```python
import asyncio
from antflow import Pipeline, Stage

async def fetch(x):
    return f"data_{x}"

async def process(x):
    return x.upper()

async def main():
    # Define stages: each has a name, a worker pool, and a list of tasks
    stage1 = Stage(name="Fetch", workers=3, tasks=[fetch])
    stage2 = Stage(name="Process", workers=2, tasks=[process])

    # Create and run pipeline
    pipeline = Pipeline(stages=[stage1, stage2])
    results = await pipeline.run(range(10))
    print(results)

asyncio.run(main())
```
## Key Concepts

### AsyncExecutor

Similar to `concurrent.futures.ThreadPoolExecutor`, but for async operations:
- `submit(fn, *args, **kwargs)` - Submit a single async task
- `map(fn, *iterables)` - Map function over iterables
- `as_completed(futures)` - Iterate over futures as they complete
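
For one-off tasks, `submit` pairs with `as_completed` much as in `concurrent.futures`. The sketch below assumes `submit` returns a future-like object without itself being awaited, and that `as_completed` is an async iterator yielding awaitable futures in completion order; the AsyncExecutor Guide documents the exact semantics.

```python
import asyncio
from antflow import AsyncExecutor

async def fetch(item):
    await asyncio.sleep(0.1)  # stand-in for real async I/O
    return f"done: {item}"

async def main():
    async with AsyncExecutor(max_workers=5) as executor:
        # Submit each task individually (assumed to return future-like objects)
        futures = [executor.submit(fetch, i) for i in range(5)]
        # Consume results in completion order rather than submission order
        async for future in executor.as_completed(futures):
            print(await future)  # assumes yielded futures are awaitable

asyncio.run(main())
```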
### Pipeline
Multi-stage processing with configurable workers and retry strategies:
- **Stages** - Define processing steps with worker pools
- **Tasks** - Sequence of async functions per stage
- **Retry Strategies** - Per-task or per-stage retry logic (see the sketch below)
- **StatusTracker** - Real-time monitoring with event callbacks
- **Worker Tracking** - Track which worker processes each item with custom item IDs
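
To show how these pieces fit together, here is a hypothetical sketch of a stage whose task can fail transiently. The `retries` parameter is an assumption used only to illustrate where retry configuration would plug in, not confirmed AntFlow API; see the Pipeline guide for the real retry strategies.

```python
import asyncio
import random
from antflow import Pipeline, Stage

async def flaky_fetch(x):
    # Simulate a transient failure that a retry strategy would absorb
    if random.random() < 0.3:
        raise ConnectionError("transient failure")
    return f"data_{x}"

async def main():
    # NOTE: `retries` is a hypothetical per-stage knob for illustration;
    # consult the Pipeline guide for AntFlow's actual retry configuration.
    stage = Stage(name="Fetch", workers=3, tasks=[flaky_fetch], retries=2)
    pipeline = Pipeline(stages=[stage])
    results = await pipeline.run(range(5))
    print(results)

asyncio.run(main())
```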
## Next Steps
- Explore the AsyncExecutor Guide for detailed usage
- Learn about Pipeline features and patterns
- See Worker Tracking for monitoring individual workers
- Check out Examples for real-world use cases
- Browse the API Reference for complete documentation