Connecting Routines

In this tutorial, you’ll learn about different connection patterns in Routilux, including one-to-many, many-to-one, and complex branching patterns. You’ll also understand how the event queue architecture works.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Connect one event to multiple slots (fan-out)

  • Connect multiple events to one slot (fan-in)

  • Understand merge strategies for handling multiple inputs

  • Understand the event queue execution model

  • Build branching and converging workflows

Step 1: One-to-Many Connections (Fan-Out)

A single event can be connected to multiple slots. This is useful when you want to send the same data to multiple processors:

from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=None, **kwargs):
        value = value or kwargs.get("value", "test")
        self.emit("output", data=value)

class ProcessorA(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        print(f"Processor A received: {data_value}")

class ProcessorB(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        print(f"Processor B received: {data_value}")

# Create flow
flow = Flow(flow_id="fanout_flow")

source = DataSource()
processor_a = ProcessorA()
processor_b = ProcessorB()

source_id = flow.add_routine(source, "source")
a_id = flow.add_routine(processor_a, "processor_a")
b_id = flow.add_routine(processor_b, "processor_b")

# Connect one event to multiple slots
flow.connect(source_id, "output", a_id, "input")
flow.connect(source_id, "output", b_id, "input")

# Execute
job_state = flow.execute(source_id, entry_params={"value": "Hello"})
print(f"Status: {job_state.status}")

Expected Output:

Processor A received: Hello
Processor B received: Hello
Status: completed

Key Points:

  • One event can connect to multiple slots

  • All connected slots receive the same data

  • Both processors execute (order may vary due to event queue)

  • This pattern is called “fan-out”
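To make fan-out concrete, here is a minimal pure-Python model of it. It does not use Routilux, and the names connections, handlers, emit, and run are hypothetical. Emitting an event enqueues one task per connected slot, each carrying the same data, and a loop then drains the queue.

```python
from collections import deque

# Hypothetical model: maps a (routine, event) pair to the (routine, slot) pairs it feeds
connections = {
    ("source", "output"): [("processor_a", "input"), ("processor_b", "input")],
}

received = []  # records (routine, data) in the order handlers run

handlers = {
    ("processor_a", "input"): lambda data: received.append(("processor_a", data)),
    ("processor_b", "input"): lambda data: received.append(("processor_b", data)),
}

queue = deque()

def emit(routine, event, **data):
    """Enqueue one task per connected slot; every slot gets the same data."""
    for target in connections.get((routine, event), []):
        queue.append((target, data))

def run():
    """Drain the queue in FIFO order, calling each target slot's handler."""
    while queue:
        target, data = queue.popleft()
        handlers[target](**data)

emit("source", "output", data="Hello")
run()
print(received)  # [('processor_a', 'Hello'), ('processor_b', 'Hello')]
```

In this single-threaded model the order is deterministic; with concurrent execution the two processors' relative order is not guaranteed, which is why the key points above say the order may vary.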

Step 2: Many-to-One Connections (Fan-In)

Multiple events can connect to the same slot. This is useful for aggregating data from multiple sources. By default, new data replaces old data, but you can use merge strategies to control how data is combined:

from routilux import Flow, Routine

class SourceA(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data", "source"])

    def generate(self, **kwargs):
        self.emit("output", data="Data from A", source="A")

class SourceB(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data", "source"])

    def generate(self, **kwargs):
        self.emit("output", data="Data from B", source="B")

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        # Use "append" merge strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"  # Accumulates data in lists
        )

    def aggregate(self, data=None, source=None, **kwargs):
        # With append strategy, data and source are lists (accumulated values)
        data_value = data or kwargs.get("data", [])
        source_value = source or kwargs.get("source", [])
        # Convert to string for display (if it's a list, show the last item)
        if isinstance(data_value, list) and data_value:
            data_str = data_value[-1]
        else:
            data_str = data_value
        if isinstance(source_value, list) and source_value:
            source_str = source_value[-1]
        else:
            source_str = source_value
        print(f"Aggregator received: {data_str} from {source_str}")

        # Access accumulated data
        all_data = self.input_slot._data
        print(f"All accumulated data: {all_data}")

# Create flow
flow = Flow(flow_id="fanin_flow")

source_a = SourceA()
source_b = SourceB()
aggregator = Aggregator()

a_id = flow.add_routine(source_a, "source_a")
b_id = flow.add_routine(source_b, "source_b")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect multiple events to one slot
flow.connect(a_id, "output", agg_id, "input")
flow.connect(b_id, "output", agg_id, "input")

# Execute both sources (they run independently)
job_state_a = flow.execute(a_id)
job_state_b = flow.execute(b_id)

print(f"Status A: {job_state_a.status}, Status B: {job_state_b.status}")

Expected Output:

Aggregator received: Data from A from A
All accumulated data: {'data': ['Data from A'], 'source': ['A']}
Aggregator received: Data from B from B
All accumulated data: {'data': ['Data from A', 'Data from B'], 'source': ['A', 'B']}
Status A: completed, Status B: completed

Note: With merge_strategy="append", the handler receives lists (accumulated values). In the first call, data and source are lists with one element each. In the second call, they contain both values. The example above shows how to extract the latest value for display purposes.
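The aggregator above repeats the same "take the last element if this is a non-empty list" check for each parameter. A small helper keeps such handlers shorter. The function latest is a hypothetical name for illustration, not part of Routilux.

```python
def latest(value, default=None):
    """Return the most recent item of an accumulated list, or the value itself.

    With merge_strategy="append" a handler receives lists; with "override"
    it receives plain values. This helper handles both cases.
    """
    if isinstance(value, list):
        return value[-1] if value else default
    return value if value is not None else default


# Hypothetical use inside an append-strategy handler:
#     print(f"Aggregator received: {latest(data)} from {latest(source)}")
print(latest(["Data from A", "Data from B"]))  # Data from B
print(latest("Data from A"))                   # Data from A
```

Unlike the inline checks in the example, this sketch returns default for an empty list instead of the empty list itself.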

Important Note: In the current event queue architecture, each execute() call creates an independent execution with its own JobState. In the example above, the aggregator’s accumulated lists span both calls, but the two calls are still separate executions. For true aggregation within a single execution, see the aggregation pattern in Advanced Patterns.

Key Points:

  • Multiple events can connect to the same slot

  • Use merge_strategy="append" to accumulate data in lists

  • Default strategy is “override” (new data replaces old)

  • Each execute() call is an independent execution with its own JobState; don’t rely on separate executions to aggregate data

Step 3: Understanding Merge Strategies

Merge strategies control how data from multiple sources is combined in a slot:

from routilux import Flow, Routine

class Source1(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["value"])

    def send(self, **kwargs):
        self.emit("output", value=1)

class Source2(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["value"])

    def send(self, **kwargs):
        self.emit("output", value=2)

class Receiver(Routine):
    def __init__(self):
        super().__init__()
        # Store strategy in _config (required for serialization)
        self.set_config(strategy="override")
        self.input_slot = self.define_slot(
            "input",
            handler=self.receive,
            merge_strategy=self.get_config("strategy", "override")
        )

    def receive(self, value=None, **kwargs):
        val = value or kwargs.get("value", None)
        all_data = self.input_slot._data
        print(f"Received value: {val}, All data: {all_data}")

# Test with "override" strategy (default)
print("=== Override Strategy ===")
flow1 = Flow(flow_id="override_flow")
s1 = Source1()
s2 = Source2()
r1 = Receiver()  # __init__ already defines the "input" slot with "override"

s1_id = flow1.add_routine(s1, "source1")
s2_id = flow1.add_routine(s2, "source2")
r1_id = flow1.add_routine(r1, "receiver")

flow1.connect(s1_id, "output", r1_id, "input")
flow1.connect(s2_id, "output", r1_id, "input")

flow1.execute(s1_id)
flow1.execute(s2_id)

Expected Output:

=== Override Strategy ===
Received value: 1, All data: {'value': 1}
Received value: 2, All data: {'value': 2}

Available Merge Strategies:

  1. “override” (default): New data replaces old data

  2. “append”: Values are appended to lists (useful for aggregation)

  3. Custom function: Define your own merge logic

Key Points:

  • “override” is the default and most common strategy

  • “append” is useful when you need to collect multiple values

  • Custom merge strategies allow complex data combination logic
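The three strategies can be modeled in a few lines of plain Python. This is a sketch of the behavior described above, not Routilux’s implementation. The function merge_slot_data is hypothetical, and the custom-function signature used here ((old_data, new_data) returning the merged dict) is an assumption, so check the Routilux API reference before passing a callable as merge_strategy.

```python
def merge_slot_data(old, new, strategy="override"):
    """Combine a slot's stored data (old) with newly received data (new).

    Hypothetical model of the strategies described above:
      - "override": new values replace old ones
      - "append":   each key accumulates its values in a list
      - callable:   custom logic; assumed here to take (old, new) and return a dict
    """
    if callable(strategy):
        return strategy(old, new)
    if strategy == "override":
        return {**old, **new}
    if strategy == "append":
        # Copy the existing lists so the stored data is not mutated in place
        merged = {key: list(values) for key, values in old.items()}
        for key, value in new.items():
            merged.setdefault(key, []).append(value)
        return merged
    raise ValueError(f"Unknown merge strategy: {strategy!r}")


def keep_max(old, new):
    """Example custom strategy: keep the largest value seen for each key."""
    merged = dict(old)
    for key, value in new.items():
        merged[key] = max(old[key], value) if key in old else value
    return merged


# Replay the two arrivals from the example above under each strategy
arrivals = [{"value": 1}, {"value": 2}]
for strategy in ("override", "append", keep_max):
    data = {}
    for incoming in arrivals:
        data = merge_slot_data(data, incoming, strategy)
    label = strategy if isinstance(strategy, str) else strategy.__name__
    print(label, data)
# override {'value': 2}
# append {'value': [1, 2]}
# keep_max {'value': 2}
```

The "override" result matches the final line of the expected output above, and the "append" shape matches the accumulated lists shown in Step 2. keep_max illustrates a case where neither replacement nor accumulation fits.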

Step 4: Complex Branching Patterns

You can create complex workflows with branching and converging paths:

from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=None, **kwargs):
        value = value or kwargs.get("value", "test")
        self.emit("output", data=value)

class Processor1(Routine):
    def __init__(self):
        super().__init__()
        # Store name in _config (required for serialization)
        self.set_config(name="UPPER")
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        name = self.get_config("name", "P1")
        result = f"{name}: {data_value.upper()}"
        self.emit("output", result=result)

class Processor2(Routine):
    def __init__(self):
        super().__init__()
        # Store name in _config (required for serialization)
        self.set_config(name="lower")
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        name = self.get_config("name", "P2")
        result = f"{name}: {data_value.lower()}"
        self.emit("output", result=result)

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )

    def aggregate(self, result=None, **kwargs):
        # With append strategy, result is a list of all results received so far
        result_value = result or kwargs.get("result", [])
        # Print only the latest result so each one is shown exactly once
        if isinstance(result_value, list):
            if result_value:
                print(f"Aggregated: {result_value[-1]}")
        else:
            print(f"Aggregated: {result_value}")

# Create flow with branching pattern
flow = Flow(flow_id="branching_flow")

source = DataSource()
proc1 = Processor1()  # Name stored in _config
proc2 = Processor2()  # Name stored in _config
aggregator = Aggregator()

source_id = flow.add_routine(source, "source")
p1_id = flow.add_routine(proc1, "processor1")
p2_id = flow.add_routine(proc2, "processor2")
agg_id = flow.add_routine(aggregator, "aggregator")

# Branch: source -> processor1 and processor2
flow.connect(source_id, "output", p1_id, "input")
flow.connect(source_id, "output", p2_id, "input")

# Converge: processor1 and processor2 -> aggregator
flow.connect(p1_id, "output", agg_id, "input")
flow.connect(p2_id, "output", agg_id, "input")

# Execute
job_state = flow.execute(source_id, entry_params={"value": "Hello"})
print(f"Status: {job_state.status}")

Expected Output:

Aggregated: UPPER: HELLO
Aggregated: lower: hello
Status: completed

Key Points:

  • You can create complex branching and converging patterns

  • Multiple processors can run in parallel (if using concurrent execution)

  • Aggregators can collect results from multiple sources

  • The event queue ensures all tasks are processed
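The branch-and-converge shape can be traced with a small queue model. This plain-Python sketch uses hypothetical names (connections, handlers, emit) and no Routilux calls. It shows that the aggregator’s slot is fed once per branch and that, in FIFO order, both processors run before the aggregator handles either result.

```python
from collections import deque

# Hypothetical wiring that mirrors the flow above
connections = {
    ("source", "output"): [("processor1", "input"), ("processor2", "input")],
    ("processor1", "output"): [("aggregator", "input")],
    ("processor2", "output"): [("aggregator", "input")],
}
queue = deque()
trace = []       # order in which routines handle data
aggregated = []  # results collected by the aggregator

def emit(routine, event, **data):
    """Enqueue one task per connected slot and return immediately."""
    for target in connections.get((routine, event), []):
        queue.append((target, data))

handlers = {
    "processor1": lambda data: emit("processor1", "output", result=f"UPPER: {data.upper()}"),
    "processor2": lambda data: emit("processor2", "output", result=f"lower: {data.lower()}"),
    "aggregator": lambda result: aggregated.append(result),
}

emit("source", "output", data="Hello")
while queue:
    (routine, _slot), data = queue.popleft()
    trace.append(routine)
    handlers[routine](**data)

print(trace)       # ['processor1', 'processor2', 'aggregator', 'aggregator']
print(aggregated)  # ['UPPER: HELLO', 'lower: hello']
```

Under concurrent execution the two processor tasks could overlap, so only the set of aggregated results, not their order, should be relied on.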

Step 5: Understanding the Event Queue Architecture

Routilux uses an event queue pattern for execution. Understanding it helps you reason about when handlers run and in what order:

from routilux import Flow, Routine
import time

class SlowProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Slow", delay=0.1)
        # Entry routine: use a "trigger" slot, like the entry routines in the earlier steps
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        delay = self.get_config("delay", 0.1)
        name = self.get_config("name", "Slow")
        time.sleep(delay)  # Simulate slow processing
        result = f"{name} processed: {data_value}"
        print(f"[{time.time():.2f}] {result}")
        self.emit("output", result=result)

class FastProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Store configuration in _config (required for serialization)
        self.set_config(name="Fast")
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        name = self.get_config("name", "Fast")
        print(f"[{time.time():.2f}] {name} received: {result_value}")

# Create flow
flow = Flow(flow_id="queue_demo")

slow = SlowProcessor()
slow.set_config(name="Slow", delay=0.2)  # Override the default delay
fast = FastProcessor()
fast.set_config(name="Fast")

slow_id = flow.add_routine(slow, "slow")
fast_id = flow.add_routine(fast, "fast")

flow.connect(slow_id, "output", fast_id, "input")

# Execute
print("Starting execution...")
job_state = flow.execute(slow_id, entry_params={"data": "test"})
print(f"Status: {job_state.status}")

Expected Output (timestamps will differ; because Slow sleeps before printing, Fast’s line appears almost immediately after it):

Starting execution...
[1234567890.32] Slow processed: test
[1234567890.32] Fast received: Slow processed: test
Status: completed

Key Points:

  • emit() is non-blocking: it returns immediately after enqueuing tasks

  • Tasks are processed in queue order (fair scheduling)

  • The event queue ensures all tasks complete before execution finishes

  • This architecture supports both sequential and concurrent execution modes
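The difference between queue order and call-stack order shows up clearly in a minimal model. In this hypothetical sketch (plain Python, not Routilux’s scheduler), "emitting" only appends downstream routines to a FIFO queue and returns, so tasks are processed breadth-first. A direct-call version is included for comparison.

```python
from collections import deque

# Hypothetical graph: each routine's output feeds these routines
downstream = {"A": ["B", "C"], "B": ["D"], "C": [], "D": []}

def run_with_queue(start):
    """Event-queue model: emitting enqueues and returns immediately."""
    order, queue = [], deque([start])
    while queue:
        routine = queue.popleft()
        order.append(routine)
        queue.extend(downstream[routine])  # "emit": enqueue, don't call
    return order

def run_depth_first(start, order=None):
    """Call-stack model for comparison: emitting calls the handler directly."""
    order = [] if order is None else order
    order.append(start)
    for target in downstream[start]:
        run_depth_first(target, order)
    return order

print(run_with_queue("A"))   # ['A', 'B', 'C', 'D']
print(run_depth_first("A"))  # ['A', 'B', 'D', 'C']
```

In the queue model, C runs before D even though D is "deeper" in B’s branch; code that assumes call-stack order would expect the opposite.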

Common Pitfalls

Pitfall 1: Expecting data to be shared between execute() calls

# Wrong: Each execute() creates a new JobState
flow.execute(source1_id)  # Creates JobState 1
flow.execute(source2_id)  # Creates JobState 2
# Aggregator won't see both messages in the same execution!

Solution: Use a single execute() with multiple emit() calls from the same routine, or use the aggregation pattern (see Advanced Patterns).
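Why this works can be shown with a toy model in which every execution gets a fresh state object, much as each execute() call gets its own JobState. The execute function and the state dict here are hypothetical stand-ins, not Routilux APIs.

```python
def execute(emissions):
    """Toy execution: a fresh state per call, shared by everything in that call."""
    state = {"collected": []}  # plays the role of a per-execution JobState
    for message in emissions:
        state["collected"].append(message)  # the "aggregator" records each message
    return state

# Two separate executions: neither state sees both messages
first = execute(["from source1"])
second = execute(["from source2"])
print(first["collected"], second["collected"])  # ['from source1'] ['from source2']

# One execution with two emits: both messages land in the same state
single = execute(["from source1", "from source2"])
print(single["collected"])  # ['from source1', 'from source2']
```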

Pitfall 2: Using wrong merge strategy

# Wrong: Using "override" when you need to accumulate
self.input_slot = self.define_slot("input", handler=self.aggregate)
# Later data will overwrite earlier data!

Solution: Use merge_strategy="append" when you need to collect multiple values.

Pitfall 3: Not understanding event queue order

# Don't assume depth-first execution order
# Tasks are processed in queue order, not call-stack order
self.emit("event1", data="A")
self.emit("event2", data="B")
# Order may vary depending on queue processing

Solution: Don’t rely on execution order unless you are using sequential mode with a single worker. Use explicit synchronization if order matters.
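One way to add explicit synchronization is to buffer inputs until everything expected has arrived and only then act on them, so the result no longer depends on arrival order. The JoinBuffer class below is a hypothetical plain-Python helper, not part of Routilux; in a workflow you might call its add method from a slot handler.

```python
class JoinBuffer:
    """Collect one value per expected key; fire a callback once all keys are present."""

    def __init__(self, expected_keys, on_complete):
        self.expected = set(expected_keys)
        self.values = {}
        self.on_complete = on_complete

    def add(self, key, value):
        if key not in self.expected:
            raise KeyError(f"Unexpected input: {key!r}")
        self.values[key] = value
        if self.expected.issubset(self.values):
            # All inputs arrived: hand them over in a fixed key order, then reset
            ordered = [self.values[k] for k in sorted(self.expected)]
            self.values = {}
            self.on_complete(ordered)


results = []
join = JoinBuffer({"event1", "event2"}, results.append)

# "B" arrives before "A", but the combined output has a fixed order
join.add("event2", "B")
join.add("event1", "A")
print(results)  # [['A', 'B']]
```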

Best Practices

  1. Use descriptive connection patterns: Make your workflow structure clear

  2. Choose appropriate merge strategies: “override” for replacement, “append” for accumulation

  3. Understand event queue behavior: Tasks are processed fairly, not depth-first

  4. Test with different execution modes: Sequential vs concurrent may behave differently

  5. Use aggregation patterns for collecting data: See Advanced Patterns for the recommended approaches

Next Steps

Now that you understand connections, let’s move on to Data Flow and Parameters to learn about parameter mapping, data extraction, and how data flows through your workflows.