Advanced Patterns
In this tutorial, you’ll learn advanced patterns for building complex workflows, including aggregation patterns, conditional routing, and best practices for real-world applications.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Implement proper aggregation patterns
Use conditional routing for dynamic workflows
Build complex branching and converging patterns
Handle multiple data sources correctly
Design scalable workflow architectures
Step 1: Aggregation Pattern - Collecting from Multiple Sources
A common pattern is collecting data from multiple sources before processing.
Important: Each execute() call creates an independent execution. To
collect data from multiple sources in a single execution, emit multiple times
from the same routine:
1from routilux import Flow, Routine
2
class MultiSourceEmitter(Routine):
    """Emit data from multiple sources in a single execution.

    One trigger produces one ``output`` emission per source name. All of the
    emissions happen inside the same handler call, so downstream routines see
    them within the same execution.
    """

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.emit_all)
        self.output_event = self.define_event("output", ["data", "source"])

    def emit_all(self, **kwargs):
        """Emit one ``output`` message per source; trigger kwargs are ignored."""
        for name in ("SourceA", "SourceB", "SourceC"):
            payload = f"Data from {name}"
            self.emit("output", data=payload, source=name)
16
class Aggregator(Routine):
    """Collect and process aggregated data.

    The input slot uses ``merge_strategy="append"``, so each handler call sees
    the values accumulated so far as lists. Once ``expected_count`` messages
    have arrived, the accumulated slot data is emitted as ``results`` and the
    batch state is cleared.
    """

    def __init__(self):
        super().__init__()
        # expected_count lives in _config (required for serialization).
        self.set_config(expected_count=3)
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append",  # accumulate values across messages
        )
        self.output_event = self.define_event("output", ["results"])
        self.set_stat("message_count", 0)

    @staticmethod
    def _latest(value):
        """Return the last item of a non-empty list, otherwise *value* itself."""
        if isinstance(value, list) and value:
            return value[-1]
        return value

    def aggregate(self, data=None, source=None, **kwargs):
        """Record one message and flush the batch once enough have arrived."""
        received = self.get_stat("message_count", 0) + 1
        self.set_stat("message_count", received)

        # With "append", data/source hold the history; show only the newest.
        latest_data = self._latest(data or kwargs.get("data", []))
        latest_source = self._latest(source or kwargs.get("source", []))
        print(f"Received message {received}: {latest_data} from {latest_source}")

        if received < self.get_config("expected_count", 3):
            return

        # Batch complete: forward everything the slot has accumulated.
        # NOTE(review): relies on the slot's private ``_data`` attribute; no
        # public accessor is shown in this tutorial.
        collected = self.input_slot._data
        print(f"All data collected: {collected}")
        self.emit("output", results=collected)

        # Start the next batch from a clean slate.
        self.input_slot._data = {}
        self.set_stat("message_count", 0)
64
flow = Flow(flow_id="aggregation_flow")

# Create the routines; the aggregator keeps expected_count in its own _config.
emitter = MultiSourceEmitter()
aggregator = Aggregator()

# Register each routine under a readable id, then wire output -> input.
emitter_id = flow.add_routine(emitter, "emitter")
agg_id = flow.add_routine(aggregator, "aggregator")
flow.connect(emitter_id, "output", agg_id, "input")

# A single execute() call, so every emission lands in the same JobState.
job_state = flow.execute(emitter_id)
print(f"Status: {job_state.status}")
Expected Output:
Received message 1: Data from SourceA from SourceA
Received message 2: Data from SourceB from SourceB
Received message 3: Data from SourceC from SourceC
All data collected: {'data': ['Data from SourceA', 'Data from SourceB', 'Data from SourceC'], 'source': ['SourceA', 'SourceB', 'SourceC']}
Status: completed
Note: With merge_strategy="append", the handler receives lists (accumulated values).
The example extracts the latest value from each list for display purposes.
Key Points:
Emit multiple times from the same routine to collect in one execution
Use merge_strategy="append" to accumulate data
Track message count to know when aggregation is complete
Reset state after processing for next batch
Important: Don’t use multiple execute() calls for aggregation - each
creates an independent execution with separate JobState.
Step 2: Conditional Routing
Use conditional routing to dynamically route data based on conditions. Routilux
provides a built-in ConditionalRouter routine for this:
1from routilux import Flow, Routine
2from routilux.builtin_routines import ConditionalRouter
3
class DataSource(Routine):
    """Emit a single ``{"value", "priority"}`` record built from the trigger."""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, value=None, **kwargs):
        """Tag *value* as "high" when it exceeds 10, else "low", and emit it.

        A falsy *value* (e.g. ``None``) falls back to 0.
        """
        if not value:
            value = kwargs.get("value", 0)
        priority = "high" if value > 10 else "low"
        self.emit("output", data={"value": value, "priority": priority})
