Aggregation Pattern
The aggregation pattern allows a routine to wait for all expected messages from multiple upstream routines before processing and emitting results. This is useful for scenarios like aggregating search results from multiple sources, collecting data from parallel tasks, or combining outputs from different processors.
Overview
When you need to collect data from multiple sources and process it together, you can use the append merge strategy combined with a counter check in the handler. This pattern ensures that:
All incoming messages are accumulated
The handler is called for each message
Processing only occurs when all expected messages are received
Results are emitted once after all data is collected
Key Concepts
- Merge Strategy: “append”
With merge_strategy="append", each incoming message’s values are appended to lists, so data accumulates across multiple receive operations.
- Message Counting
In the handler, check the length of any list field to determine how many messages have been received. When the count reaches the expected number, process all accumulated data.
- Single Processing
Use a flag to ensure processing happens only once, even if the handler runs again after the expected count has already been reached.
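To make the append semantics concrete, here is a framework-independent sketch of how an “append” merge turns a stream of messages into the list-valued kwargs the handler sees. `append_merge` and `message_count` are hypothetical helpers for illustration only; in routilux the merging happens inside the slot, and this sketch does not claim to reproduce its internals.

```python
from __future__ import annotations

from typing import Any


def append_merge(accumulated: dict[str, list[Any]], message: dict[str, Any]) -> dict[str, list[Any]]:
    """Hypothetical model of merge_strategy="append": each value is appended to a
    per-key list. Returns a new dict and leaves `accumulated` unchanged."""
    merged = {key: list(values) for key, values in accumulated.items()}
    for key, value in message.items():
        merged.setdefault(key, []).append(value)
    return merged


def message_count(kwargs: dict[str, Any]) -> int:
    """Length of the first list-valued field; assumes every message carries the same fields."""
    for value in kwargs.values():
        if isinstance(value, list):
            return len(value)
    return 0


state: dict[str, list[Any]] = {}
for msg in [
    {"data": "data1", "source_id": "source1"},
    {"data": "data2", "source_id": "source2"},
    {"data": "data3", "source_id": "source3"},
]:
    state = append_merge(state, msg)
    print(message_count(state), state)
```

Each printed state has the same shape as the per-message kwargs listed under Key Points, and the count grows by one per message.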
Basic Example
Here’s a simple aggregator that waits for 3 messages:
from routilux import Flow, Routine


class SourceRoutine(Routine):
    def __init__(self, source_id: str):
        super().__init__()
        self.source_id = source_id
        self.output_event = self.define_event("output", ["data", "source_id"])

    def __call__(self, **kwargs):
        super().__call__(**kwargs)
        data = kwargs.get("data", f"data_from_{self.source_id}")
        self.emit("output", data=data, source_id=self.source_id)


class AggregatorRoutine(Routine):
    """Aggregator that waits for all expected messages."""

    def __init__(self, expected_count: int = 3):
        super().__init__()
        self.expected_count = expected_count
        self.set_config(expected_count=expected_count)
        self.processed = False  # Flag to ensure single processing

        # Use append strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Key: append strategy
        )
        self.output_event = self.define_event("aggregated", ["all_data", "count"])

    def _handle_input(self, **kwargs):
        """Handle input and check if all messages received."""
        # With append strategy, kwargs contains lists
        # Count messages using any list field
        received_count = 0
        if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
            received_count = len(kwargs["source_id"])
        elif "data" in kwargs and isinstance(kwargs["data"], list):
            received_count = len(kwargs["data"])

        expected_count = self.get_config("expected_count", self.expected_count)

        # Process only when all messages received and not already processed
        if received_count >= expected_count and not self.processed:
            self.processed = True

            # Extract accumulated data
            all_data = []
            if "data" in kwargs and isinstance(kwargs["data"], list):
                all_data = kwargs["data"]

            # Emit aggregated result
            self.emit("aggregated", all_data=all_data, count=len(all_data))

            # Reset for next aggregation (optional). Note that self.processed
            # stays True, so this instance aggregates only once; see
            # "Multiple Aggregation Cycles" for re-arming the flag.
            self.input_slot._data = {}


# Create flow
flow = Flow(flow_id="aggregator_demo")

# Create sources
source1 = SourceRoutine("source1")
source2 = SourceRoutine("source2")
source3 = SourceRoutine("source3")

# Create aggregator
aggregator = AggregatorRoutine(expected_count=3)

# Add to flow
id1 = flow.add_routine(source1, "source1")
id2 = flow.add_routine(source2, "source2")
id3 = flow.add_routine(source3, "source3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all sources to aggregator
flow.connect(id1, "output", agg_id, "input")
flow.connect(id2, "output", agg_id, "input")
flow.connect(id3, "output", agg_id, "input")

# Execute sources
flow.execute(id1, entry_params={"data": "data1"})
flow.execute(id2, entry_params={"data": "data2"})
flow.execute(id3, entry_params={"data": "data3"})
# Aggregator will process when all 3 messages are received
How It Works
Append Strategy: When merge_strategy="append" is used, each receive() call appends values to lists in slot._data.
Handler Invocation: The handler is called after each receive() with the accumulated data (where values are lists).
Message Counting: Check the length of any list field in kwargs to count received messages.
Conditional Processing: Only process when the count reaches the expected number and the data has not already been processed (use a flag).
Data Extraction: Extract all accumulated data from the lists and process it together.
Emission: Emit the aggregated result once.
Reset (Optional): Clear slot._data to prepare for the next aggregation cycle.
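The steps above can be modeled without the framework. The sketch below uses a hypothetical `SimulatedAppendSlot` class (not the routilux slot) whose `receive()` appends each message into `_data` and then calls the handler with the accumulated lists; the hypothetical `SimulatedAggregator` counts, processes once, records the emission in a list, and resets. Comments name the step each line corresponds to.

```python
from __future__ import annotations

from typing import Any, Callable


class SimulatedAppendSlot:
    """Hypothetical stand-in for a slot configured with merge_strategy="append"."""

    def __init__(self, handler: Callable[..., None]) -> None:
        self.handler = handler
        self._data: dict[str, list[Any]] = {}

    def receive(self, **message: Any) -> None:
        # Append Strategy: append every value to its per-key list
        for key, value in message.items():
            self._data.setdefault(key, []).append(value)
        # Handler Invocation: pass the accumulated, list-valued data
        self.handler(**self._data)


class SimulatedAggregator:
    """Hypothetical aggregator following the steps above; `emitted` stands in for emit()."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self.processed = False
        self.emitted: list[dict[str, Any]] = []
        self.slot = SimulatedAppendSlot(self._handle_input)

    def _handle_input(self, **kwargs: Any) -> None:
        received = len(kwargs.get("data", []))  # Message Counting
        # Conditional Processing: threshold reached and not yet processed
        if received >= self.expected_count and not self.processed:
            self.processed = True
            all_data = list(kwargs["data"])  # Data Extraction (copy before reset)
            self.emitted.append({"all_data": all_data, "count": len(all_data)})  # Emission
            # Reset: clear accumulated data and re-arm the flag for the next cycle
            self.slot._data = {}
            self.processed = False


agg = SimulatedAggregator(expected_count=3)
for i in range(1, 4):
    agg.slot.receive(data=f"data{i}", source_id=f"source{i}")
print(agg.emitted)
```

Because the slot data is cleared in the same step, three further messages start a fresh count and produce a second, independent aggregation.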
Complete Example: Search Result Aggregation
Here’s a complete example that aggregates search results from multiple search engines:
"""
Aggregator Routine Demo

Demonstrates how to create a routine that waits for all expected messages
before processing and emitting results.
"""

import time

from routilux import Flow, Routine
from routilux.job_state import JobState


class SearchTask(Routine):
    """A search task routine that simulates searching."""

    def __init__(self):
        super().__init__()
        self.set_config(task_name="default")

        # Define trigger slot
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        # Define output event
        self.output_event = self.define_event("result", ["query", "results", "task_name"])

    def _handle_trigger(self, query: str = None, **kwargs):
        """Handle search trigger."""
        query = query or kwargs.get("query", "default")
        # task_name is stored in the routine's config, so read it with get_config
        task_name = self.get_config("task_name", "default")

        # Simulate search operation
        time.sleep(0.1)  # Simulate I/O delay

        # Generate mock results
        results = [
            f"{task_name}_result_1",
            f"{task_name}_result_2",
            f"{task_name}_result_3",
        ]

        # Track operation
        self._track_operation("searches", success=True, results_count=len(results))

        # Emit results
        self.emit("result", query=query, results=results, task_name=task_name)


class ResultAggregator(Routine):
    """Aggregator routine that waits for all expected messages before processing."""

    def __init__(self):
        super().__init__()
        self.set_config(expected_count=3)

        # Set configuration
        self.set_config(timeout=10.0)  # Optional timeout (not read by this demo)

        # Define input slot with append strategy to collect all results
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Collect all incoming data
        )

        # Define output event
        self.output_event = self.define_event(
            "aggregated", ["all_results", "total_count", "queries"]
        )

    def _handle_input(self, **kwargs):
        """Handle input and check if we have all expected messages.

        With merge_strategy="append", each receive() call adds to the accumulated data.
        The handler receives the merged data (with lists), so we can check the length
        of any list field to determine how many messages we've received.

        Args:
            **kwargs: Merged data from slot. With append strategy, values are lists.
                For example: {'task_name': ['task1', 'task2'], 'results': [[...], [...]]}
        """
        # With merge_strategy="append", kwargs contains accumulated data where
        # values are lists. We can check the length of any list to count messages.

        # Count how many messages we've received
        # Use task_name list length as the count (since each message has a task_name)
        received_count = 0
        if "task_name" in kwargs and isinstance(kwargs["task_name"], list):
            received_count = len(kwargs["task_name"])
        elif "results" in kwargs and isinstance(kwargs["results"], list):
            # If task_name not available, use results list length
            received_count = len(kwargs["results"])
        elif "query" in kwargs and isinstance(kwargs["query"], list):
            received_count = len(kwargs["query"])
        else:
            # Fallback: count any list field
            for key, value in kwargs.items():
                if isinstance(value, list) and value:
                    received_count = len(value)
                    break

        expected_count = self.get_config("expected_count", 3)

        # Get current task_name from kwargs if available
        current_task = kwargs.get("task_name", "unknown")
        if isinstance(current_task, list) and current_task:
            current_task = current_task[-1]  # Get last one

        print(
            f"Aggregator received message from {current_task}. "
            f"Total received: {received_count}/{expected_count}"
        )

        # Check if we've received all expected messages
        if received_count >= expected_count:
            print(f"✅ All {expected_count} messages received! Processing aggregated results...")

            # Process all accumulated data (kwargs contains the merged data)
            self._process_aggregated_results(kwargs)

            # Reset for next aggregation (optional)
            self.input_slot._data = {}
        else:
            print(f"⏳ Waiting for more messages ({received_count}/{expected_count})...")

    def _process_aggregated_results(self, accumulated_data: dict):
        """Process all aggregated results and emit.

        Args:
            accumulated_data: Dictionary with accumulated data. With append strategy,
                values are lists containing all received values.
        """
        # Track operation
        self._track_operation("aggregations", success=True)

        # Extract all results
        all_results = []
        queries = []
        task_names = []

        if "results" in accumulated_data:
            # results is a list of lists (each search task's results)
            results_list = accumulated_data["results"]
            if isinstance(results_list, list):
                for result_list in results_list:
                    if isinstance(result_list, list):
                        all_results.extend(result_list)
                    else:
                        all_results.append(result_list)

        if "query" in accumulated_data:
            query_list = accumulated_data["query"]
            queries = query_list if isinstance(query_list, list) else [query_list]

        if "task_name" in accumulated_data:
            task_name_list = accumulated_data["task_name"]
            task_names = task_name_list if isinstance(task_name_list, list) else [task_name_list]

        print(f"📊 Aggregated {len(all_results)} results from {len(task_names)} search tasks")

        # Emit aggregated result (slot data is reset by _handle_input afterwards)
        self.emit(
            "aggregated", all_results=all_results, total_count=len(all_results), queries=queries
        )


def demo_aggregator():
    """Demonstrate aggregator routine."""
    print("=" * 70)
    print("Aggregator Routine Demo")
    print("=" * 70)

    # Create flow
    flow = Flow(flow_id="aggregator_demo")

    # Create search tasks
    search1 = SearchTask()
    search1.set_config(task_name="SearchEngine1")
    search2 = SearchTask()
    search2.set_config(task_name="SearchEngine2")
    search3 = SearchTask()
    search3.set_config(task_name="SearchEngine3")

    # Create aggregator (expects 3 results)
    aggregator = ResultAggregator()
    aggregator.set_config(expected_count=3)

    # Add to flow
    id1 = flow.add_routine(search1, "search1")
    id2 = flow.add_routine(search2, "search2")
    id3 = flow.add_routine(search3, "search3")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # Connect all search tasks to aggregator
    flow.connect(id1, "result", agg_id, "input")
    flow.connect(id2, "result", agg_id, "input")
    flow.connect(id3, "result", agg_id, "input")

    # Create a consumer to receive aggregated results
    class ResultConsumer(Routine):
        def __init__(self):
            super().__init__()
            self.received_results = []
            self.input_slot = self.define_slot("input", handler=self._handle_input)

        def _handle_input(self, all_results: list = None, total_count: int = None, **kwargs):
            self.received_results.append({"results": all_results, "count": total_count})
            print(f"📦 Consumer received aggregated result: {total_count} total results")

    consumer = ResultConsumer()
    consumer_id = flow.add_routine(consumer, "consumer")
    flow.connect(agg_id, "aggregated", consumer_id, "input")

    # Create a multi-source trigger routine that triggers all search tasks
    class MultiSourceTrigger(Routine):
        """Trigger routine that emits to multiple search tasks in a single execute()"""

        def __init__(self):
            super().__init__()
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("trigger_search", ["query"])

        def _handle_trigger(self, query: str = None, **kwargs):
            """Trigger all search tasks in a single execute() call"""
            query = query or kwargs.get("query", "test query")
            # Emit to all connected search tasks - they all share the same execution
            # Flow is automatically detected from routine context
            self.emit("trigger_search", query=query)

    trigger = MultiSourceTrigger()
    trigger_id = flow.add_routine(trigger, "trigger")

    # Connect trigger to all search tasks
    flow.connect(trigger_id, "trigger_search", id1, "trigger")
    flow.connect(trigger_id, "trigger_search", id2, "trigger")
    flow.connect(trigger_id, "trigger_search", id3, "trigger")

    print("\nFlow structure:")
    print(" trigger -> search1 -> aggregator -> consumer")
    print(" trigger -> search2 -> aggregator")
    print(" trigger -> search3 -> aggregator")
    print("\nAggregator expects: 3 messages")

    # Execute once - all search tasks will be triggered in the same execution
    print("\n🚀 Executing all search tasks (single execute, multiple emits)...")
    job_state = flow.execute(trigger_id, entry_params={"query": "test query"})

    # Wait for all async tasks to complete
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print("\n" + "=" * 70)
    print("Results:")
    # Execution state is tracked in job_state (a JobState), not in routine._stats
    print(f" Consumer received: {len(consumer.received_results)} aggregated result(s)")
    if consumer.received_results:
        print(f" Total results in aggregation: {consumer.received_results[0]['count']}")
    print("=" * 70)


if __name__ == "__main__":
    demo_aggregator()
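To see what the consumer should receive, the sketch below replays the aggregator's extraction step on hand-built, append-merged input: three engines with three mock results each. `flatten_results` is a hypothetical helper that mirrors the loop in `_process_aggregated_results`, swapping the nested loop for `itertools.chain.from_iterable`; the input dict is constructed by hand, not captured from a real run.

```python
from __future__ import annotations

from itertools import chain
from typing import Any


def flatten_results(accumulated: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper mirroring the extraction in _process_aggregated_results."""
    results = accumulated.get("results", [])
    if not isinstance(results, list):
        results = []  # the demo ignores a non-list "results" value
    # Each entry is one task's result list; a non-list entry is kept as a single item
    all_results = list(chain.from_iterable(r if isinstance(r, list) else [r] for r in results))
    queries = accumulated.get("query", [])
    if not isinstance(queries, list):
        queries = [queries]
    return {"all_results": all_results, "total_count": len(all_results), "queries": queries}


# Hand-built input shaped like append-merged kwargs from the three search tasks
engines = ["SearchEngine1", "SearchEngine2", "SearchEngine3"]
accumulated = {
    "query": ["test query"] * 3,
    "task_name": engines,
    "results": [[f"{name}_result_{i}" for i in (1, 2, 3)] for name in engines],
}
payload = flatten_results(accumulated)
print(payload["total_count"], payload["all_results"][:2])
```

Under these assumptions, the consumer in the demo would report 9 total results (three engines times three mock results each).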
Key Points
- Handler is Called for Each Message
The handler is called immediately after each message is received. You check the count inside the handler to decide when to process.
- Append Strategy Behavior
With merge_strategy="append":
First message: kwargs = {"data": ["data1"], "source_id": ["source1"]}
Second message: kwargs = {"data": ["data1", "data2"], "source_id": ["source1", "source2"]}
Third message: kwargs = {"data": ["data1", "data2", "data3"], "source_id": ["source1", "source2", "source3"]}
- Counting Messages
Use any field that appears in every message to count:
if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
    count = len(kwargs["source_id"])
elif "data" in kwargs and isinstance(kwargs["data"], list):
    count = len(kwargs["data"])
- Preventing Duplicate Processing
Use a flag to ensure processing only happens once:
if count >= expected_count and not self.processed:
    self.processed = True
    # Process and emit
- Resetting for Next Cycle
After processing, optionally reset the slot data:
self.input_slot._data = {}
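The counting checks above can be folded into one helper. `count_received` below is a hypothetical utility, not a routilux API: it tries preferred fields in order and then falls back to the first non-empty list, which is the same cascade the complete example spells out inline.

```python
from __future__ import annotations

from typing import Any, Iterable


def count_received(kwargs: dict[str, Any], preferred: Iterable[str] = ()) -> int:
    """Return how many messages an append-merged kwargs dict represents.

    Tries the preferred fields first, then falls back to any non-empty list.
    Returns 0 if no list-valued field is present.
    """
    for field in preferred:
        value = kwargs.get(field)
        if isinstance(value, list):
            return len(value)
    for value in kwargs.values():
        if isinstance(value, list) and value:
            return len(value)
    return 0


print(count_received({"data": ["a", "b"], "source_id": ["s1", "s2"]}, ("source_id", "data")))
```

Passing the field list explicitly keeps the choice of "which field appears in every message" visible at the call site.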
Concurrent Execution
The aggregation pattern works the same way in concurrent execution mode. However, be aware that:
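Handler calls may be interleaved across threads
Use thread-safe operations if handlers share state; this includes the processed flag, because checking it and then setting it are two separate steps
The handler is still called once per received message, so the total number of calls is the same as in sequential mode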
Example with concurrent execution:
from routilux import Flow
from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent", max_workers=5)
# Same setup as before
# ...
# Execute sources concurrently, keeping each returned JobState
job_states = [
    flow.execute(id1, entry_params={"data": "data1"}),
    flow.execute(id2, entry_params={"data": "data2"}),
    flow.execute(id3, entry_params={"data": "data3"}),
]
# Wait for every execution to complete
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=10.0)
# Aggregator will still process when all 3 messages are received
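Because `not self.processed` and `self.processed = True` are separate steps, two threads can both pass the check before either sets the flag. A minimal, framework-independent sketch of guarding accumulate, check, and set with one `threading.Lock` follows. `LockedAggregator` is hypothetical and keeps its own buffer instead of reading routilux slot internals; whether routilux already serializes calls to a single slot's handler is not stated here, so treat the lock as a defensive assumption.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class LockedAggregator:
    """Hypothetical aggregator whose accumulate/check/flag steps run under one lock."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self._lock = threading.Lock()
        self._buffer: list[Any] = []
        self.processed = False
        self.emitted: list[list[Any]] = []  # stands in for emit("aggregated", ...)

    def handle(self, item: Any) -> None:
        with self._lock:
            self._buffer.append(item)
            if len(self._buffer) < self.expected_count or self.processed:
                return
            self.processed = True  # set while still holding the lock
            batch = list(self._buffer)
        # Only the one thread that flipped the flag reaches this point
        self.emitted.append(batch)


agg = LockedAggregator(expected_count=50)
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(agg.handle, range(100)))  # list() surfaces any worker exceptions
print(len(agg.emitted), len(agg.emitted[0]))
```

The emission happens outside the lock so slow downstream work does not block other handler calls; only the flag flip needs to be atomic.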
Best Practices
Use Configuration: Store expected_count in _config for flexibility:
self.set_config(expected_count=expected_count)
Check List Type: Always check whether a value is a list before calling len():
if "field" in kwargs and isinstance(kwargs["field"], list):
    count = len(kwargs["field"])
Prevent Duplicate Processing: Use a flag to ensure processing happens only once:
if count >= expected_count and not self.processed:
    self.processed = True
    # Process
Reset After Processing: Clear slot data after processing to prepare for the next aggregation cycle:
self.input_slot._data = {}
Handle Edge Cases: Consider what happens if:
- fewer messages arrive than expected (timeout handling)
- more messages arrive than expected
- messages arrive out of order
Thread Safety: In concurrent mode, if you need to share state between handlers, use thread-safe operations (locks, atomic operations).
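One way to handle the duplicate and out-of-order cases is to key accumulated data by a source identifier instead of counting list entries. The sketch below is a hypothetical, framework-free `KeyedCollector`: it waits for a named set of sources, ignores repeats from a source it has already seen (first value wins) and unknown sources, and returns data in a fixed source order regardless of arrival order.

```python
from __future__ import annotations

from typing import Any, Optional


class KeyedCollector:
    """Hypothetical collector: one value per expected source; duplicates and unknown sources ignored."""

    def __init__(self, expected_sources: list[str]) -> None:
        self.expected = list(expected_sources)
        self.received: dict[str, Any] = {}

    def add(self, source_id: str, data: Any) -> Optional[list[Any]]:
        """Record a message; return data in expected-source order once all sources have sent."""
        if source_id not in self.expected or source_id in self.received:
            return None  # unknown source or duplicate: ignore
        self.received[source_id] = data
        if len(self.received) < len(self.expected):
            return None
        ordered = [self.received[s] for s in self.expected]
        self.received = {}  # ready for the next cycle
        return ordered


collector = KeyedCollector(["source1", "source2", "source3"])
collector.add("source3", "data3")        # arrives first, still waiting
collector.add("source1", "data1")
collector.add("source1", "duplicate")    # ignored
print(collector.add("source2", "data2"))
```

Whether a duplicate should be ignored, overwrite the earlier value, or raise is a policy decision; this sketch picks "first value wins" for illustration.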
Common Patterns
- Timeout Handling
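Add timeout logic to process with whatever data has arrived. Note that this check runs inside the handler, so it is evaluated only when a message arrives; if the remaining sources never send, the handler is not called again and the timeout never triggers on its own.
import time

def __init__(self, expected_count: int = 3, timeout: float = 10.0):
    # ...
    self.start_time = None
    self.set_config(timeout=timeout)

def _handle_input(self, **kwargs):
    if self.start_time is None:
        self.start_time = time.monotonic()  # monotonic clock for measuring elapsed time
    # ... count messages ...
    timeout = self.get_config("timeout", 10.0)
    elapsed = time.monotonic() - self.start_time
    if (count >= expected_count) or (elapsed >= timeout):
        ...  # Process with available data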
- Dynamic Expected Count
Allow the expected count to be set dynamically:
def set_expected_count(self, count: int):
    self.set_config(expected_count=count)
    self.expected_count = count
- Multiple Aggregation Cycles
Reset properly to support multiple aggregation cycles:
def _handle_input(self, **kwargs):
    # ... check count ...
    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
        self.input_slot._data = {}
        self.processed = False  # Reset for next cycle
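Since a handler-only timeout is evaluated only when a message arrives, a timer can flush a partial batch when sources go silent, and the same reset can re-arm the aggregator for the next cycle. A minimal sketch with `threading.Timer`, assuming you buffer items yourself; `TimedBatcher` and `on_batch` are hypothetical names, not routilux APIs. A full batch flushes immediately, a partial one flushes when the timer fires, and a generation counter makes a stale timer from an already-flushed cycle do nothing.

```python
from __future__ import annotations

import threading
import time
from typing import Any, Callable


class TimedBatcher:
    """Hypothetical batcher: flushes when expected_count items arrive or when
    `timeout` seconds pass after the first item of a cycle, whichever comes first."""

    def __init__(self, expected_count: int, timeout: float,
                 on_batch: Callable[[list[Any], bool], None]) -> None:
        self.expected_count = expected_count
        self.timeout = timeout
        self.on_batch = on_batch  # called as on_batch(items, timed_out)
        self._lock = threading.Lock()
        self._items: list[Any] = []
        self._timer: threading.Timer | None = None
        self._generation = 0  # identifies the current cycle

    def add(self, item: Any) -> None:
        with self._lock:
            if self._timer is None:
                # The first item of a cycle starts the countdown for that cycle
                self._timer = threading.Timer(self.timeout, self._on_timeout, args=(self._generation,))
                self._timer.daemon = True
                self._timer.start()
            self._items.append(item)
            if len(self._items) < self.expected_count:
                return
            batch = self._take_locked()
        self.on_batch(batch, False)  # callback runs outside the lock

    def _on_timeout(self, generation: int) -> None:
        with self._lock:
            # Ignore a stale timer whose cycle was already flushed
            if generation != self._generation or not self._items:
                return
            batch = self._take_locked()
        self.on_batch(batch, True)

    def _take_locked(self) -> list[Any]:
        # Caller holds self._lock: stop the timer and start a fresh cycle
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._generation += 1
        batch, self._items = self._items, []
        return batch


batches: list[tuple[list[Any], bool]] = []
batcher = TimedBatcher(expected_count=3, timeout=0.2,
                       on_batch=lambda items, timed_out: batches.append((items, timed_out)))
for x in ("a", "b", "c"):
    batcher.add(x)  # full batch: flushed immediately
batcher.add("d")    # partial batch: flushed by the timer
time.sleep(0.5)
print(batches)
```

Changing `expected_count` between cycles (as in Dynamic Expected Count) works with this design because the threshold is read on every `add()`.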
See Also
Working with Routines - Working with routines
Working with Connections - Connecting routines
Working with Flows - Flow execution and concurrent mode