Aggregation Pattern
The aggregation pattern allows a routine to wait for all expected messages from multiple upstream routines before processing and emitting results. This is useful for scenarios like aggregating search results from multiple sources, collecting data from parallel tasks, or combining outputs from different processors.
Overview
When you need to collect data from multiple sources and process it together, you can use the append merge strategy combined with a counter check in the handler. This pattern ensures that:
All incoming messages are accumulated
The handler is called for each message
Processing only occurs when all expected messages are received
Results are emitted once after all data is collected
Key Concepts
- Merge Strategy: “append”
With merge_strategy="append", each incoming message’s values are appended to lists. This allows you to accumulate data over multiple receive operations.
- Message Counting
In the handler, check the length of any list field to determine how many messages have been received. When the count reaches the expected number, process all accumulated data.
- Single Processing
Use a flag to ensure processing only happens once, even if the handler is called multiple times with the same count.
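The accumulation and counting described above can be modeled in plain Python. This is a minimal sketch and not routilux's implementation: `append_merge` is a hypothetical stand-in for what the slot does with merge_strategy="append", and the message fields mirror the ones used in the examples on this page.

```python
def append_merge(accumulated: dict, message: dict) -> dict:
    """Append each value in `message` to the list stored under the same key.

    Hypothetical stand-in for the slot's "append" merge; not routilux code.
    """
    for key, value in message.items():
        accumulated.setdefault(key, []).append(value)
    return accumulated


accumulated = {}
for i in (1, 2, 3):
    append_merge(accumulated, {"data": f"data{i}", "source_id": f"source{i}"})

# Any field present in every message gives the message count.
received_count = len(accumulated["source_id"])
print(received_count, accumulated["data"])  # 3 ['data1', 'data2', 'data3']
```

Because every message contributes exactly one element to each list, the list length doubles as the message counter, which is why the handler needs no counter of its own.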
Basic Example
Here’s a simple aggregator that waits for 3 messages:
from routilux import Flow, Routine


class SourceRoutine(Routine):
    def __init__(self, source_id: str):
        super().__init__()
        self.source_id = source_id
        self.output_event = self.define_event("output", ["data", "source_id"])

    def __call__(self, **kwargs):
        super().__call__(**kwargs)
        data = kwargs.get("data", f"data_from_{self.source_id}")
        self.emit("output", data=data, source_id=self.source_id)


class AggregatorRoutine(Routine):
    """Aggregator that waits for all expected messages."""

    def __init__(self, expected_count: int = 3):
        super().__init__()
        self.expected_count = expected_count
        self.set_config(expected_count=expected_count)
        self.processed = False  # Flag to ensure single processing
        # Use append strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Key: append strategy
        )
        self.output_event = self.define_event("aggregated", ["all_data", "count"])

    def _handle_input(self, **kwargs):
        """Handle input and check if all messages received."""
        # With append strategy, kwargs contains lists
        # Count messages using any list field
        received_count = 0
        if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
            received_count = len(kwargs["source_id"])
        elif "data" in kwargs and isinstance(kwargs["data"], list):
            received_count = len(kwargs["data"])
        expected_count = self.get_config("expected_count", self.expected_count)
        # Process only when all messages received and not already processed
        if received_count >= expected_count and not self.processed:
            self.processed = True
            # Extract accumulated data
            all_data = []
            if "data" in kwargs and isinstance(kwargs["data"], list):
                all_data = kwargs["data"]
            # Emit aggregated result
            self.emit("aggregated", all_data=all_data, count=len(all_data))
            # Reset for next aggregation (optional)
            self.input_slot._data = {}


# Create flow
flow = Flow(flow_id="aggregator_demo")

# Create sources
source1 = SourceRoutine("source1")
source2 = SourceRoutine("source2")
source3 = SourceRoutine("source3")

# Create aggregator
aggregator = AggregatorRoutine(expected_count=3)

# Add to flow
id1 = flow.add_routine(source1, "source1")
id2 = flow.add_routine(source2, "source2")
id3 = flow.add_routine(source3, "source3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all sources to aggregator
flow.connect(id1, "output", agg_id, "input")
flow.connect(id2, "output", agg_id, "input")
flow.connect(id3, "output", agg_id, "input")

# Execute sources
flow.execute(id1, entry_params={"data": "data1"})
flow.execute(id2, entry_params={"data": "data2"})
flow.execute(id3, entry_params={"data": "data3"})
# Aggregator will process when all 3 messages are received
How It Works
Append Strategy: When merge_strategy="append" is used, each receive() call appends values to lists in slot._data.
Handler Invocation: The handler is called after each receive() with the accumulated data (where values are lists).
Message Counting: Check the length of any list field in kwargs to count received messages.
Conditional Processing: Only process when the count reaches the expected number and the data has not already been processed (use a flag).
Data Extraction: Extract all accumulated data from the lists and process it together.
Emission: Emit the aggregated result once.
Reset (Optional): Clear slot._data to prepare for the next aggregation cycle.
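The steps above can be traced end to end with a toy model that needs no routilux installation. This is a sketch under stated assumptions: `MiniSlot` and `MiniAggregator` are hypothetical classes written for illustration, and the `emitted` list stands in for `self.emit(...)`.

```python
class MiniSlot:
    """Toy slot: appends each message to lists, then calls the handler.

    Illustrative only; the real slot lives inside routilux.
    """

    def __init__(self, handler):
        self.handler = handler
        self._data = {}

    def receive(self, message: dict) -> None:
        # Steps 1-2: append the values, then invoke the handler.
        for key, value in message.items():
            self._data.setdefault(key, []).append(value)
        # Pass copies so the handler may reset self._data safely.
        self.handler(**{k: list(v) for k, v in self._data.items()})


class MiniAggregator:
    def __init__(self, expected_count: int):
        self.expected_count = expected_count
        self.processed = False
        self.emitted = []  # stands in for self.emit(...)
        self.slot = MiniSlot(self._handle_input)

    def _handle_input(self, **kwargs):
        received = len(kwargs.get("data", []))  # step 3: count
        if received >= self.expected_count and not self.processed:  # step 4
            self.processed = True
            # Steps 5-6: extract the accumulated data and emit once.
            self.emitted.append({"all_data": kwargs["data"], "count": received})
            self.slot._data = {}  # step 7: optional reset


agg = MiniAggregator(expected_count=3)
for i in (1, 2, 3):
    agg.slot.receive({"data": f"data{i}", "source_id": f"source{i}"})

print(agg.emitted)
```

The handler runs three times, but only the third call passes the count check, so exactly one aggregated result is recorded.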
Complete Example: Search Result Aggregation
Here’s a complete example that aggregates search results from multiple search engines:
"""
Aggregator Routine Demo

Demonstrates how to create a routine that waits for all expected messages
before processing and emitting results.
"""

from routilux import Flow, Routine
import time


class SearchTask(Routine):
    """A search task routine that simulates searching."""

    def __init__(self):
        super().__init__()
        self.set_config(task_name="default")

        # Define trigger slot
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        # Define output event
        self.output_event = self.define_event("result", ["query", "results", "task_name"])

    def _handle_trigger(self, query: str = None, **kwargs):
        """Handle search trigger."""
        query = query or kwargs.get("query", "default")

        # task_name is stored with set_config(), so read it back with get_config()
        task_name = self.get_config("task_name", "default")

        # Simulate search operation
        time.sleep(0.1)  # Simulate I/O delay

        # Generate mock results
        results = [
            f"{task_name}_result_1",
            f"{task_name}_result_2",
            f"{task_name}_result_3",
        ]

        # Track operation
        self._track_operation("searches", success=True, results_count=len(results))

        # Emit results
        self.emit("result", query=query, results=results, task_name=task_name)


class ResultAggregator(Routine):
    """Aggregator routine that waits for all expected messages before processing."""

    def __init__(self):
        super().__init__()
        self.set_config(expected_count=3)

        # Set configuration
        self.set_config(timeout=10.0)  # Optional timeout

        # Define input slot with append strategy to collect all results
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Collect all incoming data
        )

        # Define output event
        self.output_event = self.define_event(
            "aggregated", ["all_results", "total_count", "queries"]
        )

    def _handle_input(self, **kwargs):
        """Handle input and check if we have all expected messages.

        With merge_strategy="append", each receive() call adds to the accumulated data.
        The handler receives the merged data (with lists), so we can check the length
        of any list field to determine how many messages we've received.

        Args:
            **kwargs: Merged data from slot. With append strategy, values are lists.
                For example: {'task_name': ['task1', 'task2'], 'results': [[...], [...]]}
        """
        # With merge_strategy="append", kwargs contains accumulated data where
        # values are lists. We can check the length of any list to count messages.

        # Count how many messages we've received
        # Use task_name list length as the count (since each message has a task_name)
        received_count = 0
        if "task_name" in kwargs and isinstance(kwargs["task_name"], list):
            received_count = len(kwargs["task_name"])
        elif "results" in kwargs and isinstance(kwargs["results"], list):
            # If task_name not available, use results list length
            received_count = len(kwargs["results"])
        elif "query" in kwargs and isinstance(kwargs["query"], list):
            received_count = len(kwargs["query"])
        else:
            # Fallback: count any list field
            for key, value in kwargs.items():
                if isinstance(value, list) and value:
                    received_count = len(value)
                    break

        expected_count = self.get_config("expected_count", 3)

        # Get current task_name from kwargs if available
        current_task = kwargs.get("task_name", "unknown")
        if isinstance(current_task, list) and current_task:
            current_task = current_task[-1]  # Get last one

        print(
            f"Aggregator received message from {current_task}. "
            f"Total received: {received_count}/{expected_count}"
        )

        # Check if we've received all expected messages
        if received_count >= expected_count:
            print(f"✅ All {expected_count} messages received! Processing aggregated results...")

            # Process all accumulated data (kwargs contains the merged data)
            self._process_aggregated_results(kwargs)

            # Reset for next aggregation (optional)
            self.input_slot._data = {}
        else:
            print(f"⏳ Waiting for more messages ({received_count}/{expected_count})...")

    def _process_aggregated_results(self, accumulated_data: dict):
        """Process all aggregated results and emit.

        Args:
            accumulated_data: Dictionary with accumulated data. With append strategy,
                values are lists containing all received values.
        """
        # Track operation
        self._track_operation("aggregations", success=True)

        # Extract all results
        all_results = []
        queries = []
        task_names = []

        if "results" in accumulated_data:
            # results is a list of lists (each search task's results)
            results_list = accumulated_data["results"]
            if isinstance(results_list, list):
                for result_list in results_list:
                    if isinstance(result_list, list):
                        all_results.extend(result_list)
                    else:
                        all_results.append(result_list)

        if "query" in accumulated_data:
            query_list = accumulated_data["query"]
            queries = query_list if isinstance(query_list, list) else [query_list]

        if "task_name" in accumulated_data:
            task_name_list = accumulated_data["task_name"]
            task_names = task_name_list if isinstance(task_name_list, list) else [task_name_list]

        print(f"📊 Aggregated {len(all_results)} results from {len(task_names)} search tasks")

        # Emit aggregated result
        self.emit(
            "aggregated", all_results=all_results, total_count=len(all_results), queries=queries
        )


def demo_aggregator():
    """Demonstrate aggregator routine."""
    print("=" * 70)
    print("Aggregator Routine Demo")
    print("=" * 70)

    # Create flow
    flow = Flow(flow_id="aggregator_demo")

    # Create search tasks
    search1 = SearchTask()
    search1.set_config(task_name="SearchEngine1")
    search2 = SearchTask()
    search2.set_config(task_name="SearchEngine2")
    search3 = SearchTask()
    search3.set_config(task_name="SearchEngine3")

    # Create aggregator (expects 3 results)
    aggregator = ResultAggregator()
    aggregator.set_config(expected_count=3)

    # Add to flow
    id1 = flow.add_routine(search1, "search1")
    id2 = flow.add_routine(search2, "search2")
    id3 = flow.add_routine(search3, "search3")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # Connect all search tasks to aggregator
    flow.connect(id1, "result", agg_id, "input")
    flow.connect(id2, "result", agg_id, "input")
    flow.connect(id3, "result", agg_id, "input")

    # Create a consumer to receive aggregated results
    class ResultConsumer(Routine):
        def __init__(self):
            super().__init__()
            self.received_results = []
            self.input_slot = self.define_slot("input", handler=self._handle_input)

        def _handle_input(self, all_results: list = None, total_count: int = None, **kwargs):
            self.received_results.append({"results": all_results, "count": total_count})
            print(f"📦 Consumer received aggregated result: {total_count} total results")

    consumer = ResultConsumer()
    consumer_id = flow.add_routine(consumer, "consumer")
    flow.connect(agg_id, "aggregated", consumer_id, "input")

    # Create a multi-source trigger routine that triggers all search tasks
    class MultiSourceTrigger(Routine):
        """Trigger routine that emits to multiple search tasks in a single execute()"""

        def __init__(self):
            super().__init__()
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("trigger_search", ["query"])

        def _handle_trigger(self, query: str = None, **kwargs):
            """Trigger all search tasks in a single execute() call"""
            query = query or kwargs.get("query", "test query")
            # Emit to all connected search tasks - they all share the same execution
            # Flow is automatically detected from routine context
            self.emit("trigger_search", query=query)

    trigger = MultiSourceTrigger()
    trigger_id = flow.add_routine(trigger, "trigger")

    # Connect trigger to all search tasks
    flow.connect(trigger_id, "trigger_search", id1, "trigger")
    flow.connect(trigger_id, "trigger_search", id2, "trigger")
    flow.connect(trigger_id, "trigger_search", id3, "trigger")

    print("\nFlow structure:")
    print("  trigger -> search1 -> aggregator -> consumer")
    print("  trigger -> search2 -> aggregator")
    print("  trigger -> search3 -> aggregator")
    print("\nAggregator expects: 3 messages")

    # Execute once - all search tasks will be triggered in the same execution
    print("\n🚀 Executing all search tasks (single execute, multiple emits)...")
    job_state = flow.execute(trigger_id, entry_params={"query": "test query"})

    # Wait for all async tasks to complete
    from routilux.job_state import JobState

    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print("\n" + "=" * 70)
    print("Results:")
    # Execution state is tracked in JobState, not routine._stats
    print(f"  Consumer received: {len(consumer.received_results)} aggregated result(s)")
    if consumer.received_results:
        print(f"  Total results in aggregation: {consumer.received_results[0]['count']}")
    print("=" * 70)


if __name__ == "__main__":
    demo_aggregator()
Key Points
- Handler is Called for Each Message
The handler is called immediately after each message is received. You check the count inside the handler to decide when to process.
- Append Strategy Behavior
With merge_strategy="append":
First message: kwargs = {"data": ["data1"], "source_id": ["source1"]}
Second message: kwargs = {"data": ["data1", "data2"], "source_id": ["source1", "source2"]}
Third message: kwargs = {"data": ["data1", "data2", "data3"], "source_id": ["source1", "source2", "source3"]}
- Counting Messages
Use any field that appears in every message to count:
if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
    count = len(kwargs["source_id"])
elif "data" in kwargs and isinstance(kwargs["data"], list):
    count = len(kwargs["data"])
- Preventing Duplicate Processing
Use a flag to ensure processing only happens once:
if count >= expected_count and not self.processed:
    self.processed = True
    # Process and emit
- Resetting for Next Cycle
After processing, optionally reset the slot data:
self.input_slot._data = {}
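The counting check from the points above can be factored into a small helper so that handlers do not repeat the isinstance chain. This is a sketch only: `count_messages` and its `preferred_fields` parameter are hypothetical names, not part of routilux.

```python
def count_messages(kwargs: dict, preferred_fields=("source_id", "data")) -> int:
    """Return the number of accumulated messages in append-merged kwargs.

    The first preferred field that holds a list decides the count. If none
    is present, fall back to the longest list found, or 0 if there is none.
    """
    for field in preferred_fields:
        value = kwargs.get(field)
        if isinstance(value, list):
            return len(value)
    lengths = [len(v) for v in kwargs.values() if isinstance(v, list)]
    return max(lengths, default=0)


print(count_messages({"data": ["data1", "data2"], "source_id": ["source1", "source2"]}))  # 2
print(count_messages({"payload": [1, 2, 3]}))  # 3
print(count_messages({"data": "not-a-list"}))  # 0
```

Listing the preferred fields explicitly keeps the count tied to a field that every message is known to carry, which is the condition the pattern relies on.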
Concurrent Execution
The aggregation pattern works the same way in concurrent execution mode. However, be aware that:
Handler calls may be interleaved across threads
Use thread-safe operations if handlers share state
The total number of calls remains the same as sequential mode
Example with concurrent execution:
from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent", max_workers=5)
# Same setup as before
# ...
# Execute sources concurrently, keeping the job state returned by each call
job_states = [
    flow.execute(id1, entry_params={"data": "data1"}),
    flow.execute(id2, entry_params={"data": "data2"}),
    flow.execute(id3, entry_params={"data": "data3"}),
]
# Wait for completion
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=10.0)
# Aggregator will still process when all 3 messages are received
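When handler calls can interleave across threads, a plain boolean flag is open to a check-then-set race: two threads may both see the flag unset and both process. One way to close that gap, sketched here with only the standard library, is to guard the flag with a lock. `OnceGuard` is a hypothetical helper, not a routilux class.

```python
import threading


class OnceGuard:
    """Lets exactly one caller through until reset() is called."""

    def __init__(self):
        self._lock = threading.Lock()
        self._claimed = False

    def claim(self) -> bool:
        # Check and set happen under one lock, so only one thread wins.
        with self._lock:
            if self._claimed:
                return False
            self._claimed = True
            return True

    def reset(self) -> None:
        with self._lock:
            self._claimed = False


guard = OnceGuard()
winners = []


def handler():
    # In a real handler this check would follow the count comparison.
    if guard.claim():
        winners.append(threading.current_thread().name)


threads = [threading.Thread(target=handler) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(winners))  # 1
```

In an aggregator, `guard.claim()` would replace the `not self.processed` test, and `guard.reset()` would run after the slot data is cleared for the next cycle.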
Best Practices
Use Configuration: Store expected_count in _config for flexibility:
self.set_config(expected_count=expected_count)
Check List Type: Always check if a value is a list before using len():
if "field" in kwargs and isinstance(kwargs["field"], list):
    count = len(kwargs["field"])
Prevent Duplicate Processing: Use a flag to ensure processing happens only once:
if count >= expected_count and not self.processed:
    self.processed = True
    # Process
Reset After Processing: Clear slot data after processing to prepare for the next aggregation cycle:
self.input_slot._data = {}
Handle Edge Cases: Consider what happens if fewer messages arrive than expected (timeout handling), more messages arrive than expected, or messages arrive out of order.
Thread Safety: In concurrent mode, if you need to share state between handlers, use thread-safe operations (locks, atomic operations).
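Two of the edge cases above, extra messages and out-of-order arrival, can be handled before processing by normalizing the accumulated lists. The sketch below is one possible approach, not something routilux provides: `select_batch` is a hypothetical helper, and it assumes every message carries every field, so that index i of each list belongs to message i.

```python
def select_batch(kwargs: dict, expected_count: int, order_by: str = "source_id") -> list:
    """Turn append-merged lists into one record per message, capped and sorted."""
    fields = [k for k, v in kwargs.items() if isinstance(v, list)]
    if not fields:
        return []
    # Assumes index i of every list came from the same message.
    records = [dict(zip(fields, values)) for values in zip(*(kwargs[f] for f in fields))]
    # Ignore anything beyond the expected count (arrival order decides).
    records = records[:expected_count]
    # Sort so downstream processing does not depend on arrival order.
    if order_by in fields:
        records.sort(key=lambda record: record[order_by])
    return records


accumulated = {
    "data": ["data3", "data1", "data2", "data4"],
    "source_id": ["source3", "source1", "source2", "source4"],
}
batch = select_batch(accumulated, expected_count=3)
print([record["source_id"] for record in batch])  # ['source1', 'source2', 'source3']
```

Whether to drop, queue, or reject surplus messages depends on the application; dropping them is only the simplest choice for illustration.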
Common Patterns
- Timeout Handling
Add timeout logic so that a late message triggers processing of whatever has arrived so far. Note that the check runs only when the handler is called, so it cannot fire if no further message arrives:
import time

def __init__(self, expected_count: int = 3, timeout: float = 10.0):
    # ...
    self.start_time = None
    self.set_config(timeout=timeout)

def _handle_input(self, **kwargs):
    if self.start_time is None:
        self.start_time = time.time()
    # ... count messages ...
    timeout = self.get_config("timeout", 10.0)
    elapsed = time.time() - self.start_time
    if (count >= expected_count) or (elapsed >= timeout):
        # Process with available data
        ...
- Dynamic Expected Count
Allow the expected count to be set dynamically:
def set_expected_count(self, count: int):
    self.set_config(expected_count=count)
    self.expected_count = count
- Multiple Aggregation Cycles
Reset properly to support multiple aggregation cycles:
def _handle_input(self, **kwargs):
    # ... check count ...
    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
        self.input_slot._data = {}
        self.processed = False  # Reset for next cycle
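A timeout that is checked inside the handler cannot fire when no further message arrives. The sketch below shows one alternative using `threading.Timer` from the standard library, combined with a per-cycle reset. `TimedBatch` is a hypothetical class in plain Python; whether it is safe to call a routine's `emit()` from a timer thread is an assumption that would need checking against routilux itself.

```python
import threading


class TimedBatch:
    """Collects items and flushes on count or on a wall-clock deadline."""

    def __init__(self, expected_count: int, timeout: float, on_flush):
        self.expected_count = expected_count
        self.timeout = timeout
        self.on_flush = on_flush  # called with the list of collected items
        self._items = []
        self._cycle = 0  # incremented on every flush
        self._timer = None
        self._lock = threading.Lock()

    def add(self, item) -> None:
        with self._lock:
            if not self._items:
                # The first item of a cycle starts that cycle's deadline.
                self._timer = threading.Timer(self.timeout, self._flush, args=(self._cycle,))
                self._timer.daemon = True
                self._timer.start()
            self._items.append(item)
            ready = len(self._items) >= self.expected_count
            cycle = self._cycle
        if ready:
            self._flush(cycle)

    def _flush(self, cycle: int) -> None:
        with self._lock:
            # A stale timer or a second caller for the same cycle does nothing.
            if cycle != self._cycle or not self._items:
                return
            items, self._items = self._items, []
            self._cycle += 1
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
        self.on_flush(items)


flushed = []
batch = TimedBatch(expected_count=3, timeout=0.2, on_flush=flushed.append)

for item in ("a", "b", "c"):  # full cycle: flushes on count
    batch.add(item)
batch.add("d")  # partial cycle: flushes when the deadline passes
threading.Event().wait(0.5)

print(flushed)
```

The cycle number ties each timer to the batch it was started for, so a deadline from a finished cycle can never flush the next cycle early.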
See Also
Working with Routines - Working with routines
Working with Connections - Connecting routines
Working with Flows - Flow execution and concurrent mode