13
class HighPriorityHandler(Routine):
    """Terminal routine that prints records delivered on the "high" route."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.handle)

    def handle(self, data=None, **kwargs):
        """Print the received record (an empty dict when nothing was passed)."""
        payload = data if data else kwargs.get("data", {})
        print(f"High priority handler: {payload}")
22
class LowPriorityHandler(Routine):
    """Terminal routine that prints records delivered on the "low" route."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.handle)

    def handle(self, data=None, **kwargs):
        """Print the received record (an empty dict when nothing was passed)."""
        payload = data if data else kwargs.get("data", {})
        print(f"Low priority handler: {payload}")
31
flow = Flow(flow_id="routing_flow")

source = DataSource()
router = ConditionalRouter()
high_handler = HighPriorityHandler()
low_handler = LowPriorityHandler()

source_id = flow.add_routine(source, "source")
router_id = flow.add_routine(router, "router")
high_id = flow.add_routine(high_handler, "high_handler")
low_id = flow.add_routine(low_handler, "low_handler")

# Route table: (route name, condition expression evaluated against ``data``).
# default_route="low" is presumably used when no condition matches.
router.set_config(
    routes=[
        ("high", "data.get('value', 0) > 10"),
        ("low", "data.get('value', 0) <= 10"),
    ],
    default_route="low",
)

# Each route name needs a router event; the dict keeps the high -> low order.
route_targets = {"high": high_id, "low": low_id}
for route_name in route_targets:
    router.define_event(route_name)

# Wire source -> router, then one router event per handler.
flow.connect(source_id, "output", router_id, "input")
for route_name, target_id in route_targets.items():
    flow.connect(router_id, route_name, target_id, "input")

# Each execute() below is an independent execution with its own JobState.
print("=== High value (15) ===")
job_state1 = flow.execute(source_id, entry_params={"value": 15})
print(f"Status: {job_state1.status}")

print("=== Low value (5) ===")
job_state2 = flow.execute(source_id, entry_params={"value": 5})
print(f"Status: {job_state2.status}")
Expected Output:
=== High value (15) ===
High priority handler: {'value': 15, 'priority': 'high'}
Status: completed
=== Low value (5) ===
Low priority handler: {'value': 5, 'priority': 'low'}
Status: completed
Key Points:
Use ConditionalRouter for dynamic routing
Routes are evaluated in order (first match wins)
Use Python expressions for conditions
Define events for each route
Step 3: Fan-Out and Fan-In Pattern
Combine fan-out (one-to-many) and fan-in (many-to-one) patterns:
1from routilux import Flow, Routine
2
class DataSource(Routine):
    """Emit the fixed string ``"test_data"`` whenever triggered."""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        """Ignore any trigger arguments and emit a single ``data`` payload."""
        payload = "test_data"
        self.emit("output", data=payload)
11
class Processor1(Routine):
    """Upper-case the incoming ``data`` string and tag the result with "P1"."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result", "processor"])

    def process(self, data=None, **kwargs):
        """Emit ``"P1: <DATA>"``; missing/falsy data is treated as ``""``."""
        text = data if data else kwargs.get("data", "")
        self.emit("output", result=f"P1: {text.upper()}", processor="P1")
22
class Processor2(Routine):
    """Lower-case the incoming ``data`` string and tag the result with "P2"."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result", "processor"])

    def process(self, data=None, **kwargs):
        """Emit ``"P2: <data>"``; missing/falsy data is treated as ``""``."""
        text = data if data else kwargs.get("data", "")
        self.emit("output", result=f"P2: {text.lower()}", processor="P2")
