Concurrent Execution

In this tutorial, you’ll learn how to execute independent routines in parallel using Routilux’s concurrent execution mode for better performance.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Understand when to use concurrent execution

  • Configure concurrent execution mode

  • Handle thread-safe operations

  • Use wait_for_completion() and shutdown()

  • Build high-performance workflows

Step 1: Understanding Concurrent Execution

Concurrent execution allows independent routines to run in parallel, which is especially useful for I/O-bound operations (network calls, file I/O, etc.):

from routilux import Flow, Routine
import time

class SlowIOOperation(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Operation", delay=0.2)
        self.trigger_slot = self.define_slot("trigger", handler=self.operate)
        self.output_event = self.define_event("output", ["result"])

    def operate(self, **kwargs):
        # Simulate I/O operation (network call, file read, etc.)
        delay = self.get_config("delay", 0.2)
        name = self.get_config("name", "Operation")
        time.sleep(delay)
        result = f"{name} completed"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

# Sequential execution (default)
print("=== Sequential Execution ===")
flow_seq = Flow(flow_id="sequential", execution_strategy="sequential")

op1 = SlowIOOperation()
op1.set_config(name="Operation1", delay=0.2)
op2 = SlowIOOperation()
op2.set_config(name="Operation2", delay=0.2)
op3 = SlowIOOperation()
op3.set_config(name="Operation3", delay=0.2)

op1_id = flow_seq.add_routine(op1, "op1")
op2_id = flow_seq.add_routine(op2, "op2")
op3_id = flow_seq.add_routine(op3, "op3")

start = time.time()
flow_seq.execute(op1_id)
flow_seq.execute(op2_id)
flow_seq.execute(op3_id)
elapsed = time.time() - start
print(f"Sequential time: {elapsed:.2f}s")

Expected Output:

=== Sequential Execution ===
[1234567890.12] Operation1 completed
[1234567890.32] Operation2 completed
[1234567890.52] Operation3 completed
Sequential time: 0.60s

Key Points:

  • Sequential mode executes one routine at a time

  • Total time is the sum of all operation times

  • Suitable for CPU-bound or dependent operations

Step 2: Enabling Concurrent Execution

Enable concurrent execution by setting the execution strategy:

from routilux import Flow, Routine
import time

class SlowIOOperation(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Operation", delay=0.2)
        self.trigger_slot = self.define_slot("trigger", handler=self.operate)
        self.output_event = self.define_event("output", ["result"])

    def operate(self, **kwargs):
        delay = self.get_config("delay", 0.2)
        name = self.get_config("name", "Operation")
        time.sleep(delay)
        result = f"{name} completed"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

# Concurrent execution
print("=== Concurrent Execution ===")
flow_conc = Flow(
    flow_id="concurrent",
    execution_strategy="concurrent",
    max_workers=3  # Number of parallel workers
)

op1 = SlowIOOperation()
op1.set_config(name="Operation1", delay=0.2)
op2 = SlowIOOperation()
op2.set_config(name="Operation2", delay=0.2)
op3 = SlowIOOperation()
op3.set_config(name="Operation3", delay=0.2)

op1_id = flow_conc.add_routine(op1, "op1")
op2_id = flow_conc.add_routine(op2, "op2")
op3_id = flow_conc.add_routine(op3, "op3")

start = time.time()
flow_conc.execute(op1_id)
flow_conc.execute(op2_id)
flow_conc.execute(op3_id)

# Wait for all concurrent tasks to complete
flow_conc.wait_for_completion(timeout=5.0)
elapsed = time.time() - start
print(f"Concurrent time: {elapsed:.2f}s")

# Always clean up
flow_conc.shutdown(wait=True)

Expected Output (timestamps are illustrative, and the three operation lines may appear in any order):

=== Concurrent Execution ===
[1234567890.12] Operation1 completed
[1234567890.12] Operation2 completed
[1234567890.12] Operation3 completed
Concurrent time: 0.22s

Key Points:

  • Concurrent mode runs multiple routines in parallel

  • Total time is approximately the longest operation time

  • Use wait_for_completion() to wait for all tasks

  • Always call shutdown() to clean up the thread pool
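
To see why the total time drops from the sum of the delays to roughly the longest delay, it can help to reproduce the effect without Routilux. The following is a minimal sketch that uses only Python's standard concurrent.futures module; it is not how Routilux is implemented, and the names slow_io, run_sequential, and run_concurrent are made up for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_io(name, delay):
    """Stand-in for an I/O-bound call; time.sleep releases the GIL."""
    time.sleep(delay)
    return f"{name} completed"


def run_sequential(jobs):
    """Run jobs one after another; wall time is roughly the sum of delays."""
    start = time.perf_counter()
    results = [slow_io(name, delay) for name, delay in jobs]
    return results, time.perf_counter() - start


def run_concurrent(jobs, max_workers=3):
    """Run jobs on a thread pool; wall time is roughly the longest delay."""
    start = time.perf_counter()
    # Leaving the with-block waits for all tasks and shuts the pool down.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(slow_io, name, delay) for name, delay in jobs]
        results = [future.result() for future in futures]
    return results, time.perf_counter() - start


jobs = [("Operation1", 0.2), ("Operation2", 0.2), ("Operation3", 0.2)]
seq_results, seq_time = run_sequential(jobs)
conc_results, conc_time = run_concurrent(jobs)
print(f"Sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

The exact timings depend on your machine, but the concurrent run should finish in about one delay rather than three, because the three sleeps overlap.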

Step 3: Concurrent Execution in a Flow

In a connected flow, independent routines can execute concurrently:

from routilux import Flow, Routine
from routilux.job_state import JobState
import time

class DataFetcher(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(source_name="Source", delay=0.1)
        self.trigger_slot = self.define_slot("trigger", handler=self.fetch)
        self.output_event = self.define_event("output", ["data", "source"])

    def fetch(self, **kwargs):
        delay = self.get_config("delay", 0.1)
        source_name = self.get_config("source_name", "Source")
        time.sleep(delay)  # Simulate network delay
        data = f"Data from {source_name}"
        print(f"[{time.time():.2f}] Fetched: {data}")
        self.emit("output", data=data, source=source_name)

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )

    def aggregate(self, data=None, source=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        source_value = source or kwargs.get("source", "")
        print(f"Aggregated: {data_value} from {source_value}")

# Create concurrent flow
flow = Flow(
    flow_id="concurrent_flow",
    execution_strategy="concurrent",
    max_workers=3
)

# Create multiple fetchers
fetcher1 = DataFetcher()
fetcher1.set_config(source_name="Source1", delay=0.1)
fetcher2 = DataFetcher()
fetcher2.set_config(source_name="Source2", delay=0.1)
fetcher3 = DataFetcher()
fetcher3.set_config(source_name="Source3", delay=0.1)
aggregator = Aggregator()

f1_id = flow.add_routine(fetcher1, "fetcher1")
f2_id = flow.add_routine(fetcher2, "fetcher2")
f3_id = flow.add_routine(fetcher3, "fetcher3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all fetchers to aggregator
flow.connect(f1_id, "output", agg_id, "input")
flow.connect(f2_id, "output", agg_id, "input")
flow.connect(f3_id, "output", agg_id, "input")

# Execute all fetchers (they run concurrently) and keep each job state
start = time.time()
job_states = []
for fetcher_id in (f1_id, f2_id, f3_id):
    job_state = flow.execute(fetcher_id)
    job_states.append(job_state)

# Wait for all executions to complete
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=5.0)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")

flow.shutdown(wait=True)

Expected Output (timestamps are illustrative, and line order may vary between runs):

[1234567890.12] Fetched: Data from Source1
[1234567890.12] Fetched: Data from Source2
[1234567890.12] Fetched: Data from Source3
Aggregated: Data from Source1 from Source1
Aggregated: Data from Source2 from Source2
Aggregated: Data from Source3 from Source3
Total time: 0.12s

Key Points:

  • Independent routines in a flow can execute concurrently

  • Dependencies are automatically handled (aggregator waits for fetchers)

  • Use merge_strategy="append" to collect results from multiple sources

  • Concurrent execution significantly improves performance for I/O-bound operations
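
The fan-in shape used here (several fetchers feeding one aggregator) can also be sketched with the standard library alone, which makes one consequence easy to observe: results reach the aggregation step in completion order, not submission order. This sketch assumes nothing about Routilux internals; fetch and fetch_and_aggregate are hypothetical helper names, and the delays are chosen only to make the sources finish at different times.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(source_name, delay):
    """Simulate a network fetch and return a payload like the fetcher's event."""
    time.sleep(delay)
    return {"data": f"Data from {source_name}", "source": source_name}


def fetch_and_aggregate(sources, max_workers=3):
    """Fetch all sources concurrently and collect results as they finish."""
    aggregated = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fetch, name, delay) for name, delay in sources]
        # as_completed yields each future when it finishes, so the order of
        # `aggregated` follows completion time rather than submission order.
        for future in as_completed(futures):
            aggregated.append(future.result())
    return aggregated


results = fetch_and_aggregate(
    [("Source1", 0.15), ("Source2", 0.05), ("Source3", 0.10)]
)
for item in results:
    print(f"Aggregated: {item['data']} from {item['source']}")
```

If your aggregation logic depends on a particular order, carry an explicit key (such as the source name) in each payload and sort on it, rather than relying on arrival order.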

Step 4: Thread Safety Considerations

When using concurrent execution, ensure thread-safe operations:

from routilux import Flow, Routine
from routilux.job_state import JobState
import threading
import time

class ThreadSafeCounter(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.increment)
        self._lock = threading.Lock()  # Thread lock for safety
        self.set_stat("count", 0)

    def increment(self, **kwargs):
        # Thread-safe increment
        with self._lock:
            current = self.get_stat("count", 0)
            self.set_stat("count", current + 1)
            print(f"Count: {self.get_stat('count', 0)}")

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Source")
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        name = self.get_config("name", "Source")
        time.sleep(0.05)  # Simulate processing
        self.emit("output", data=f"Data from {name}")

flow = Flow(
    flow_id="threadsafe_flow",
    execution_strategy="concurrent",
    max_workers=5
)

counter = ThreadSafeCounter()
counter_id = flow.add_routine(counter, "counter")

# Create multiple sources
sources = []
source_ids = []
for i in range(5):
    source = DataSource()
    source.set_config(name=f"Source{i}")
    sources.append(source)
    source_id = flow.add_routine(source, f"source{i}")
    source_ids.append(source_id)
    flow.connect(source_id, "output", counter_id, "input")

# Execute all sources concurrently
job_states = []
for source_id in source_ids:
    job_state = flow.execute(source_id)
    job_states.append(job_state)

# Wait for all executions to complete
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=5.0)
print(f"Final count: {counter.get_stat('count', 0)}")

flow.shutdown(wait=True)

Expected Output:

Count: 1
Count: 2
Count: 3
Count: 4
Count: 5
Final count: 5

Key Points:

  • Use locks (threading.Lock()) for shared state modifications

  • Routilux’s internal state management is thread-safe

  • Be careful with shared resources (files, databases, etc.)

  • Test concurrent workflows thoroughly
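
The read-then-write pattern in the counter above (get the current value, then store it plus one) is exactly the kind of step that needs a lock, because another thread could run between the read and the write. The sketch below isolates that idea with the standard library only; SafeCounter and hammer are hypothetical names for this illustration and are not part of Routilux.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeCounter:
    """Counter whose read-modify-write step is guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def increment(self):
        # Holding the lock makes the read and the write one atomic step.
        with self._lock:
            current = self._count
            self._count = current + 1

    @property
    def count(self):
        with self._lock:
            return self._count


def hammer(counter, workers=8, increments_per_worker=1000):
    """Increment the counter from several threads and return the final value."""

    def work():
        for _ in range(increments_per_worker):
            counter.increment()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(work) for _ in range(workers)]
        for future in futures:
            future.result()  # Re-raise any exception from a worker
    return counter.count


total = hammer(SafeCounter())
print(f"Final count: {total}")  # prints: Final count: 8000
```

A stress loop like hammer is also a cheap way to test your own routines: run many concurrent increments and check that the final value matches the number of calls.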

Step 5: Complete Example - Parallel Data Processing

Here’s a complete example of parallel data processing:

from routilux import Flow, Routine
from routilux.job_state import JobState
import time

class DataFetcher(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(source_id="F0", delay=0.1)
        self.trigger_slot = self.define_slot("trigger", handler=self.fetch)
        self.output_event = self.define_event("output", ["data", "source_id", "timestamp"])

    def fetch(self, **kwargs):
        delay = self.get_config("delay", 0.1)
        source_id = self.get_config("source_id", "F0")
        time.sleep(delay)
        timestamp = time.time()
        data = f"Data from source {source_id}"
        self.emit("output", data=data, source_id=source_id, timestamp=timestamp)

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["processed", "source_id"])

    def process(self, data=None, source_id=None, timestamp=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        source = source_id or kwargs.get("source_id", "unknown")

        # Process data
        processed = f"Processed: {data_value}"
        print(f"Processed data from source {source}")

        self.emit("output", processed=processed, source_id=source)

class ResultCollector(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.collect,
            merge_strategy="append"
        )
        self.set_stat("collected_count", 0)

    def collect(self, processed=None, source_id=None, **kwargs):
        processed_value = processed or kwargs.get("processed", "")
        source = source_id or kwargs.get("source_id", "unknown")

        self.increment_stat("collected_count")
        print(f"Collected: {processed_value} from {source}")

def main():
    flow = Flow(
        flow_id="parallel_processing",
        execution_strategy="concurrent",
        max_workers=5
    )

    # Create multiple fetchers
    fetchers = []
    fetcher_ids = []
    for i in range(5):
        fetcher = DataFetcher()
        fetcher.set_config(source_id=f"F{i}", delay=0.1)
        fetchers.append(fetcher)
        fetcher_id = flow.add_routine(fetcher, f"fetcher{i}")
        fetcher_ids.append(fetcher_id)

    processor = DataProcessor()
    collector = ResultCollector()

    processor_id = flow.add_routine(processor, "processor")
    collector_id = flow.add_routine(collector, "collector")

    # Connect: fetchers -> processor -> collector
    for fetcher_id in fetcher_ids:
        flow.connect(fetcher_id, "output", processor_id, "input")

    flow.connect(processor_id, "output", collector_id, "input")

    # Execute all fetchers concurrently
    print("Starting parallel data fetching...")
    start = time.time()

    job_states = []
    for fetcher_id in fetcher_ids:
        job_state = flow.execute(fetcher_id)
        job_states.append(job_state)

    # Wait for all executions to complete
    for job_state in job_states:
        JobState.wait_for_completion(flow, job_state, timeout=10.0)
    elapsed = time.time() - start

    print(f"\nCompleted in {elapsed:.2f}s")
    print(f"Collected {collector.get_stat('collected_count', 0)} results")

    flow.shutdown(wait=True)

if __name__ == "__main__":
    main()

Expected Output (timing is illustrative, and the order of the Processed and Collected lines may vary between runs):

Starting parallel data fetching...
Processed data from source F0
Processed data from source F1
Processed data from source F2
Processed data from source F3
Processed data from source F4
Collected: Processed: Data from source F0 from F0
Collected: Processed: Data from source F1 from F1
Collected: Processed: Data from source F2 from F2
Collected: Processed: Data from source F3 from F3
Collected: Processed: Data from source F4 from F4

Completed in 0.15s
Collected 5 results

Key Points:

  • Concurrent execution significantly improves performance for I/O-bound operations

  • Independent routines can run in parallel

  • Always use wait_for_completion() and shutdown()

  • Use appropriate max_workers based on your workload

Common Pitfalls

Pitfall 1: Forgetting to wait for completion

flow.execute(routine_id)
# Missing wait_for_completion()!
# Tasks may not be finished yet

Solution: Always call wait_for_completion() after executing concurrent flows.
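
The underlying issue is that submitting work to a thread pool returns immediately, before the work has run. The sketch below shows this with the standard library's ThreadPoolExecutor rather than with Routilux itself; release and task are hypothetical names, and the event is used only to make the "not finished yet" state reproducible.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release = threading.Event()


def task():
    """Block until released, standing in for a slow I/O operation."""
    release.wait(timeout=5.0)
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(task)          # Returns immediately
done_before_wait = future.done()    # False: the task is still blocked

release.set()                       # Let the task proceed
wait([future], timeout=5.0)         # Block until the task has finished
done_after_wait = future.done()
result = future.result()

pool.shutdown(wait=True)
print(done_before_wait, done_after_wait, result)  # prints: False True finished
```

Reading results or stats before the wait corresponds to the first check: the work has been scheduled, but nothing guarantees it has completed.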

Pitfall 2: Not shutting down the thread pool

from routilux.job_state import JobState
job_state = flow.execute(routine_id)
JobState.wait_for_completion(flow, job_state)
# Missing shutdown()!
# Thread pool resources not released

Solution: Always call shutdown(wait=True) to clean up resources.
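
One way to make the shutdown call hard to forget is to wrap it in a context manager so that it also runs when an exception is raised. The helper below is a hypothetical sketch, not part of Routilux: it assumes only that the object passed in has the shutdown(wait=True) method used throughout this tutorial. FakeFlow is a stand-in so that the example runs without Routilux installed.

```python
from contextlib import contextmanager


@contextmanager
def shutting_down(flow):
    """Yield the flow and always call flow.shutdown(wait=True) on exit."""
    try:
        yield flow
    finally:
        # Runs on normal exit and when the with-body raises.
        flow.shutdown(wait=True)


class FakeFlow:
    """Stand-in for a flow; records shutdown calls for demonstration."""

    def __init__(self):
        self.shutdown_calls = []

    def shutdown(self, wait=True):
        self.shutdown_calls.append(wait)


fake = FakeFlow()
try:
    with shutting_down(fake) as flow:
        raise RuntimeError("simulated failure during execution")
except RuntimeError:
    pass

print(fake.shutdown_calls)  # prints: [True]
```

With a real flow you would place the execute and wait calls inside the with-block. A plain try/finally around those calls achieves the same effect if you prefer not to add a helper.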

Pitfall 3: Using concurrent mode for CPU-bound operations

# CPU-bound operations don't benefit from concurrent mode
# (Python's GIL limits true parallelism)
flow = Flow(execution_strategy="concurrent")  # May not help

Solution: Use concurrent mode for I/O-bound operations, sequential for CPU-bound.

Best Practices

  1. Use concurrent mode for I/O-bound operations: Network calls, file I/O, database queries

  2. Use sequential mode for CPU-bound operations: Heavy computation, data processing

  3. Set appropriate max_workers: Match your workload (typically 3-10)

  4. Always wait for completion: Use wait_for_completion() after execution

  5. Always shutdown: Call shutdown(wait=True) to clean up resources

  6. Ensure thread safety: Use locks for shared state modifications

  7. Test thoroughly: Concurrent execution can reveal race conditions
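
When choosing max_workers, a rough estimate of wall time can guide the decision. The function below is a simplified model, not a Routilux API: it assumes tasks are handed to the first free worker in submission order and ignores scheduling overhead. The name estimate_wall_time is made up for this illustration.

```python
import heapq


def estimate_wall_time(delays, max_workers):
    """Estimate total time for tasks with the given delays on a fixed pool."""
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")
    if not delays:
        return 0.0
    # Min-heap holding the time at which each worker next becomes free.
    free_at = [0.0] * min(max_workers, len(delays))
    heapq.heapify(free_at)
    for delay in delays:
        start = heapq.heappop(free_at)          # Earliest available worker
        heapq.heappush(free_at, start + delay)  # Busy until start + delay
    return max(free_at)


delays = [0.1] * 5  # Five fetchers with a 0.1s delay each
for workers in (1, 3, 5):
    print(f"max_workers={workers}: ~{estimate_wall_time(delays, workers):.1f}s")
```

For five 0.1s tasks the model gives about 0.5s with one worker, 0.2s with three, and 0.1s with five, so adding workers beyond the number of independent tasks brings no further gain.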

Next Steps

Now that you understand concurrent execution, let’s move on to Advanced Patterns to learn about aggregation patterns, conditional routing, and other advanced features.