Advanced Patterns

In this tutorial, you’ll learn advanced patterns for building complex workflows, including aggregation patterns, conditional routing, and best practices for real-world applications.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Implement proper aggregation patterns

  • Use conditional routing for dynamic workflows

  • Build complex branching and converging patterns

  • Handle multiple data sources correctly

  • Design scalable workflow architectures

Step 1: Aggregation Pattern - Collecting from Multiple Sources

A common pattern is collecting data from multiple sources before processing. Important: each execute() call creates an independent execution, so to collect data from multiple sources in a single execution, emit multiple times from the same routine:

from routilux import Flow, Routine

class MultiSourceEmitter(Routine):
    """Emit data from multiple sources in a single execution"""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.emit_all)
        self.output_event = self.define_event("output", ["data", "source"])

    def emit_all(self, **kwargs):
        # Emit multiple times in the same execution
        sources = ["SourceA", "SourceB", "SourceC"]
        for source in sources:
            self.emit("output", data=f"Data from {source}", source=source)

class Aggregator(Routine):
    """Collect and process aggregated data"""

    def __init__(self):
        super().__init__()
        # Store expected_count in _config (required for serialization)
        self.set_config(expected_count=3)
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"  # Accumulate data
        )
        self.output_event = self.define_event("output", ["results"])
        self.set_stat("message_count", 0)

    def aggregate(self, data=None, source=None, **kwargs):
        # Count messages
        count = self.get_stat("message_count", 0) + 1
        self.set_stat("message_count", count)

        # With append strategy, data and source are lists
        data_value = data or kwargs.get("data", [])
        source_value = source or kwargs.get("source", [])

        # Extract latest values for display
        if isinstance(data_value, list) and data_value:
            data_str = data_value[-1]
        else:
            data_str = data_value
        if isinstance(source_value, list) and source_value:
            source_str = source_value[-1]
        else:
            source_str = source_value

        print(f"Received message {count}: {data_str} from {source_str}")

        # Check if we have enough data
        expected_count = self.get_config("expected_count", 3)
        if count >= expected_count:
            # Process all accumulated data
            all_data = self.input_slot._data
            print(f"All data collected: {all_data}")
            self.emit("output", results=all_data)

            # Reset for next batch
            self.input_slot._data = {}
            self.set_stat("message_count", 0)

flow = Flow(flow_id="aggregation_flow")

emitter = MultiSourceEmitter()
aggregator = Aggregator()  # expected_count stored in _config

emitter_id = flow.add_routine(emitter, "emitter")
agg_id = flow.add_routine(aggregator, "aggregator")

flow.connect(emitter_id, "output", agg_id, "input")

job_state = flow.execute(emitter_id)
print(f"Status: {job_state.status}")

Expected Output:

Received message 1: Data from SourceA from SourceA
Received message 2: Data from SourceB from SourceB
Received message 3: Data from SourceC from SourceC
All data collected: {'data': ['Data from SourceA', 'Data from SourceB', 'Data from SourceC'], 'source': ['SourceA', 'SourceB', 'SourceC']}
Status: completed

Note: With merge_strategy="append", the handler receives lists (accumulated values). The example extracts the latest value from each list for display purposes.
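To make the difference between accumulating and replacing concrete, here is a minimal pure-Python sketch of the two strategies named in this tutorial. The merge() helper is hypothetical and only models the behaviour described here; it is not Routilux's implementation.

```python
from typing import Any, Dict


def merge(slot_data: Dict[str, Any], incoming: Dict[str, Any], strategy: str) -> Dict[str, Any]:
    """Return the slot contents after merging one incoming message.

    Hypothetical helper that models the behaviour described in the
    tutorial; it is not part of Routilux.
    """
    if strategy == "override":
        # Each new message replaces whatever was stored before.
        return dict(incoming)
    if strategy == "append":
        # Each key keeps a list of every value seen so far.
        merged = {key: list(values) for key, values in slot_data.items()}
        for key, value in incoming.items():
            merged.setdefault(key, []).append(value)
        return merged
    raise ValueError(f"Unknown strategy: {strategy}")


messages = [
    {"data": "Data from SourceA", "source": "SourceA"},
    {"data": "Data from SourceB", "source": "SourceB"},
]

appended: Dict[str, Any] = {}
overridden: Dict[str, Any] = {}
for message in messages:
    appended = merge(appended, message, "append")
    overridden = merge(overridden, message, "override")

print(appended)    # lists with both messages
print(overridden)  # only the last message survives
```

Under "append" every key maps to a list, which is why the Aggregator handler above takes the last element of each list when it prints the latest message.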

Key Points:

  • Emit multiple times from the same routine to collect in one execution

  • Use merge_strategy="append" to accumulate data

  • Track message count to know when aggregation is complete

  • Reset state after processing for next batch
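The count-then-reset logic in these key points can be sketched independently of Routilux. BatchCollector below is a hypothetical class used only for illustration; it shows why the reset matters when more items arrive than one batch holds.

```python
from typing import Any, Callable, List


class BatchCollector:
    """Collect items until expected_count is reached, then flush and reset.

    Hypothetical illustration class; not part of Routilux.
    """

    def __init__(self, expected_count: int, on_complete: Callable[[List[Any]], None]):
        self.expected_count = expected_count
        self.on_complete = on_complete
        self._items: List[Any] = []

    def add(self, item: Any) -> None:
        self._items.append(item)
        if len(self._items) >= self.expected_count:
            batch = self._items
            # Reset before handing off so the next batch starts empty.
            self._items = []
            self.on_complete(batch)


batches: List[List[str]] = []
collector = BatchCollector(expected_count=3, on_complete=batches.append)
for source in ["SourceA", "SourceB", "SourceC", "SourceD"]:
    collector.add(source)

print(batches)           # one complete batch of three
print(collector._items)  # the fourth item waits for the next batch
```

Without the reset, the fourth item would be appended to the first batch and every later batch would carry stale data.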

Important: Don’t use multiple execute() calls for aggregation. Each call creates an independent execution with its own JobState.

Step 2: Conditional Routing

Use conditional routing to send data down different paths based on conditions. Routilux provides a built-in ConditionalRouter routine for this:

from routilux import Flow, Routine
from routilux.builtin_routines import ConditionalRouter

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, value=None, **kwargs):
        value = value or kwargs.get("value", 0)
        self.emit("output", data={"value": value, "priority": "high" if value > 10 else "low"})

class HighPriorityHandler(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.handle)

    def handle(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})
        print(f"High priority handler: {data_value}")

class LowPriorityHandler(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.handle)

    def handle(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})
        print(f"Low priority handler: {data_value}")

flow = Flow(flow_id="routing_flow")

source = DataSource()
router = ConditionalRouter()
high_handler = HighPriorityHandler()
low_handler = LowPriorityHandler()

source_id = flow.add_routine(source, "source")
router_id = flow.add_routine(router, "router")
high_id = flow.add_routine(high_handler, "high_handler")
low_id = flow.add_routine(low_handler, "low_handler")

# Configure router
router.set_config(
    routes=[
        ("high", "data.get('value', 0) > 10"),
        ("low", "data.get('value', 0) <= 10"),
    ],
    default_route="low"
)

# Define router events
router.define_event("high")
router.define_event("low")

# Connect: source -> router -> handlers
flow.connect(source_id, "output", router_id, "input")
flow.connect(router_id, "high", high_id, "input")
flow.connect(router_id, "low", low_id, "input")

# Test with high value
print("=== High value (15) ===")
job_state1 = flow.execute(source_id, entry_params={"value": 15})
print(f"Status: {job_state1.status}")

# Test with low value
print("=== Low value (5) ===")
job_state2 = flow.execute(source_id, entry_params={"value": 5})
print(f"Status: {job_state2.status}")

Expected Output:

=== High value (15) ===
High priority handler: {'value': 15, 'priority': 'high'}
Status: completed
=== Low value (5) ===
Low priority handler: {'value': 5, 'priority': 'low'}
Status: completed

Key Points:

  • Use ConditionalRouter for dynamic routing

  • Routes are evaluated in order (first match wins)

  • Use Python expressions for conditions

  • Define events for each route
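The "first match wins" rule is easiest to see when two conditions overlap. The sketch below is a pure-Python model of ordered route evaluation with a default route. pick_route() and the "medium" route are hypothetical, and it uses callables where ConditionalRouter takes string expressions, so treat it as an illustration of the ordering rule rather than the router's implementation.

```python
from typing import Any, Callable, Dict, List, Tuple

# A route is a name plus a predicate over the incoming data.
Route = Tuple[str, Callable[[Dict[str, Any]], bool]]


def pick_route(data: Dict[str, Any], routes: List[Route], default_route: str) -> str:
    """Return the name of the first route whose condition holds.

    Hypothetical helper for illustration; not part of Routilux.
    """
    for name, condition in routes:
        if condition(data):
            return name
    # No condition matched, so fall back to the default.
    return default_route


routes: List[Route] = [
    ("high", lambda d: d.get("value", 0) > 10),
    ("medium", lambda d: d.get("value", 0) > 5),  # also true for values above 10
]

print(pick_route({"value": 15}, routes, "low"))  # high
print(pick_route({"value": 7}, routes, "low"))   # medium
print(pick_route({"value": 1}, routes, "low"))   # low
```

A value of 15 satisfies both conditions, but only "high" is chosen because it is listed first. Reordering the list would send the same value to "medium", so route order is part of the routing logic.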

Step 3: Fan-Out and Fan-In Pattern

Combine fan-out (one-to-many) and fan-in (many-to-one) patterns:

from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test_data")

class Processor1(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result", "processor"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"P1: {data_value.upper()}"
        self.emit("output", result=result, processor="P1")

class Processor2(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result", "processor"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"P2: {data_value.lower()}"
        self.emit("output", result=result, processor="P2")

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )

    def aggregate(self, result=None, processor=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        proc = processor or kwargs.get("processor", "")
        print(f"Aggregated from {proc}: {result_value}")

flow = Flow(flow_id="fanout_fanin_flow")

source = DataSource()
proc1 = Processor1()
proc2 = Processor2()
aggregator = Aggregator()

source_id = flow.add_routine(source, "source")
p1_id = flow.add_routine(proc1, "processor1")
p2_id = flow.add_routine(proc2, "processor2")
agg_id = flow.add_routine(aggregator, "aggregator")

# Fan-out: source -> processor1 and processor2
flow.connect(source_id, "output", p1_id, "input")
flow.connect(source_id, "output", p2_id, "input")

# Fan-in: processor1 and processor2 -> aggregator
flow.connect(p1_id, "output", agg_id, "input")
flow.connect(p2_id, "output", agg_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")

Expected Output:

Aggregated from P1: P1: TEST_DATA
Aggregated from P2: P2: test_data
Status: completed

Key Points:

  • Fan-out sends data to multiple processors

  • Fan-in collects results from multiple sources

  • Use merge_strategy="append" for fan-in

  • Processors can run in parallel (with concurrent execution)
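As a rough picture of what parallel fan-out followed by fan-in looks like, the sketch below uses Python's standard concurrent.futures module. It does not use Routilux's concurrent execution; fan_out_fan_in() and the two processor functions are hypothetical stand-ins for the routines above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

Processor = Callable[[str], Dict[str, str]]


def processor1(data: str) -> Dict[str, str]:
    return {"processor": "P1", "result": f"P1: {data.upper()}"}


def processor2(data: str) -> Dict[str, str]:
    return {"processor": "P2", "result": f"P2: {data.lower()}"}


def fan_out_fan_in(data: str, processors: List[Processor]) -> List[Dict[str, str]]:
    """Send the same input to every processor, then gather all results."""
    with ThreadPoolExecutor(max_workers=len(processors)) as pool:
        # Fan-out: submit one task per processor.
        futures = [pool.submit(processor, data) for processor in processors]
        # Fan-in: collect in submission order, regardless of which finishes first.
        return [future.result() for future in futures]


results = fan_out_fan_in("test_data", [processor1, processor2])
for item in results:
    print(f"Aggregated from {item['processor']}: {item['result']}")
```

Collecting in submission order keeps the output deterministic here. When branches truly run concurrently, completion order is generally not guaranteed, so an aggregator should not depend on which branch reports first.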

Step 4: Pipeline with Error Recovery

Build resilient pipelines with error recovery:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test_data")

class UnreliableValidator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])
        self.call_count = 0

    def validate(self, data=None, **kwargs):
        self.call_count += 1
        data_value = data or kwargs.get("data", "")

        # Fail on the first call only, to simulate a transient error
        if self.call_count < 2:
            raise ValueError("Validation failed")

        self.emit("output", data=data_value, valid=True)

class Processor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, valid=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        is_valid = valid if valid is not None else kwargs.get("valid", False)

        if is_valid:
            result = f"Processed: {data_value}"
            self.emit("output", result=result)
        else:
            raise ValueError("Cannot process invalid data")

class Sink(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        print(f"Final result: {result_value}")

flow = Flow(flow_id="resilient_pipeline")

source = DataSource()
validator = UnreliableValidator()
processor = Processor()
sink = Sink()

source_id = flow.add_routine(source, "source")
validator_id = flow.add_routine(validator, "validator")
processor_id = flow.add_routine(processor, "processor")
sink_id = flow.add_routine(sink, "sink")

flow.connect(source_id, "output", validator_id, "input")
flow.connect(validator_id, "output", processor_id, "input")
flow.connect(processor_id, "output", sink_id, "input")

# Set error handlers
validator.set_as_critical(max_retries=3, retry_delay=0.1)
processor.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
print(f"Validator attempts: {validator.call_count}")

Expected Output:

Final result: Processed: test_data
Status: completed
Validator attempts: 2

Key Points:

  • Combine error handling with pipeline patterns

  • Use retry for transient failures

  • Use continue for non-critical steps

  • Build resilient workflows that recover from errors
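The two recovery styles above, retrying a transient failure and continuing past a non-critical one, can be sketched in plain Python. run_with_retry() and run_and_continue() are hypothetical helpers, and the reading of max_retries as "retries after the first attempt" is an assumption of this sketch rather than a statement about Routilux.

```python
import time
from typing import Callable, Dict, Optional, TypeVar

T = TypeVar("T")


def run_with_retry(step: Callable[[], T], max_retries: int = 3, retry_delay: float = 0.1) -> T:
    """Call step; on failure, retry up to max_retries more times (assumed semantics)."""
    retries_used = 0
    while True:
        try:
            return step()
        except Exception:
            if retries_used >= max_retries:
                raise  # Out of retries: surface the error to the caller.
            retries_used += 1
            time.sleep(retry_delay)


def run_and_continue(step: Callable[[], T], default: Optional[T] = None) -> Optional[T]:
    """Call step; on failure, log it and carry on with a default value."""
    try:
        return step()
    except Exception as exc:
        print(f"Skipping failed step: {exc}")
        return default


calls: Dict[str, int] = {"count": 0}


def flaky_validate() -> str:
    # Fails on the first call only, like UnreliableValidator above.
    calls["count"] += 1
    if calls["count"] < 2:
        raise ValueError("Validation failed")
    return "test_data"


validated = run_with_retry(flaky_validate, max_retries=3, retry_delay=0.01)
optional = run_and_continue(lambda: 1 / 0, default="skipped")
print(validated, calls["count"], optional)
```

Retrying suits steps that must succeed and whose failures are likely temporary. Continuing suits steps whose output the rest of the pipeline can do without.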

Step 5: Complete Example - Data Processing Pipeline

Here’s a complete example combining multiple advanced patterns:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, count=5, **kwargs):
        count = count or kwargs.get("count", 5)
        for i in range(count):
            self.emit("output", data={"id": i, "value": i * 10})

class Router(Routine):
    """Custom router: values of 30 or more go to "high", the rest to "low" """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.route)
        self.high_event = self.define_event("high", ["data"])
        self.low_event = self.define_event("low", ["data"])

    def route(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})
        value = data_value.get("value", 0)

        if value >= 30:
            self.emit("high", data=data_value)
        else:
            self.emit("low", data=data_value)

class HighValueProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})
        result = f"HIGH: {data_value['value']}"
        self.emit("output", result=result)

class LowValueProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})
        result = f"LOW: {data_value['value']}"
        self.emit("output", result=result)

class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"
        )
        self.set_stat("count", 0)

    def aggregate(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        count = self.get_stat("count", 0) + 1
        self.set_stat("count", count)
        print(f"[{count}] {result_value}")

def main():
    flow = Flow(flow_id="advanced_pipeline")

    source = DataSource()
    router = Router()
    high_processor = HighValueProcessor()
    low_processor = LowValueProcessor()
    aggregator = Aggregator()

    source_id = flow.add_routine(source, "source")
    router_id = flow.add_routine(router, "router")
    high_id = flow.add_routine(high_processor, "high_processor")
    low_id = flow.add_routine(low_processor, "low_processor")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # Connect pipeline
    flow.connect(source_id, "output", router_id, "input")
    flow.connect(router_id, "high", high_id, "input")
    flow.connect(router_id, "low", low_id, "input")
    flow.connect(high_id, "output", agg_id, "input")
    flow.connect(low_id, "output", agg_id, "input")

    # Set error handlers
    flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

    job_state = flow.execute(source_id, entry_params={"count": 5})

    print(f"\nStatus: {job_state.status}")
    print(f"Total aggregated: {aggregator.get_stat('count', 0)}")

if __name__ == "__main__":
    main()

Expected Output:

[1] LOW: 0
[2] LOW: 10
[3] LOW: 20
[4] HIGH: 30
[5] HIGH: 40

Status: completed
Total aggregated: 5

Key Points:

  • Combine routing, processing, and aggregation

  • Use conditional routing for dynamic workflows

  • Fan-in pattern collects results from multiple paths

  • Error handling ensures resilience

Common Pitfalls

Pitfall 1: Using multiple execute() calls for aggregation

# Wrong: Each execute() creates separate execution
flow.execute(source1_id)  # JobState 1
flow.execute(source2_id)  # JobState 2
# Aggregator won't see both in same execution!

Solution: Emit multiple times from the same routine in a single execution.
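A toy model may help show why separate calls cannot be aggregated. ToyFlow below is a hypothetical stand-in, not Routilux's Flow. It only mimics the one property this pitfall depends on: every execute() call starts from fresh state.

```python
from typing import Dict, List


class ToyFlow:
    """Hypothetical stand-in for a flow: each execute() call gets fresh state."""

    def execute(self, sources: List[str]) -> Dict[str, List[str]]:
        # New state object per call, loosely analogous to a separate JobState.
        state: Dict[str, List[str]] = {"collected": []}
        for source in sources:
            state["collected"].append(f"Data from {source}")
        return state


flow = ToyFlow()

# Two calls: each result contains only its own source.
first = flow.execute(["SourceA"])
second = flow.execute(["SourceB"])
print(first["collected"], second["collected"])

# One call that emits for both sources: everything lands in one state.
combined = flow.execute(["SourceA", "SourceB"])
print(combined["collected"])
```

Neither of the first two results ever contains both items, so an aggregator waiting for two messages would never fire. The single combined call is the shape that Step 1 uses.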

Pitfall 2: Not resetting aggregator state

def aggregate(self, **kwargs):
    # Accumulates data but never resets
    # Next batch will include old data!
    all_data = self.input_slot._data

Solution: Reset state after processing: self.input_slot._data = {}

Pitfall 3: Wrong merge strategy for aggregation

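# Wrong: no merge_strategy is given, so "override" applies and each
# new message replaces the previous data instead of accumulating
self.input_slot = self.define_slot("input", handler=self.aggregate)

# Right: "append" accumulates values across emits
self.input_slot = self.define_slot("input", handler=self.aggregate, merge_strategy="append")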

Solution: Use merge_strategy="append" for aggregation patterns.

Best Practices

  1. Use single execute() for aggregation: Emit multiple times from one routine

  2. Reset aggregator state: Clear data after processing each batch

  3. Use appropriate merge strategies: "append" for accumulation, "override" for replacement

  4. Combine patterns thoughtfully: Routing, processing, aggregation work well together

  5. Handle errors at appropriate levels: Critical vs optional operations

  6. Test complex workflows: Verify all paths work correctly
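One inexpensive way to verify that all paths work is to pull the branching rule out into a plain function and test its boundaries directly. The sketch below does this for the threshold from Step 5 using the standard unittest module. route_for() is a hypothetical extraction of the Router logic, not a Routilux API.

```python
import unittest


def route_for(value: int, threshold: int = 30) -> str:
    """Mirror the Step 5 rule: values at or above the threshold go to "high"."""
    return "high" if value >= threshold else "low"


class RouteForTests(unittest.TestCase):
    def test_values_below_threshold_go_low(self):
        for value in (0, 10, 20, 29):
            self.assertEqual(route_for(value), "low")

    def test_threshold_and_above_go_high(self):
        # 30 is the boundary value and must take the "high" path.
        for value in (30, 40):
            self.assertEqual(route_for(value), "high")


# Run the suite explicitly so the script does not exit the interpreter.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(RouteForTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(f"Tests run: {result.testsRun}, all passed: {result.wasSuccessful()}")
```

Testing the boundary value matters most, since an off-by-one comparison such as > where >= was intended would silently send 30 down the wrong branch.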

Next Steps

Now that you understand advanced patterns, let’s move on to Serialization and Persistence to learn how to save and restore workflow state.