33
class Aggregator(Routine):
    """Fan-in sink that prints each processor result as it arrives.

    The input slot uses ``merge_strategy="append"``, so ``result`` and
    ``processor`` arrive as lists of everything accumulated so far. Only the
    newest entry of each is printed, matching the Step 1 aggregator and the
    documented output (``Aggregated from P1: P1: TEST_DATA``).
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append",
        )

    @staticmethod
    def _latest(value):
        """Return the last item of a non-empty list, otherwise *value* itself."""
        if isinstance(value, list) and value:
            return value[-1]
        return value

    def aggregate(self, result=None, processor=None, **kwargs):
        """Print the most recent result together with the processor that made it."""
        # Printing the raw append-lists would show the whole history on every
        # call (e.g. "['P1', 'P2']"); unwrap to the newest entry instead.
        result_value = self._latest(result or kwargs.get("result", ""))
        proc = self._latest(processor or kwargs.get("processor", ""))
        print(f"Aggregated from {proc}: {result_value}")
47
flow = Flow(flow_id="fanout_fanin_flow")

source = DataSource()
proc1 = Processor1()
proc2 = Processor2()
aggregator = Aggregator()

source_id = flow.add_routine(source, "source")
p1_id = flow.add_routine(proc1, "processor1")
p2_id = flow.add_routine(proc2, "processor2")
agg_id = flow.add_routine(aggregator, "aggregator")

processor_ids = (p1_id, p2_id)

# Fan-out: the source's single "output" event feeds every processor.
for proc_id in processor_ids:
    flow.connect(source_id, "output", proc_id, "input")

# Fan-in: every processor feeds the same aggregator slot.
for proc_id in processor_ids:
    flow.connect(proc_id, "output", agg_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
Aggregated from P1: P1: TEST_DATA
Aggregated from P2: P2: test_data
Status: completed
Key Points:
Fan-out sends data to multiple processors
Fan-in collects results from multiple sources
Use merge_strategy="append" for fan-in
Processors can run in parallel (with concurrent execution)
Step 4: Pipeline with Error Recovery
Build resilient pipelines with error recovery:
1from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
2
class DataSource(Routine):
    """Emit the fixed string ``"test_data"`` whenever triggered."""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        """Ignore any trigger arguments and emit a single ``data`` payload."""
        payload = "test_data"
        self.emit("output", data=payload)
11
class UnreliableValidator(Routine):
    """Validator that fails on its first call and succeeds from then on.

    ``call_count`` records how many times the handler has run, so the caller
    can see how many attempts a retry policy needed.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])
        self.call_count = 0

    def validate(self, data=None, **kwargs):
        """Raise ``ValueError`` on the first attempt; emit ``valid=True`` afterwards."""
        self.call_count += 1
        payload = data or kwargs.get("data", "")

        if self.call_count >= 2:
            self.emit("output", data=payload, valid=True)
            return

        # Simulated transient failure on the first attempt only.
        raise ValueError("Validation failed")
27
class Processor(Routine):
    """Turn validated data into a ``"Processed: ..."`` result string."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, valid=None, **kwargs):
        """Emit the processed result, or raise ``ValueError`` when not valid."""
        payload = data or kwargs.get("data", "")
        if valid is None:
            valid = kwargs.get("valid", False)

        if not valid:
            raise ValueError("Cannot process invalid data")

        self.emit("output", result=f"Processed: {payload}")
43
class Sink(Routine):
    """Terminal routine that prints the final pipeline result."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, result=None, **kwargs):
        """Print the received result (an empty string when nothing was passed)."""
        final = result if result else kwargs.get("result", "")
        print(f"Final result: {final}")
52
flow = Flow(flow_id="resilient_pipeline")

source = DataSource()
validator = UnreliableValidator()
processor = Processor()
sink = Sink()

source_id = flow.add_routine(source, "source")
validator_id = flow.add_routine(validator, "validator")
processor_id = flow.add_routine(processor, "processor")
sink_id = flow.add_routine(sink, "sink")

# Linear chain: source -> validator -> processor -> sink.
stages = [source_id, validator_id, processor_id, sink_id]
for upstream_id, downstream_id in zip(stages, stages[1:]):
    flow.connect(upstream_id, "output", downstream_id, "input")

# Retry the flaky validator; give the processor a CONTINUE error strategy.
validator.set_as_critical(max_retries=3, retry_delay=0.1)
processor.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
print(f"Validator attempts: {validator.call_count}")
Expected Output:
Final result: Processed: test_data
Status: completed
Validator attempts: 2
Key Points:
Combine error handling with pipeline patterns
Use retry for transient failures
Use continue for non-critical steps
Build resilient workflows that recover from errors
Step 5: Complete Example - Data Processing Pipeline
Here’s a complete example combining multiple advanced patterns:
1from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
2from routilux.builtin_routines import ConditionalRouter
3
class DataSource(Routine):
    """Emit ``count`` records of the form ``{"id": i, "value": i * 10}``."""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, count=5, **kwargs):
        """Emit one record per index in ``range(count)``.

        Args:
            count: Number of records to emit. ``None`` falls back to 5; an
                explicit ``0`` emits nothing.
        """
        # ``count or 5`` would silently turn an explicit 0 into 5. A ``count=``
        # keyword always binds to the named parameter, so it never appears in
        # **kwargs and there is nothing to look up there.
        if count is None:
            count = 5
        for i in range(count):
            self.emit("output", data={"id": i, "value": i * 10})
