Concurrent Flow Demo
This example demonstrates Routilux’s concurrent execution capabilities, showing how multiple routines can execute in parallel to improve performance.
Overview
The demo simulates a real-world scenario where data needs to be fetched from multiple sources, processed, and aggregated. Using concurrent execution, all data fetching operations run in parallel, significantly reducing total execution time.
Key Features Demonstrated
Unified Event Queue: Both sequential and concurrent modes use the same queue mechanism
Automatic Flow Detection: No need to pass a flow parameter in emit() calls
Non-blocking emit(): Event emission returns immediately after enqueuing tasks
Concurrent execution strategy
Multiple parallel routines
Performance comparison (sequential vs concurrent)
Thread-safe state management
Error handling in concurrent execution
Serialization of concurrent flows
Dynamic strategy switching
wait_for_completion(): Proper waiting for async tasks
Example Code
```python
#!/usr/bin/env python
"""
Concurrent Flow Execution Demo

This demo demonstrates Routilux's concurrent execution capabilities.
It shows how multiple routines can execute in parallel using thread pools,
significantly improving performance for I/O-bound operations.

Features demonstrated:
- Concurrent execution strategy
- Multiple parallel routines
- Dependency handling
- Performance comparison (sequential vs concurrent)
- Thread-safe state management
- Error handling in concurrent execution
- Serialization of concurrent flows
"""

import json
import time
from typing import Any, Dict

from routilux import ErrorHandler, ErrorStrategy, Flow, Routine

# ============================================================================
# Routine Definitions
# ============================================================================


class DataFetcher(Routine):
    """Fetch data from multiple sources concurrently"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("trigger", handler=self.fetch_data)
        self.output_event = self.define_event("data_fetched", ["data", "source", "timestamp"])

    def configure(self, source_name: str = None, delay: float = 0.2):
        """Configure the fetcher"""
        self.set_config(source_name=source_name or "unknown", delay=delay)

    @property
    def source_name(self):
        return self.get_config("source_name", "unknown")

    @property
    def delay(self):
        return self.get_config("delay", 0.2)

    def fetch_data(self, **kwargs):
        """Simulate fetching data from a source (I/O operation)"""
        # Execution state should be stored in JobState, not routine._stats

        # Simulate network delay
        time.sleep(self.delay)

        # Simulate fetching data
        data = {
            "source": self.source_name,
            "content": f"Data from {self.source_name}",
            "size": len(self.source_name) * 10,
            "fetched_at": time.time(),
        }

        # Flow is automatically detected from routine context
        self.emit(
            "data_fetched",
            data=data,
            source=self.source_name,
            timestamp=time.time(),
        )


class DataProcessor(Routine):
    """Process fetched data"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("data_input", handler=self.process_data)
        self.output_event = self.define_event(
            "data_processed", ["result", "processor_id", "processing_time"]
        )

    def configure(self, processor_id: str = None):
        """Configure the processor"""
        self.set_config(processor_id=processor_id or "unknown")

    @property
    def processor_id(self):
        return self.get_config("processor_id", "unknown")

    def process_data(self, data: Dict[str, Any], source: str, timestamp: float):
        """Process the fetched data"""
        # Execution state should be stored in JobState, not routine._stats

        start_time = time.time()

        # Simulate processing (CPU-bound operation)
        processed = {
            "original": data,
            # ...
```
Running the Example
```shell
python examples/concurrent_flow_demo.py
```
Expected Output
The demo will show:
Concurrent Execution Test: Demonstrates parallel execution of multiple data fetchers
Performance Comparison: Shows execution time difference between sequential and concurrent modes
Error Handling: Demonstrates error handling in concurrent scenarios
Serialization: Shows that concurrent flows can be serialized and deserialized
Strategy Switching: Demonstrates dynamic strategy changes
Performance Results
Typical performance improvements:
Sequential Execution: ~0.65-0.75 seconds for 3 independent tasks
Concurrent Execution: ~0.25-0.30 seconds for the same tasks
Speedup: 2-3x faster with concurrent execution
The actual speedup depends on:
- Number of parallel routines
- I/O wait time
- System resources
- Thread pool size
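The figures above follow from simple arithmetic: three independent tasks that each wait about 0.2 s (the demo's default `delay`) need at least 3 × 0.2 s = 0.6 s when run one after another, but only about 0.2 s when all three overlap, so the ceiling is roughly 3x. The sketch below reproduces that effect with the standard library's `concurrent.futures.ThreadPoolExecutor` rather than Routilux, and varies the pool size to show why the thread pool size matters. `fake_fetch` and `timed_run` are hypothetical names, and `time.sleep` stands in for real I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(delay: float) -> float:
    """Stand-in for an I/O-bound fetch: just sleep for `delay` seconds."""
    time.sleep(delay)
    return delay


def timed_run(n_tasks: int, delay: float, max_workers: int) -> float:
    """Run n_tasks fake fetches on a pool of max_workers threads; return wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() consumes every result, so all tasks are done before we stop the clock
        list(pool.map(fake_fetch, [delay] * n_tasks))
    return time.perf_counter() - start


if __name__ == "__main__":
    for workers in (1, 2, 3):
        elapsed = timed_run(n_tasks=3, delay=0.2, max_workers=workers)
        print(f"max_workers={workers}: {elapsed:.2f}s")
```

With one worker the run cannot finish in under 0.6 s; with two workers the three tasks need two "rounds" (about 0.4 s); with three workers they all overlap (about 0.2 s plus overhead).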
Key Concepts
Concurrent Execution Strategy
When a flow is created with execution_strategy="concurrent", tasks are processed concurrently using a thread pool:
```python
from routilux import Flow

flow = Flow(
    execution_strategy="concurrent",
    max_workers=5,
)
```
Unified Queue Mechanism:
- Both sequential and concurrent modes use the same event queue
- Sequential mode: max_workers=1 (one task at a time)
- Concurrent mode: max_workers>1 (multiple tasks in parallel)
- Tasks are processed fairly in queue order
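To make the "same queue, different worker count" idea concrete, here is a minimal sketch built on the standard library's `queue.Queue` and `threading`. It is not Routilux's implementation, and `run_queue` is a hypothetical helper: one shared FIFO queue is drained by `max_workers` threads. With one worker, tasks start strictly in enqueue order, as in sequential mode; with more workers, several tasks run at the same time.

```python
import queue
import threading
import time
from typing import Callable, List


def run_queue(tasks: List[Callable[[], None]], max_workers: int) -> List[int]:
    """Drain one shared FIFO queue with max_workers threads.

    Returns task indices in the order the workers recorded starting them.
    """
    q = queue.Queue()
    for index, task in enumerate(tasks):
        q.put((index, task))

    started: List[int] = []
    lock = threading.Lock()  # protects `started`, which all workers append to

    def worker() -> None:
        while True:
            try:
                index, task = q.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker exits
            with lock:
                started.append(index)
            task()

    threads = [threading.Thread(target=worker) for _ in range(max_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait until every worker has exited
    return started
```

With `max_workers=1` the returned order is always `[0, 1, 2, ...]`; with more workers every task still runs exactly once, but the tasks overlap in time.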
Automatic Flow Detection:
- emit() automatically detects flow from routine context
- No need to pass flow parameter in most cases
- Flow context is set automatically by Flow.execute() and Flow.resume()
```python
# Slot handler method defined inside a Routine subclass
def process_data(self, data=None, **kwargs):
    # Flow is automatically detected - no need to pass it!
    self.emit("output", result=f"Processed: {data}")
```
Thread Pool Management
The max_workers parameter controls the maximum number of concurrent threads:
```python
from routilux import Flow
flow = Flow(execution_strategy="concurrent", max_workers=10)
```
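A quick way to see that the pool size is a hard cap is to count how many tasks are inside their body at the same moment. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for a flow's thread pool (an assumption for illustration; `peak_concurrency` is a hypothetical helper).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(n_tasks: int, max_workers: int, delay: float = 0.05) -> int:
    """Return the largest number of tasks observed running simultaneously."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(delay)  # simulated I/O wait
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(n_tasks):
            pool.submit(task)
    # Leaving the `with` block waits for every submitted task.
    return peak
```

`peak_concurrency(6, 2)` can never return more than 2, however many tasks are queued; the remaining tasks wait in the pool's queue until a thread frees up.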
Thread Safety
All state updates are thread-safe:
- Routine stats are protected
- JobState updates are synchronized
- Execution tracking is thread-safe
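The page does not show how Routilux synchronizes these updates. The usual pattern, sketched here with a hypothetical `ThreadSafeStats` class rather than Routilux's own `JobState`, is to guard every read-modify-write of shared state with a `threading.Lock`, so that increments from concurrent handlers are never lost.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


class ThreadSafeStats:
    """Hypothetical counter store; every access happens under one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: Dict[str, int] = {}

    def increment(self, key: str, amount: int = 1) -> None:
        # The read-modify-write must be atomic, so hold the lock for all of it.
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + amount

    def snapshot(self) -> Dict[str, int]:
        # Return a copy so callers never iterate a dict another thread is mutating.
        with self._lock:
            return dict(self._counts)


def handler(stats: ThreadSafeStats, source: str) -> None:
    for _ in range(1000):
        stats.increment(f"calls:{source}")


stats = ThreadSafeStats()
with ThreadPoolExecutor(max_workers=4) as pool:
    for source in ["api", "db", "api", "db"]:
        pool.submit(handler, stats, source)
print(sorted(stats.snapshot().items()))  # [('calls:api', 2000), ('calls:db', 2000)]
```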
Error Handling
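Errors in concurrent execution are handled the same way as in sequential execution:
```python
from routilux import ErrorHandler, ErrorStrategy, Flow
flow = Flow(execution_strategy="concurrent")
flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))
# Errors in one routine don't block others
```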
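The idea behind a continue-style strategy can be shown without Routilux: each task's exception is captured and recorded instead of propagating, so sibling tasks in the pool keep running. The sketch uses `ThreadPoolExecutor` and `Future.exception()` from the standard library; `run_all_continue` and `flaky_fetch` are hypothetical names, and Routilux's `ErrorHandler` may record errors differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple


def flaky_fetch(source: str) -> str:
    """Simulated fetch that fails for one particular source."""
    if source == "broken":
        raise ConnectionError(f"cannot reach {source}")
    return f"Data from {source}"


def run_all_continue(sources: List[str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Run every fetch; collect results and errors instead of stopping at the first failure."""
    results: Dict[str, str] = {}
    errors: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=len(sources)) as pool:
        futures = {source: pool.submit(flaky_fetch, source) for source in sources}
    # Leaving the `with` block waits for all tasks, so every future is done here.
    for source, future in futures.items():
        exc = future.exception()
        if exc is None:
            results[source] = future.result()
        else:
            errors[source] = repr(exc)
    return results, errors


results, errors = run_all_continue(["api", "broken", "db"])
print(results)  # {'api': 'Data from api', 'db': 'Data from db'}
print(errors)   # {'broken': "ConnectionError('cannot reach broken')"}
```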
Serialization
Concurrent flows can be serialized and deserialized:
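```python
from routilux import Flow

flow = Flow(execution_strategy="concurrent", max_workers=5)

# Serialize
data = flow.serialize()

# Deserialize into a fresh Flow
new_flow = Flow()
new_flow.deserialize(data)
# Execution strategy and max_workers are preserved
```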
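What "preserved" means in practice is that the execution settings survive a round trip. Below is a hypothetical sketch with a stand-in `FlowSettings` dataclass serialized to JSON. It is not Routilux's format (the page does not document what `serialize()` returns); it only illustrates the property worth checking after `deserialize()`, using the same "create a default object, then load into it" pattern as above.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FlowSettings:
    """Hypothetical stand-in for the settings a concurrent flow carries."""
    # Defaults chosen for illustration only.
    execution_strategy: str = "sequential"
    max_workers: int = 1

    def serialize(self) -> str:
        return json.dumps(asdict(self))

    def deserialize(self, text: str) -> None:
        # Load every saved field back onto this (default-constructed) object.
        for key, value in json.loads(text).items():
            setattr(self, key, value)


original = FlowSettings(execution_strategy="concurrent", max_workers=5)
data = original.serialize()
restored = FlowSettings()
restored.deserialize(data)
print(restored)  # FlowSettings(execution_strategy='concurrent', max_workers=5)
```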
Waiting for Completion
In concurrent execution, tasks run asynchronously. Use JobState.wait_for_completion() to wait for all tasks:
```python
from routilux import Flow
from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent")
job_state = flow.execute("entry_routine")

# Wait for all concurrent tasks to complete
JobState.wait_for_completion(flow, job_state, timeout=10.0)
# All tasks have now finished, provided the 10-second timeout was not reached
```
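The page does not describe how `wait_for_completion()` works internally. A common shape for such a helper is an in-flight task counter guarded by a `threading.Condition`: each task increments it when enqueued and decrements it when done, and the waiter blocks until it reaches zero or the timeout expires. The `TaskTracker` class and `submit_tracked` function below are a hypothetical sketch of that pattern, not Routilux code.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TaskTracker:
    """Hypothetical in-flight counter with a timeout-aware wait."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._pending = 0

    def task_started(self) -> None:
        with self._cond:
            self._pending += 1

    def task_finished(self) -> None:
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()  # wake every waiter

    def wait_for_completion(self, timeout: float) -> bool:
        """Return True if all tasks finished, False if the timeout expired first."""
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)


def submit_tracked(tracker: TaskTracker, pool: ThreadPoolExecutor, delay: float) -> None:
    tracker.task_started()  # count the task *before* handing it to the pool

    def body() -> None:
        try:
            time.sleep(delay)  # simulated work
        finally:
            tracker.task_finished()  # always decrement, even if the work raises

    pool.submit(body)


pool = ThreadPoolExecutor(max_workers=3)
tracker = TaskTracker()
for _ in range(3):
    submit_tracked(tracker, pool, delay=0.2)
print(tracker.wait_for_completion(timeout=0.05))  # False: tasks are still sleeping
print(tracker.wait_for_completion(timeout=2.0))   # True: all three finished
pool.shutdown(wait=True)
```

Counting the task before it is submitted matters: if the counter were incremented inside the worker, a waiter could see zero pending tasks before the work had even started.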
Resource Cleanup
Always properly shut down concurrent flows to clean up resources:
```python
from routilux import Flow
from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent")
try:
    job_state = flow.execute("entry_routine")
    JobState.wait_for_completion(flow, job_state, timeout=10.0)
finally:
    flow.shutdown(wait=True)  # Clean up the thread pool
```
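If the same try/finally appears in many places, it can be folded into a context manager. `shutdown_on_exit` below is a hypothetical helper, not part of Routilux. It only assumes the wrapped object has a `shutdown(wait=...)` method, as `flow.shutdown(wait=True)` above does, so it is demonstrated here with the standard library's `ThreadPoolExecutor`, which has the same method.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator, TypeVar

T = TypeVar("T")


@contextmanager
def shutdown_on_exit(resource: T, wait: bool = True) -> Iterator[T]:
    """Yield `resource`, then call resource.shutdown(wait=...) even if the body raises."""
    try:
        yield resource
    finally:
        resource.shutdown(wait=wait)  # type: ignore[attr-defined]


with shutdown_on_exit(ThreadPoolExecutor(max_workers=2)) as pool:
    future = pool.submit(sum, [1, 2, 3])
print(future.result())  # 6

# After the block the pool is shut down, so new work is rejected.
try:
    pool.submit(sum, [1])
except RuntimeError as exc:
    print("rejected:", exc)
```

With Routilux, the call site would read `with shutdown_on_exit(Flow(execution_strategy="concurrent")) as flow:`, followed by the `execute` and `wait_for_completion` calls shown above.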
Use Cases
Concurrent execution is ideal for:
Multiple API Calls: Fetching data from multiple APIs simultaneously
Database Queries: Running multiple independent queries in parallel
File Processing: Processing multiple files concurrently
Network Operations: Any I/O-bound operations that can run in parallel
Data Aggregation: Collecting data from multiple sources simultaneously
Best Practices
Choose an Appropriate max_workers: Beyond the number of tasks that can actually run at once, extra threads only add scheduling and memory overhead
Use for I/O-bound Operations: Concurrent execution is most beneficial for I/O-bound tasks
Handle Errors Properly: Use appropriate error handling strategies
Monitor Performance: Use ExecutionTracker to monitor concurrent execution performance
Test Both Strategies: Compare sequential and concurrent performance for your use case
Wait for Completion: Always call wait_for_completion() after execution to ensure all tasks finish
Clean Up Resources: Always call shutdown() when done with a concurrent flow, preferably in a try/finally block
See Also
Working with Flows - Flow usage guide
Error Handling and Exception Management - Error handling strategies
Flow API - Flow API reference