Connecting Routines
In this tutorial, you’ll learn about different connection patterns in Routilux, including one-to-many, many-to-one, and complex branching patterns. You’ll also understand how the event queue architecture works.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Connect one event to multiple slots (fan-out)
Connect multiple events to one slot (fan-in)
Understand merge strategies for handling multiple inputs
Understand the event queue execution model
Build branching and converging workflows
Step 1: One-to-Many Connections (Fan-Out)
A single event can be connected to multiple slots. This is useful when you want to send the same data to multiple processors:
```python
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=None, **kwargs):
        value = value or kwargs.get("value", "test")
        self.emit("output", data=value)

class ProcessorA(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        print(f"Processor A received: {data_value}")

class ProcessorB(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        print(f"Processor B received: {data_value}")

# Create flow
flow = Flow(flow_id="fanout_flow")

source = DataSource()
processor_a = ProcessorA()
processor_b = ProcessorB()

source_id = flow.add_routine(source, "source")
a_id = flow.add_routine(processor_a, "processor_a")
b_id = flow.add_routine(processor_b, "processor_b")

# Connect one event to multiple slots
flow.connect(source_id, "output", a_id, "input")
flow.connect(source_id, "output", b_id, "input")

# Execute
job_state = flow.execute(source_id, entry_params={"value": "Hello"})
print(f"Status: {job_state.status}")
```
Expected Output:
Processor A received: Hello
Processor B received: Hello
Status: completed
Key Points:
One event can connect to multiple slots
All connected slots receive the same data
Both processors run, but their relative order may vary because tasks go through the event queue
This pattern is called “fan-out”
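To make the fan-out rule concrete, here is a toy, pure-Python model of it. It is not routilux code: `FanOutModel` and its methods are hypothetical names, and unlike routilux the model calls handlers directly in connection order rather than scheduling them through an event queue.

```python
from collections import defaultdict
from typing import Any, Callable

# Toy model of fan-out, NOT routilux's implementation.
Handler = Callable[..., None]


class FanOutModel:
    def __init__(self) -> None:
        # (routine_id, event_name) -> list of connected slot handlers
        self.connections: defaultdict = defaultdict(list)

    def connect(self, source_id: str, event: str, handler: Handler) -> None:
        self.connections[(source_id, event)].append(handler)

    def emit(self, source_id: str, event: str, **data: Any) -> None:
        # Every connected handler receives the same keyword data.
        # Handlers run directly here, in connection order; routilux instead
        # schedules them through its event queue, so its order may differ.
        for handler in self.connections[(source_id, event)]:
            handler(**data)


received: list = []
model = FanOutModel()
model.connect("source", "output", lambda data: received.append(("A", data)))
model.connect("source", "output", lambda data: received.append(("B", data)))
model.emit("source", "output", data="Hello")
print(received)  # [('A', 'Hello'), ('B', 'Hello')]
```

The point of the model is the data, not the order: both slots get an identical `data="Hello"` payload from a single emit.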
Step 2: Many-to-One Connections (Fan-In)
Multiple events can connect to the same slot. This is useful for aggregating data from multiple sources. By default, new data replaces old data, but you can use merge strategies to control how data is combined:
```python
from routilux import Flow, Routine

class SourceA(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data", "source"])

    def generate(self, **kwargs):
        self.emit("output", data="Data from A", source="A")

class SourceB(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data", "source"])

    def generate(self, **kwargs):
        self.emit("output", data="Data from B", source="B")

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        # Use "append" merge strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"  # Accumulates data in lists
        )

    def aggregate(self, data=None, source=None, **kwargs):
        # With append strategy, data and source are lists (accumulated values)
        data_value = data or kwargs.get("data", [])
        source_value = source or kwargs.get("source", [])
        # Convert to string for display (if it's a list, show the last item)
        if isinstance(data_value, list) and data_value:
            data_str = data_value[-1]
        else:
            data_str = data_value
        if isinstance(source_value, list) and source_value:
            source_str = source_value[-1]
        else:
            source_str = source_value
        print(f"Aggregator received: {data_str} from {source_str}")

        # Access accumulated data
        all_data = self.input_slot._data
        print(f"All accumulated data: {all_data}")

# Create flow
flow = Flow(flow_id="fanin_flow")

source_a = SourceA()
source_b = SourceB()
aggregator = Aggregator()

a_id = flow.add_routine(source_a, "source_a")
b_id = flow.add_routine(source_b, "source_b")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect multiple events to one slot
flow.connect(a_id, "output", agg_id, "input")
flow.connect(b_id, "output", agg_id, "input")

# Execute both sources (they run independently)
job_state_a = flow.execute(a_id)
job_state_b = flow.execute(b_id)

print(f"Status A: {job_state_a.status}, Status B: {job_state_b.status}")
```
Expected Output:
Aggregator received: Data from A from A
All accumulated data: {'data': ['Data from A'], 'source': ['A']}
Aggregator received: Data from B from B
All accumulated data: {'data': ['Data from A', 'Data from B'], 'source': ['A', 'B']}
Status A: completed, Status B: completed
Note: With merge_strategy="append", the handler receives lists (accumulated values).
In the first call, data and source are lists with one element each. In the second
call, they contain both values. The example above shows how to extract the latest value
for display purposes.
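The append behavior described in the note can be sketched as a plain function. This is a simplified model, not routilux's implementation: `append_merge` is a hypothetical name, and the sketch assumes slot data is a dict of lists keyed by parameter name, as the expected output above suggests.

```python
from typing import Any


def append_merge(slot_data: dict, new_data: dict) -> dict:
    """Hypothetical model of "append": add each incoming value to its key's list."""
    merged = {key: list(values) for key, values in slot_data.items()}  # copy, don't mutate
    for key, value in new_data.items():
        merged.setdefault(key, []).append(value)
    return merged


slot_data: dict = {}
messages: list[dict[str, Any]] = [
    {"data": "Data from A", "source": "A"},
    {"data": "Data from B", "source": "B"},
]
for message in messages:
    slot_data = append_merge(slot_data, message)
    # As in the Aggregator handler, display only the latest value of each list
    print(f"Latest: {slot_data['data'][-1]} from {slot_data['source'][-1]}")

print(slot_data)
# {'data': ['Data from A', 'Data from B'], 'source': ['A', 'B']}
```

Copying the lists before appending keeps each intermediate state intact, which makes the growth from one element to two easy to inspect.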
Important Note: In the current event queue architecture, each execute()
call creates an independent execution with its own JobState. The aggregator in
the example above receives data from both executions, but they are separate
executions. For true aggregation within a single execution, see the aggregation
pattern in Advanced Patterns.
Key Points:
Multiple events can connect to the same slot
Use merge_strategy="append" to accumulate data in lists
The default strategy is “override” (new data replaces old data)
Each execute() call is independent, with its own JobState: don't rely on slot data being shared between executions
Step 3: Understanding Merge Strategies
Merge strategies control how data from multiple sources is combined in a slot:
```python
from routilux import Flow, Routine

class Source1(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["value"])

    def send(self, **kwargs):
        self.emit("output", value=1)

class Source2(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["value"])

    def send(self, **kwargs):
        self.emit("output", value=2)

class Receiver(Routine):
    def __init__(self):
        super().__init__()
        # Store strategy in _config (required for serialization)
        self.set_config(strategy="override")
        self.input_slot = self.define_slot(
            "input",
            handler=self.receive,
            merge_strategy=self.get_config("strategy", "override")
        )

    def receive(self, value=None, **kwargs):
        val = value or kwargs.get("value", None)
        all_data = self.input_slot._data
        print(f"Received value: {val}, All data: {all_data}")

# Test with "override" strategy (default)
print("=== Override Strategy ===")
flow1 = Flow(flow_id="override_flow")
s1 = Source1()
s2 = Source2()
r1 = Receiver()
r1.set_config(strategy="override")
# Recreate slot with correct strategy
r1.input_slot = r1.define_slot("input", handler=r1.receive, merge_strategy="override")

s1_id = flow1.add_routine(s1, "source1")
s2_id = flow1.add_routine(s2, "source2")
r1_id = flow1.add_routine(r1, "receiver")

flow1.connect(s1_id, "output", r1_id, "input")
flow1.connect(s2_id, "output", r1_id, "input")

flow1.execute(s1_id)
flow1.execute(s2_id)
```
Expected Output:
=== Override Strategy ===
Received value: 1, All data: {'value': 1}
Received value: 2, All data: {'value': 2}
Available Merge Strategies:
“override” (default): New data replaces old data
“append”: Values are appended to lists (useful for aggregation)
Custom function: Define your own merge logic
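The three kinds of strategy can be modeled side by side. This is an illustration under stated assumptions, not routilux code: `merge`, `sum_values`, and the `(old_data, new_data) -> merged_data` signature for a custom function are all hypothetical, so check the routilux API reference for the signature a real custom merge function must have. For “override” the sketch assumes incoming keys replace the same keys in the existing data.

```python
from typing import Callable, Union

# Hypothetical custom-strategy signature: (old_data, new_data) -> merged_data
MergeFn = Callable[[dict, dict], dict]


def merge(old: dict, new: dict, strategy: Union[str, MergeFn] = "override") -> dict:
    """Toy model of the three strategy kinds; not routilux's implementation."""
    if callable(strategy):
        return strategy(old, new)
    if strategy == "override":
        # Assumption: incoming keys replace the same keys in existing data
        return {**old, **new}
    if strategy == "append":
        merged = {k: list(v) for k, v in old.items()}
        for k, v in new.items():
            merged.setdefault(k, []).append(v)
        return merged
    raise ValueError(f"unknown merge strategy: {strategy!r}")


def sum_values(old: dict, new: dict) -> dict:
    # Example custom strategy: add numeric values key by key
    return {k: old.get(k, 0) + new.get(k, 0) for k in old.keys() | new.keys()}


messages = [{"value": 1}, {"value": 2}]
for strategy in ("override", "append", sum_values):
    state = {}
    for msg in messages:
        state = merge(state, msg, strategy)
    name = strategy if isinstance(strategy, str) else strategy.__name__
    print(f"{name}: {state}")
# override: {'value': 2}
# append: {'value': [1, 2]}
# sum_values: {'value': 3}
```

Feeding the same two messages through each strategy shows the trade-off: “override” keeps only the latest value, “append” keeps history, and a custom function can reduce the inputs to whatever shape the receiver needs.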
Key Points:
“override” is the default and most common strategy
“append” is useful when you need to collect multiple values
Custom merge strategies allow complex data combination logic
Step 4: Complex Branching Patterns
You can create complex workflows with branching and converging paths:
```python
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=None, **kwargs):
        value = value or kwargs.get("value", "test")
        self.emit("output", data=value)

class Processor1(Routine):
    def __init__(self):
        super().__init__()
        # Store name in _config (required for serialization)
        self.set_config(name="UPPER")
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        name = self.get_config("name", "P1")
        result = f"{name}: {data_value.upper()}"
        self.emit("output", result=result)

class Processor2(Routine):
    def __init__(self):
        super().__init__()
        # Store name in _config (required for serialization)
        self.set_config(name="lower")
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        name = self.get_config("name", "P2")
        result = f"{name}: {data_value.lower()}"
        self.emit("output", result=result)

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )

    def aggregate(self, result=None, **kwargs):
        # With append strategy, result is a list of all results received so far
        result_value = result or kwargs.get("result", [])
        # Print only the latest result so each result is shown exactly once
        if isinstance(result_value, list):
            if result_value:
                print(f"Aggregated: {result_value[-1]}")
        else:
            print(f"Aggregated: {result_value}")

# Create flow with branching pattern
flow = Flow(flow_id="branching_flow")

source = DataSource()
proc1 = Processor1()  # Name stored in _config
proc2 = Processor2()  # Name stored in _config
aggregator = Aggregator()

source_id = flow.add_routine(source, "source")
p1_id = flow.add_routine(proc1, "processor1")
p2_id = flow.add_routine(proc2, "processor2")
agg_id = flow.add_routine(aggregator, "aggregator")

# Branch: source -> processor1 and processor2
flow.connect(source_id, "output", p1_id, "input")
flow.connect(source_id, "output", p2_id, "input")

# Converge: processor1 and processor2 -> aggregator
flow.connect(p1_id, "output", agg_id, "input")
flow.connect(p2_id, "output", agg_id, "input")

# Execute
job_state = flow.execute(source_id, entry_params={"value": "Hello"})
print(f"Status: {job_state.status}")
```
Expected Output:
Aggregated: UPPER: HELLO
Aggregated: lower: hello
Status: completed
Key Points:
You can create complex branching and converging patterns
Multiple processors can run in parallel (if using concurrent execution)
Aggregators can collect results from multiple sources
The event queue ensures all tasks are processed
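The branch-and-converge shape above can be traced with a small FIFO queue model. This is a conceptual sketch with hypothetical function names, not routilux internals; it assumes a single worker that processes tasks in the order they were enqueued.

```python
from collections import deque
from typing import Any

# Toy single-worker model of the branching flow above (hypothetical names).
queue: deque = deque()  # pending (handler, kwargs) tasks
aggregated: list = []   # what the aggregator receives, in order


def emit(targets: list, **data: Any) -> None:
    # Schedule one task per connected slot; nothing runs yet.
    for target in targets:
        queue.append((target, data))


def aggregator(result: str) -> None:
    aggregated.append(result)


def processor1(data: str) -> None:
    emit([aggregator], result=f"UPPER: {data.upper()}")  # converge


def processor2(data: str) -> None:
    emit([aggregator], result=f"lower: {data.lower()}")  # converge


def source(value: str) -> None:
    emit([processor1, processor2], data=value)  # branch


queue.append((source, {"value": "Hello"}))
while queue:  # run until no tasks remain
    handler, kwargs = queue.popleft()
    handler(**kwargs)

print(aggregated)  # ['UPPER: HELLO', 'lower: hello']
```

Tracing the queue shows that both processor tasks run before either aggregator task, which is why the aggregator sees two separate deliveries rather than one combined call.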
Step 5: Understanding the Event Queue Architecture
Routilux uses an event queue pattern for execution. Understanding it helps you predict when handlers run and in what order:
```python
from routilux import Flow, Routine
import time

class SlowProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Slow", delay=0.1)
        # Entry routine: uses a "trigger" slot, like the entry routines in earlier steps
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        delay = self.get_config("delay", 0.1)
        name = self.get_config("name", "Slow")
        time.sleep(delay)  # Simulate slow processing
        result = f"{name} processed: {data_value}"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

class FastProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Fast")
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        name = self.get_config("name", "Fast")
        print(f"[{time.time():.2f}] {name} received: {result_value}")

# Create flow
flow = Flow(flow_id="queue_demo")

slow = SlowProcessor()
slow.set_config(name="Slow", delay=0.2)
fast = FastProcessor()
fast.set_config(name="Fast")

slow_id = flow.add_routine(slow, "slow")
fast_id = flow.add_routine(fast, "fast")

flow.connect(slow_id, "output", fast_id, "input")

# Execute
print("Starting execution...")
job_state = flow.execute(slow_id, entry_params={"data": "test"})
print(f"Status: {job_state.status}")
```
Expected Output (timing may vary):
Starting execution...
[1234567890.12] Slow processed: test
[1234567890.32] Fast received: Slow processed: test
Status: completed
Key Points:
emit() is non-blocking: it returns immediately after enqueuing tasks
Tasks are processed in queue order (fair scheduling)
The event queue ensures all tasks complete before execution finishes
This architecture supports both sequential and concurrent execution modes
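A single-worker version of this model fits in a few lines. `EventQueueModel` is hypothetical, written only to illustrate two of the properties listed above (non-blocking `emit()` and draining the queue before execution finishes); routilux's real scheduler, including its concurrent mode, is more involved.

```python
from collections import deque
from typing import Any, Callable


class EventQueueModel:
    """Toy single-worker event queue (hypothetical, not routilux's scheduler)."""

    def __init__(self) -> None:
        self.tasks: deque = deque()  # pending (handler, kwargs) pairs
        self.connections: dict = {}  # event name -> list of handlers

    def connect(self, event: str, handler: Callable[..., None]) -> None:
        self.connections.setdefault(event, []).append(handler)

    def emit(self, event: str, **data: Any) -> None:
        # Non-blocking: enqueue one task per connected handler, then return.
        for handler in self.connections.get(event, []):
            self.tasks.append((handler, data))

    def run(self, entry: Callable[..., None], **params: Any) -> None:
        # Execution finishes only once every queued task has been processed.
        self.tasks.append((entry, params))
        while self.tasks:
            handler, kwargs = self.tasks.popleft()
            handler(**kwargs)


log: list = []
q = EventQueueModel()


def slow(data: str) -> None:
    q.emit("output", result=f"Slow processed: {data}")
    log.append("slow: emit returned")  # logged before fast runs: emit did not block


def fast(result: str) -> None:
    log.append(f"fast received: {result}")


q.connect("output", fast)
q.run(slow, data="test")
print(log)  # ['slow: emit returned', 'fast received: Slow processed: test']
```

Because `slow` finishes its own work before `fast` is dequeued, the log records the emit returning first. With concurrent workers that ordering is no longer guaranteed, which is the point of Pitfall 3 below.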
Common Pitfalls
Pitfall 1: Expecting data to be shared between execute() calls
```python
# Wrong: each execute() creates a new JobState
flow.execute(source1_id)  # Creates JobState 1
flow.execute(source2_id)  # Creates JobState 2
# Aggregator won't see both messages in the same execution!
```
Solution: Use a single execute() with multiple emit() calls from
the same routine, or use the aggregation pattern (see Advanced Patterns).
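A minimal model of why the two calls don't combine: each hypothetical `execute()` below builds a fresh state object, so messages only end up together when they are emitted within the same call. `JobStateModel` and `execute` are illustrative stand-ins, not routilux's `JobState` or `Flow.execute()`.

```python
from dataclasses import dataclass, field


@dataclass
class JobStateModel:
    """Hypothetical stand-in for the per-execution state of one execute() call."""
    received: list = field(default_factory=list)


def execute(emitted: list) -> JobStateModel:
    # Every call starts from a fresh state object
    state = JobStateModel()
    for message in emitted:  # each item models one emit() during this execution
        state.received.append(message)
    return state


# Pitfall: two calls give two separate states
separate = [execute(["from source1"]), execute(["from source2"])]
print([s.received for s in separate])  # [['from source1'], ['from source2']]

# Fix: emit both messages within one execution
single = execute(["from source1", "from source2"])
print(single.received)  # ['from source1', 'from source2']
```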
Pitfall 2: Using wrong merge strategy
# Wrong: Using "override" when you need to accumulate
self.input_slot = self.define_slot("input", handler=self.aggregate)
# Later data will overwrite earlier data!
Solution: Use merge_strategy="append" when you need to collect multiple
values.
Pitfall 3: Not understanding event queue order
```python
# Don't assume depth-first execution order
# Tasks are processed in queue order, not call-stack order
self.emit("event1", data="A")
self.emit("event2", data="B")
# Order may vary depending on queue processing
```
Solution: Don’t rely on execution order unless you are using sequential mode with a single worker. Use explicit synchronization if order matters.
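The difference between queue order and call-stack order shows up as soon as a handler emits a follow-up event. In this hypothetical model, the handler for `event1` emits `event3`; with a single-worker FIFO queue, `event3` runs after `event2`, whereas calling handlers directly (depth-first) runs it before `event2`. None of these names are routilux APIs.

```python
from collections import deque

order: list = []
tasks: deque = deque()


def emit(event: str) -> None:
    tasks.append(event)  # enqueue only; the handler runs later


def on_event1() -> None:
    order.append("event1")
    emit("event3")  # follow-up work goes to the back of the queue


def on_event2() -> None:
    order.append("event2")


def on_event3() -> None:
    order.append("event3")


handlers = {"event1": on_event1, "event2": on_event2, "event3": on_event3}

# Queue order (single worker, FIFO)
emit("event1")
emit("event2")
while tasks:
    handlers[tasks.popleft()]()
print(order)  # ['event1', 'event2', 'event3']


def run_depth_first() -> list:
    """For contrast: calling handlers directly follows the call stack."""
    seen: list = []

    def call(event: str) -> None:
        seen.append(event)
        if event == "event1":
            call("event3")  # nested call runs before event2

    call("event1")
    call("event2")
    return seen


print(run_depth_first())  # ['event1', 'event3', 'event2']
```

With more than one worker even the FIFO result above is not guaranteed, so code that needs `event3` to follow `event1` should enforce that explicitly.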
Best Practices
Make connection patterns explicit: use descriptive routine IDs so the workflow structure is clear from the connect() calls
Choose appropriate merge strategies: “override” for replacement, “append” for accumulation
Understand event queue behavior: Tasks are processed fairly, not depth-first
Test with different execution modes: Sequential vs concurrent may behave differently
Use aggregation patterns to collect data: see Advanced Patterns for the recommended approaches
Next Steps
Now that you understand connections, let’s move on to Data Flow and Parameters to learn about parameter mapping, data extraction, and how data flows through your workflows.