Concurrent Execution
In this tutorial, you’ll learn how to execute independent routines in parallel using Routilux’s concurrent execution mode for better performance.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Understand when to use concurrent execution
Configure concurrent execution mode
Handle thread-safe operations
Use wait_for_completion() and shutdown()
Build high-performance workflows
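Step 1: Understanding Concurrent Execution
Concurrent execution allows independent routines to run in parallel, which is especially useful for I/O-bound operations (network calls, file I/O, etc.). To see the difference, start with a baseline: the example below runs three simulated I/O operations in the default sequential mode.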
from routilux import Flow, Routine
import time

class SlowIOOperation(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Operation", delay=0.2)
        self.trigger_slot = self.define_slot("trigger", handler=self.operate)
        self.output_event = self.define_event("output", ["result"])

    def operate(self, **kwargs):
        # Simulate I/O operation (network call, file read, etc.)
        delay = self.get_config("delay", 0.2)
        name = self.get_config("name", "Operation")
        time.sleep(delay)
        result = f"{name} completed"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

# Sequential execution (default)
print("=== Sequential Execution ===")
flow_seq = Flow(flow_id="sequential", execution_strategy="sequential")

op1 = SlowIOOperation()
op1.set_config(name="Operation1", delay=0.2)
op2 = SlowIOOperation()
op2.set_config(name="Operation2", delay=0.2)
op3 = SlowIOOperation()
op3.set_config(name="Operation3", delay=0.2)

op1_id = flow_seq.add_routine(op1, "op1")
op2_id = flow_seq.add_routine(op2, "op2")
op3_id = flow_seq.add_routine(op3, "op3")

start = time.time()
flow_seq.execute(op1_id)
flow_seq.execute(op2_id)
flow_seq.execute(op3_id)
elapsed = time.time() - start
print(f"Sequential time: {elapsed:.2f}s")
Expected Output:
=== Sequential Execution ===
[1234567890.12] Operation1 completed
[1234567890.32] Operation2 completed
[1234567890.52] Operation3 completed
Sequential time: 0.60s
Key Points:
Sequential mode executes one routine at a time
Total time is the sum of all operation times
Suitable for CPU-bound or dependent operations
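The same timing behavior can be reproduced outside Routilux with the standard library alone. This is a plain-Python sketch, not Routilux code: `simulate_io` and `run_sequential` are hypothetical helpers, and `time.sleep` stands in for a network call.

```python
import time

def simulate_io(name: str, delay: float) -> str:
    """Stand-in for an I/O call: sleeps, then returns a result string."""
    time.sleep(delay)
    return f"{name} completed"

def run_sequential(delays: dict[str, float]) -> tuple[list[str], float]:
    """Run each operation one after another and measure wall-clock time."""
    start = time.perf_counter()
    results = [simulate_io(name, delay) for name, delay in delays.items()]
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = run_sequential(
        {"Operation1": 0.2, "Operation2": 0.2, "Operation3": 0.2}
    )
    print(results)
    # The sleeps add up, so elapsed is at least 0.6s
    print(f"Sequential time: {elapsed:.2f}s")
```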
Step 2: Enabling Concurrent Execution
Enable concurrent execution by setting the execution strategy:
from routilux import Flow, Routine
import time

class SlowIOOperation(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Operation", delay=0.2)
        self.trigger_slot = self.define_slot("trigger", handler=self.operate)
        self.output_event = self.define_event("output", ["result"])

    def operate(self, **kwargs):
        delay = self.get_config("delay", 0.2)
        name = self.get_config("name", "Operation")
        time.sleep(delay)
        result = f"{name} completed"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

# Concurrent execution
print("=== Concurrent Execution ===")
flow_conc = Flow(
    flow_id="concurrent",
    execution_strategy="concurrent",
    max_workers=3  # Number of parallel workers
)

op1 = SlowIOOperation()
op1.set_config(name="Operation1", delay=0.2)
op2 = SlowIOOperation()
op2.set_config(name="Operation2", delay=0.2)
op3 = SlowIOOperation()
op3.set_config(name="Operation3", delay=0.2)

op1_id = flow_conc.add_routine(op1, "op1")
op2_id = flow_conc.add_routine(op2, "op2")
op3_id = flow_conc.add_routine(op3, "op3")

start = time.time()
flow_conc.execute(op1_id)
flow_conc.execute(op2_id)
flow_conc.execute(op3_id)

# Wait for all concurrent tasks to complete
flow_conc.wait_for_completion(timeout=5.0)
elapsed = time.time() - start
print(f"Concurrent time: {elapsed:.2f}s")

# Always clean up
flow_conc.shutdown(wait=True)
Expected Output (completion order may vary between runs):
=== Concurrent Execution ===
[1234567890.12] Operation1 completed
[1234567890.12] Operation2 completed
[1234567890.12] Operation3 completed
Concurrent time: 0.22s
Key Points:
Concurrent mode runs multiple routines in parallel
Total time is approximately the duration of the longest operation, as long as max_workers is at least the number of routines started
Use wait_for_completion() to wait for all tasks
Always call shutdown() to clean up the thread pool
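The `max_workers` setting suggests a pool of worker threads. As a rough standard-library analogy, assuming a thread-pool model (this is not Routilux's internals), `concurrent.futures.ThreadPoolExecutor` shows the same effect: `wait(..., timeout=...)` plays the role of `wait_for_completion()`, and leaving the `with` block calls `shutdown(wait=True)`. The helper names are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def simulate_io(name: str, delay: float) -> str:
    """Stand-in for an I/O call; sleep releases the GIL, so threads overlap."""
    time.sleep(delay)
    return f"{name} completed"

def run_concurrent(delays: dict[str, float], max_workers: int = 3) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Exiting the with-block calls executor.shutdown(wait=True)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(simulate_io, n, d) for n, d in delays.items()]
        # Block until every future finishes or the timeout expires
        done, not_done = wait(futures, timeout=5.0)
        if not_done:
            raise TimeoutError(f"{len(not_done)} task(s) still running")
    elapsed = time.perf_counter() - start
    # Results are returned in submission order, not completion order
    return [f.result() for f in futures], elapsed

if __name__ == "__main__":
    results, elapsed = run_concurrent(
        {"Operation1": 0.2, "Operation2": 0.2, "Operation3": 0.2}
    )
    print(results)
    # Typically close to 0.2s rather than 0.6s
    print(f"Concurrent time: {elapsed:.2f}s")
```

With `max_workers=1`, the same call degrades to sequential timing, which is a quick way to see why the worker count matters.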
Step 3: Concurrent Execution in a Flow
In a connected flow, independent routines can execute concurrently:
from routilux import Flow, Routine
from routilux.job_state import JobState
import time

class DataFetcher(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(source_name="Source", delay=0.1)
        self.trigger_slot = self.define_slot("trigger", handler=self.fetch)
        self.output_event = self.define_event("output", ["data", "source"])

    def fetch(self, **kwargs):
        delay = self.get_config("delay", 0.1)
        source_name = self.get_config("source_name", "Source")
        time.sleep(delay)  # Simulate network delay
        data = f"Data from {source_name}"
        print(f"[{time.time():.2f}] Fetched: {data}")
        self.emit("output", data=data, source=source_name)

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )

    def aggregate(self, data=None, source=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        source_value = source or kwargs.get("source", "")
        print(f"Aggregated: {data_value} from {source_value}")

# Create concurrent flow
flow = Flow(
    flow_id="concurrent_flow",
    execution_strategy="concurrent",
    max_workers=3
)

# Create multiple fetchers
fetcher1 = DataFetcher()
fetcher1.set_config(source_name="Source1", delay=0.1)
fetcher2 = DataFetcher()
fetcher2.set_config(source_name="Source2", delay=0.1)
fetcher3 = DataFetcher()
fetcher3.set_config(source_name="Source3", delay=0.1)
aggregator = Aggregator()

f1_id = flow.add_routine(fetcher1, "fetcher1")
f2_id = flow.add_routine(fetcher2, "fetcher2")
f3_id = flow.add_routine(fetcher3, "fetcher3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all fetchers to aggregator
flow.connect(f1_id, "output", agg_id, "input")
flow.connect(f2_id, "output", agg_id, "input")
flow.connect(f3_id, "output", agg_id, "input")

# Execute all fetchers (they run concurrently) and keep each JobState
start = time.time()
job_states = [flow.execute(f1_id), flow.execute(f2_id), flow.execute(f3_id)]

# Wait for every execution to finish before measuring
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=5.0)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")

flow.shutdown(wait=True)
Expected Output (order may vary between runs):
[1234567890.12] Fetched: Data from Source1
[1234567890.12] Fetched: Data from Source2
[1234567890.12] Fetched: Data from Source3
Aggregated: Data from Source1 from Source1
Aggregated: Data from Source2 from Source2
Aggregated: Data from Source3 from Source3
Total time: 0.12s
Key Points:
Independent routines in a flow can execute concurrently
Dependencies are handled automatically: the aggregator's handler runs each time a connected fetcher emits, so you never call it directly
Use merge_strategy="append" to collect results from multiple sources
Concurrent execution significantly improves performance for I/O-bound operations
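The fan-in shape of this example (several producers, one collecting handler) can be sketched in plain Python. This assumes an append-style merge simply accumulates each incoming payload; `AppendCollector` and `fetch` are hypothetical names, and this is not how Routilux implements `merge_strategy="append"`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class AppendCollector:
    """Hypothetical fan-in target: each call appends one payload under a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.received: list[dict] = []

    def receive(self, **payload) -> int:
        # Guard the shared list; return how many payloads have arrived so far
        with self._lock:
            self.received.append(payload)
            return len(self.received)

def fetch(source_name: str, delay: float, sink: AppendCollector) -> int:
    time.sleep(delay)  # Simulated network delay
    return sink.receive(data=f"Data from {source_name}", source=source_name)

if __name__ == "__main__":
    sink = AppendCollector()
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(fetch, name, 0.1, sink)
                   for name in ("Source1", "Source2", "Source3")]
    # .result() re-raises any exception raised inside a worker
    print(sorted(f.result() for f in futures))  # [1, 2, 3]
    # Arrival order depends on thread scheduling, so sort before printing
    print(sorted(p["source"] for p in sink.received))
```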
Step 4: Thread Safety Considerations
When using concurrent execution, ensure thread-safe operations:
from routilux import Flow, Routine
from routilux.job_state import JobState
import threading
import time

class ThreadSafeCounter(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.increment)
        self._lock = threading.Lock()  # Thread lock for safety
        self.set_stat("count", 0)

    def increment(self, **kwargs):
        # Read, update and report inside the lock so that each printed
        # value matches the increment that produced it
        with self._lock:
            current = self.get_stat("count", 0)
            new_count = current + 1
            self.set_stat("count", new_count)
            print(f"Count: {new_count}")

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Source")
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        name = self.get_config("name", "Source")
        time.sleep(0.05)  # Simulate processing
        self.emit("output", data=f"Data from {name}")

flow = Flow(
    flow_id="threadsafe_flow",
    execution_strategy="concurrent",
    max_workers=5
)

counter = ThreadSafeCounter()
counter_id = flow.add_routine(counter, "counter")

# Create multiple sources
sources = []
source_ids = []
for i in range(5):
    source = DataSource()
    source.set_config(name=f"Source{i}")
    sources.append(source)
    source_id = flow.add_routine(source, f"source{i}")
    source_ids.append(source_id)
    flow.connect(source_id, "output", counter_id, "input")

# Execute all sources concurrently
job_states = []
for source_id in source_ids:
    job_state = flow.execute(source_id)
    job_states.append(job_state)

# Wait for all executions to complete
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=5.0)
print(f"Final count: {counter.get_stat('count', 0)}")

flow.shutdown(wait=True)
Expected Output:
Count: 1
Count: 2
Count: 3
Count: 4
Count: 5
Final count: 5
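Key Points:
Use locks (threading.Lock()) for shared state modifications
Routilux’s internal state management is thread-safe, but a get-then-set sequence like the counter update above is a read-modify-write and still needs its own lock
Be careful with shared resources (files, databases, etc.)
Test concurrent workflows thoroughly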
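Why the lock must cover both the read and the write can be shown with a small stand-alone sketch. `StatStore` is a hypothetical stand-in for per-routine stats with `get`/`set` accessors, not Routilux's implementation; the point is that `increment` performs the whole read-modify-write, and reports the new value, inside one critical section.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class StatStore:
    """Hypothetical stand-in for per-routine stats with get/set accessors."""

    def __init__(self) -> None:
        self._stats: dict[str, int] = {}
        self._lock = threading.Lock()

    def get(self, key: str, default: int = 0) -> int:
        return self._stats.get(key, default)

    def set(self, key: str, value: int) -> None:
        self._stats[key] = value

    def increment(self, key: str) -> int:
        # get + set is a read-modify-write: without the lock, two threads can
        # read the same value and one of the updates is lost
        with self._lock:
            new_value = self.get(key) + 1
            self.set(key, new_value)
            return new_value  # The value this call produced, safe to report

if __name__ == "__main__":
    stats = StatStore()
    with ThreadPoolExecutor(max_workers=5) as pool:
        produced = list(pool.map(lambda _: stats.increment("count"), range(1000)))
    print(stats.get("count"))  # 1000
    print(sorted(produced) == list(range(1, 1001)))  # True: no value seen twice
```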
Step 5: Complete Example - Parallel Data Processing
Here’s a complete example of parallel data processing:
from routilux import Flow, Routine
from routilux.job_state import JobState
import time

class DataFetcher(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(source_id="F0", delay=0.1)
        self.trigger_slot = self.define_slot("trigger", handler=self.fetch)
        self.output_event = self.define_event("output", ["data", "source_id", "timestamp"])

    def fetch(self, **kwargs):
        delay = self.get_config("delay", 0.1)
        source_id = self.get_config("source_id", "F0")
        time.sleep(delay)
        timestamp = time.time()
        data = f"Data from source {source_id}"
        self.emit("output", data=data, source_id=source_id, timestamp=timestamp)

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["processed", "source_id"])

    def process(self, data=None, source_id=None, timestamp=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        source = source_id or kwargs.get("source_id", "unknown")

        # Process data
        processed = f"Processed: {data_value}"
        print(f"Processed data from source {source}")

        self.emit("output", processed=processed, source_id=source)

class ResultCollector(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.collect,
            merge_strategy="append"
        )
        self.set_stat("collected_count", 0)

    def collect(self, processed=None, source_id=None, **kwargs):
        processed_value = processed or kwargs.get("processed", "")
        source = source_id or kwargs.get("source_id", "unknown")

        self.increment_stat("collected_count")
        print(f"Collected: {processed_value} from {source}")

def main():
    flow = Flow(
        flow_id="parallel_processing",
        execution_strategy="concurrent",
        max_workers=5
    )

    # Create multiple fetchers
    fetchers = []
    fetcher_ids = []
    for i in range(5):
        fetcher = DataFetcher()
        fetcher.set_config(source_id=f"F{i}", delay=0.1)
        fetchers.append(fetcher)
        fetcher_id = flow.add_routine(fetcher, f"fetcher{i}")
        fetcher_ids.append(fetcher_id)

    processor = DataProcessor()
    collector = ResultCollector()

    processor_id = flow.add_routine(processor, "processor")
    collector_id = flow.add_routine(collector, "collector")

    # Connect: fetchers -> processor -> collector
    for fetcher_id in fetcher_ids:
        flow.connect(fetcher_id, "output", processor_id, "input")

    flow.connect(processor_id, "output", collector_id, "input")

    # Execute all fetchers concurrently
    print("Starting parallel data fetching...")
    start = time.time()

    job_states = []
    for fetcher_id in fetcher_ids:
        job_state = flow.execute(fetcher_id)
        job_states.append(job_state)

    # Wait for all executions to complete
    for job_state in job_states:
        JobState.wait_for_completion(flow, job_state, timeout=10.0)
    elapsed = time.time() - start

    print(f"\nCompleted in {elapsed:.2f}s")
    print(f"Collected {collector.get_stat('collected_count', 0)} results")

    flow.shutdown(wait=True)

if __name__ == "__main__":
    main()
Expected Output (order may vary between runs):
Starting parallel data fetching...
Processed data from source F0
Processed data from source F1
Processed data from source F2
Processed data from source F3
Processed data from source F4
Collected: Processed: Data from source F0 from F0
Collected: Processed: Data from source F1 from F1
Collected: Processed: Data from source F2 from F2
Collected: Processed: Data from source F3 from F3
Collected: Processed: Data from source F4 from F4
Completed in 0.15s
Collected 5 results
Key Points:
Concurrent execution significantly improves performance for I/O-bound operations
Independent routines can run in parallel
Always use wait_for_completion() and shutdown()
Use an appropriate max_workers value for your workload
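The fetch → process → collect shape of this example can be mirrored with the standard library for experimentation. In this sketch, fetches run in a thread pool and `as_completed` delivers each result as soon as it finishes, loosely standing in for event-driven delivery; `fetch`, `process` and `run_pipeline` are hypothetical helpers, and this does not reflect how Routilux schedules routines.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(source_id: str, delay: float = 0.1) -> dict:
    time.sleep(delay)  # Simulated I/O
    return {"data": f"Data from source {source_id}", "source_id": source_id,
            "timestamp": time.time()}

def process(item: dict) -> dict:
    return {"processed": f"Processed: {item['data']}", "source_id": item["source_id"]}

def run_pipeline(n_sources: int = 5, max_workers: int = 5, timeout: float = 10.0) -> list[dict]:
    collected: list[dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fetch, f"F{i}") for i in range(n_sources)]
        # Handle each fetch as soon as it finishes; as_completed raises
        # TimeoutError if not all futures complete within `timeout` seconds
        for future in as_completed(futures, timeout=timeout):
            collected.append(process(future.result()))
    return collected

if __name__ == "__main__":
    start = time.perf_counter()
    results = run_pipeline()
    print(f"Completed in {time.perf_counter() - start:.2f}s")
    print(f"Collected {len(results)} results")  # Collected 5 results
```

Processing and collection happen on the calling thread here, so `collected` needs no lock; only the fetches overlap.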
Common Pitfalls
Pitfall 1: Forgetting to wait for completion
flow.execute(routine_id)
# Missing wait_for_completion()!
# Tasks may not be finished yet
Solution: Always call wait_for_completion() after executing concurrent flows.
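The same pitfall is easy to reproduce with a plain thread pool: right after submission the work is usually still pending, so anything read at that point is incomplete. This standard-library sketch is only an analogy for `wait_for_completion()`, not Routilux code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def slow_task(delay: float) -> float:
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_task, 0.2) for _ in range(3)]
    # Checked immediately after submit: the sleeps are very likely still running
    print(sum(f.done() for f in futures), "of 3 done before waiting")
    # Explicitly wait (with a timeout) before relying on the results
    done, not_done = wait(futures, timeout=5.0)
    print(len(done), "done,", len(not_done), "still running after waiting")
```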
Pitfall 2: Not shutting down thread pool
from routilux.job_state import JobState
job_state = flow.execute(routine_id)
JobState.wait_for_completion(flow, job_state)
# Missing shutdown()!
# Thread pool resources not released
Solution: Always call shutdown(wait=True) to clean up resources.
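A `try`/`finally` block guarantees cleanup even when the waiting step raises. The sketch below shows the pattern with `ThreadPoolExecutor`, whose `shutdown(wait=True)` has the same shape as the `flow.shutdown(wait=True)` call used in this tutorial; it is an analogy, not Routilux code. Once shut down, the executor refuses new work.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

executor = ThreadPoolExecutor(max_workers=2)
try:
    futures = [executor.submit(time.sleep, 0.05) for _ in range(4)]
    wait(futures, timeout=5.0)
finally:
    # Runs even if the code above raises; joins the worker threads
    executor.shutdown(wait=True)

# A shut-down executor rejects new submissions with RuntimeError
try:
    executor.submit(time.sleep, 0)
except RuntimeError as exc:
    print(f"RuntimeError: {exc}")
```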
Pitfall 3: Using concurrent mode for CPU-bound operations
# CPU-bound operations don't benefit from concurrent mode
# (Python's GIL limits true parallelism)
flow = Flow(execution_strategy="concurrent") # May not help
Solution: Use concurrent mode for I/O-bound operations, sequential for CPU-bound.
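To check the GIL effect on your own machine, time a pure-Python CPU-bound function run sequentially and on a thread pool. On a standard (GIL-enabled) CPython build, the threaded run typically takes about as long as the sequential one, or longer. `count_primes` is just an illustrative workload, and because timings depend on the machine, the sketch only prints them.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def count_primes(limit: int) -> int:
    """Pure-Python CPU-bound work: trial division holds the GIL throughout."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def compare(limit: int = 30_000, tasks: int = 4) -> tuple[list[int], list[int], float, float]:
    start = time.perf_counter()
    sequential = [count_primes(limit) for _ in range(tasks)]
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=tasks) as pool:
        threaded = list(pool.map(count_primes, [limit] * tasks))
    threaded_time = time.perf_counter() - start
    return sequential, threaded, sequential_time, threaded_time

if __name__ == "__main__":
    _, _, seq_time, thr_time = compare()
    print(f"sequential: {seq_time:.2f}s, threaded: {thr_time:.2f}s")
```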
Best Practices
Use concurrent mode for I/O-bound operations: Network calls, file I/O, database queries
Use sequential mode for CPU-bound operations: Heavy computation, data processing
Set appropriate max_workers: Match your workload, e.g. the number of independent routines you start at once (typically 3-10)
Always wait for completion: Use wait_for_completion() after execution
Always shut down: Call shutdown(wait=True) to clean up resources
Ensure thread safety: Use locks for shared state modifications
Test thoroughly: Concurrent execution can reveal race conditions
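The "always shut down" rule can be enforced with a context manager so it is never forgotten. `shutting_down` below is a hypothetical helper, not part of Routilux: it calls `shutdown(wait=True)` on any object exposing that method when the `with` block exits, even on an exception. It is demonstrated with a `ThreadPoolExecutor`; applying it to a `Flow` assumes `shutdown(wait=True)` behaves as in the examples above.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator, Protocol, TypeVar

class SupportsShutdown(Protocol):
    def shutdown(self, wait: bool = True) -> None: ...

T = TypeVar("T", bound=SupportsShutdown)

@contextmanager
def shutting_down(resource: T) -> Iterator[T]:
    """Yield the resource and always call resource.shutdown(wait=True) afterwards."""
    try:
        yield resource
    finally:
        resource.shutdown(wait=True)

if __name__ == "__main__":
    with shutting_down(ThreadPoolExecutor(max_workers=4)) as pool:
        squares = list(pool.map(lambda x: x * x, range(5)))
    print(squares)  # [0, 1, 4, 9, 16]
```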
Next Steps
Now that you understand concurrent execution, let’s move on to Advanced Patterns to learn about aggregation patterns, conditional routing, and other advanced features.