14
class Router(Routine):
    """Send each record to the "high" event when value >= 30, else to "low"."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.route)
        self.high_event = self.define_event("high", ["data"])
        self.low_event = self.define_event("low", ["data"])

    def route(self, data=None, **kwargs):
        """Forward the record unchanged on the event chosen by its ``value``."""
        record = data or kwargs.get("data", {})
        target = "high" if record.get("value", 0) >= 30 else "low"
        self.emit(target, data=record)
30
class HighValueProcessor(Routine):
    """Format records from the "high" route as ``"HIGH: <value>"``."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        """Emit the formatted result; raises ``KeyError`` if ``value`` is absent."""
        record = data if data else kwargs.get("data", {})
        self.emit("output", result=f"HIGH: {record['value']}")
41
class LowValueProcessor(Routine):
    """Format records from the "low" route as ``"LOW: <value>"``."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        """Emit the formatted result; raises ``KeyError`` if ``value`` is absent."""
        record = data if data else kwargs.get("data", {})
        self.emit("output", result=f"LOW: {record['value']}")
52
class Aggregator(Routine):
    """Fan-in sink that numbers and prints each processed result.

    The input slot uses ``merge_strategy="append"``, so ``result`` arrives as
    the list of results accumulated so far. Only the newest one is printed,
    which matches the documented output (``[1] LOW: 0``). The running total is
    kept in the ``count`` stat.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append",
        )
        self.set_stat("count", 0)

    @staticmethod
    def _latest(value):
        """Return the last item of a non-empty list, otherwise *value* itself."""
        if isinstance(value, list) and value:
            return value[-1]
        return value

    def aggregate(self, result=None, **kwargs):
        """Increment the ``count`` stat and print the newest result."""
        # Unwrap the append-list so each line shows one result, not the history.
        result_value = self._latest(result or kwargs.get("result", ""))
        count = self.get_stat("count", 0) + 1
        self.set_stat("count", count)
        print(f"[{count}] {result_value}")
68
def main():
    """Build the routing/aggregation pipeline, run it once, and report stats."""
    flow = Flow(flow_id="advanced_pipeline")

    source = DataSource()
    router = Router()
    high_processor = HighValueProcessor()
    low_processor = LowValueProcessor()
    aggregator = Aggregator()

    source_id = flow.add_routine(source, "source")
    router_id = flow.add_routine(router, "router")
    high_id = flow.add_routine(high_processor, "high_processor")
    low_id = flow.add_routine(low_processor, "low_processor")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # (source routine, event, target routine, slot), connected in this order:
    # source -> router -> {high, low} processors -> aggregator.
    edges = [
        (source_id, "output", router_id, "input"),
        (router_id, "high", high_id, "input"),
        (router_id, "low", low_id, "input"),
        (high_id, "output", agg_id, "input"),
        (low_id, "output", agg_id, "input"),
    ]
    for edge in edges:
        flow.connect(*edge)

    # Flow-wide CONTINUE error strategy (presumably applies to every routine
    # that has no handler of its own; confirm against the routilux docs).
    flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

    job_state = flow.execute(source_id, entry_params={"count": 5})

    print(f"\nStatus: {job_state.status}")
    print(f"Total aggregated: {aggregator.get_stat('count', 0)}")
98
# Run the example only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
Expected Output:
[1] LOW: 0
[2] LOW: 10
[3] LOW: 20
[4] HIGH: 30
[5] HIGH: 40
Status: completed
Total aggregated: 5
Key Points:
Combine routing, processing, and aggregation
Use conditional routing for dynamic workflows (here via a custom Router routine)
Fan-in pattern collects results from multiple paths
Error handling ensures resilience
Common Pitfalls
Pitfall 1: Using multiple execute() calls for aggregation
# Wrong: each execute() call starts a separate execution with its own JobState
flow.execute(source1_id) # JobState 1
flow.execute(source2_id) # JobState 2
# The aggregator never sees both sources in the same execution!
Solution: Emit multiple times from the same routine in a single execution.
Pitfall 2: Not resetting aggregator state
def aggregate(self, **kwargs):
    # Wrong: the slot keeps accumulating data but is never reset,
    # so the next batch will include old data!
    all_data = self.input_slot._data
Solution: Reset state after processing: self.input_slot._data = {} (and reset any counters, such as message_count, as well)
Pitfall 3: Wrong merge strategy for aggregation
# Wrong: with no merge_strategy the slot uses "override", which replaces
# data instead of accumulating it
self.input_slot = self.define_slot("input", handler=self.aggregate)
# Use merge_strategy="append" instead!
Solution: Use merge_strategy="append" for aggregation patterns.
Best Practices
Use single execute() for aggregation: Emit multiple times from one routine
Reset aggregator state: Clear data after processing each batch
Use appropriate merge strategies: “append” for accumulation, “override” for replacement
Combine patterns thoughtfully: Routing, processing, aggregation work well together
Handle errors at appropriate levels: Critical vs optional operations
Test complex workflows: Verify all paths work correctly
Next Steps
Now that you understand advanced patterns, let’s move on to Serialization and Persistence to learn how to save and restore workflow state for persistence and recovery.