Aggregation Pattern

The aggregation pattern allows a routine to wait for all expected messages from multiple upstream routines before processing and emitting results. This is useful for scenarios like aggregating search results from multiple sources, collecting data from parallel tasks, or combining outputs from different processors.

Overview

When you need to collect data from multiple sources and process it together, you can use the append merge strategy combined with a counter check in the handler. This pattern ensures that:

  1. All incoming messages are accumulated

  2. The handler is called for each message

  3. Processing only occurs when all expected messages are received

  4. Results are emitted once after all data is collected

Key Concepts

Merge Strategy: “append”

With merge_strategy="append", each incoming message’s values are appended to lists. This allows you to accumulate data over multiple receive operations.
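The accumulation described above can be sketched in plain Python. This is only an illustration of the behavior, not routilux code: `append_merge` is a hypothetical helper, and the field names are borrowed from the search example later on this page. Note that a value that is already a list is appended as a whole, so it ends up as a list of lists.

```python
from typing import Any, Dict, List


def append_merge(
    accumulated: Dict[str, List[Any]], message: Dict[str, Any]
) -> Dict[str, List[Any]]:
    """Append each value of `message` to the list stored under the same key.

    Hypothetical stand-in for what an "append" merge does; not a routilux API.
    """
    for key, value in message.items():
        accumulated.setdefault(key, []).append(value)
    return accumulated


accumulated: Dict[str, List[Any]] = {}
append_merge(accumulated, {"task_name": "SearchEngine1", "results": ["r1", "r2"]})
append_merge(accumulated, {"task_name": "SearchEngine2", "results": ["r3"]})

print(accumulated["task_name"])  # ['SearchEngine1', 'SearchEngine2']
print(accumulated["results"])    # [['r1', 'r2'], ['r3']]
```

Every field grows by one entry per message, which is why the length of any field present in every message can serve as the message count.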

Message Counting

In the handler, check the length of any list field to determine how many messages have been received. When the count reaches the expected number, process all accumulated data.

Single Processing

Use a flag to ensure processing only happens once, even if the handler is called multiple times with the same count.

Basic Example

Here’s a simple aggregator that waits for 3 messages:

from routilux import Flow, Routine

class SourceRoutine(Routine):
    def __init__(self, source_id: str):
        super().__init__()
        self.source_id = source_id
        self.output_event = self.define_event("output", ["data", "source_id"])

    def __call__(self, **kwargs):
        super().__call__(**kwargs)
        data = kwargs.get("data", f"data_from_{self.source_id}")
        self.emit("output", data=data, source_id=self.source_id)

class AggregatorRoutine(Routine):
    """Aggregator that waits for all expected messages."""

    def __init__(self, expected_count: int = 3):
        super().__init__()
        self.expected_count = expected_count
        self.set_config(expected_count=expected_count)
        self.processed = False  # Flag to ensure single processing

        # Use append strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append"  # Key: append strategy
        )
        self.output_event = self.define_event("aggregated", ["all_data", "count"])

    def _handle_input(self, **kwargs):
        """Handle input and check if all messages received."""
        # With append strategy, kwargs contains lists
        # Count messages using any list field
        received_count = 0
        if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
            received_count = len(kwargs["source_id"])
        elif "data" in kwargs and isinstance(kwargs["data"], list):
            received_count = len(kwargs["data"])

        expected_count = self.get_config("expected_count", self.expected_count)

        # Process only when all messages received and not already processed
        if received_count >= expected_count and not self.processed:
            self.processed = True

            # Extract accumulated data
            all_data = []
            if "data" in kwargs and isinstance(kwargs["data"], list):
                all_data = kwargs["data"]

            # Emit aggregated result
            self.emit("aggregated", all_data=all_data, count=len(all_data))

            # Reset for next aggregation (optional). Clear the flag as well,
            # otherwise later cycles would never be processed.
            self.input_slot._data = {}
            self.processed = False

# Create flow
flow = Flow(flow_id="aggregator_demo")

# Create sources
source1 = SourceRoutine("source1")
source2 = SourceRoutine("source2")
source3 = SourceRoutine("source3")

# Create aggregator
aggregator = AggregatorRoutine(expected_count=3)

# Add to flow
id1 = flow.add_routine(source1, "source1")
id2 = flow.add_routine(source2, "source2")
id3 = flow.add_routine(source3, "source3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all sources to aggregator
flow.connect(id1, "output", agg_id, "input")
flow.connect(id2, "output", agg_id, "input")
flow.connect(id3, "output", agg_id, "input")

# Execute sources
flow.execute(id1, entry_params={"data": "data1"})
flow.execute(id2, entry_params={"data": "data2"})
flow.execute(id3, entry_params={"data": "data3"})

# Aggregator will process when all 3 messages are received

How It Works

  1. Append Strategy: When merge_strategy="append" is used, each receive() call appends values to lists in slot._data.

  2. Handler Invocation: The handler is called after each receive() with the accumulated data (where values are lists).

  3. Message Counting: Check the length of any list field in kwargs to count received messages.

  4. Conditional Processing: Only process when both conditions hold:

       • The count has reached the expected number

       • The data has not already been processed (use a flag)

  5. Data Extraction: Extract all accumulated data from the lists and process it together.

  6. Emission: Emit the aggregated result once.

  7. Reset (Optional): Clear slot._data to prepare for the next aggregation cycle.
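The steps above can be traced end to end with a small simulation that needs no routilux installation. It is a sketch under stated assumptions: `AppendSlot` and `MiniAggregator` are hypothetical stand-ins written for this illustration, and the real slot implementation may differ in detail.

```python
from typing import Any, Callable, Dict, List


class AppendSlot:
    """Hypothetical stand-in for a slot using merge_strategy="append"."""

    def __init__(self, handler: Callable[..., None]) -> None:
        self.handler = handler
        self._data: Dict[str, List[Any]] = {}

    def receive(self, message: Dict[str, Any]) -> None:
        # Step 1: append every incoming value to the list for its key
        for key, value in message.items():
            self._data.setdefault(key, []).append(value)
        # Step 2: call the handler with a snapshot of the accumulated lists
        self.handler(**{key: list(values) for key, values in self._data.items()})


class MiniAggregator:
    """Hypothetical aggregator mirroring the handler logic on this page."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self.processed = False
        self.emitted: List[Dict[str, Any]] = []
        self.input_slot = AppendSlot(self._handle_input)

    def _handle_input(self, **kwargs: Any) -> None:
        # Step 3: count messages via the length of a list field
        data = kwargs.get("data")
        received = len(data) if isinstance(data, list) else 0
        # Step 4: process only when complete and not yet processed
        if received >= self.expected_count and not self.processed:
            self.processed = True
            # Steps 5 and 6: extract the data and "emit" it exactly once
            self.emitted.append({"all_data": data, "count": received})
            # Step 7: clear the accumulated slot data
            self.input_slot._data = {}


aggregator = MiniAggregator(expected_count=3)
for i in (1, 2, 3):
    aggregator.input_slot.receive({"data": f"data{i}", "source_id": f"source{i}"})

print(aggregator.emitted)
# [{'all_data': ['data1', 'data2', 'data3'], 'count': 3}]
```

The handler runs three times here, but only the third call passes the count check, so a single result is recorded.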

Complete Example: Search Result Aggregation

Here’s a complete example that aggregates search results from multiple search engines:

"""
Aggregator Routine Demo

Demonstrates how to create a routine that waits for all expected messages
before processing and emitting results.
"""

import time

from routilux import Flow, Routine
from routilux.job_state import JobState


class SearchTask(Routine):
    """A search task routine that simulates searching."""

    def __init__(self):
        super().__init__()
        self.set_config(task_name="default")

        # Define trigger slot
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        # Define output event
        self.output_event = self.define_event("result", ["query", "results", "task_name"])

    def _handle_trigger(self, query: str = None, **kwargs):
        """Handle search trigger."""
        query = query or kwargs.get("query", "default")

        # task_name is stored via set_config(), so read it back from the config
        task_name = self.get_config("task_name", "default")

        # Simulate search operation
        time.sleep(0.1)  # Simulate I/O delay

        # Generate mock results
        results = [
            f"{task_name}_result_1",
            f"{task_name}_result_2",
            f"{task_name}_result_3",
        ]

        # Track operation
        self._track_operation("searches", success=True, results_count=len(results))

        # Emit results
        self.emit("result", query=query, results=results, task_name=task_name)


class ResultAggregator(Routine):
    """Aggregator routine that waits for all expected messages before processing."""

    def __init__(self):
        super().__init__()

        # Set configuration
        self.set_config(expected_count=3)
        self.set_config(timeout=10.0)  # Optional timeout

        # Define input slot with append strategy to collect all results
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Collect all incoming data
        )

        # Define output event
        self.output_event = self.define_event(
            "aggregated", ["all_results", "total_count", "queries"]
        )

    def _handle_input(self, **kwargs):
        """Handle input and check if we have all expected messages.

        With merge_strategy="append", each receive() call adds to the accumulated data.
        The handler receives the merged data (with lists), so we can check the length
        of any list field to determine how many messages we've received.

        Args:
            **kwargs: Merged data from slot. With append strategy, values are lists.
                For example: {'task_name': ['task1', 'task2'], 'results': [[...], [...]]}
        """
        # Count how many messages we've received.
        # Use task_name list length as the count (since each message has a task_name)
        received_count = 0
        if "task_name" in kwargs and isinstance(kwargs["task_name"], list):
            received_count = len(kwargs["task_name"])
        elif "results" in kwargs and isinstance(kwargs["results"], list):
            # If task_name not available, use results list length
            received_count = len(kwargs["results"])
        elif "query" in kwargs and isinstance(kwargs["query"], list):
            received_count = len(kwargs["query"])
        else:
            # Fallback: count any non-empty list field
            for key, value in kwargs.items():
                if isinstance(value, list) and value:
                    received_count = len(value)
                    break

        expected_count = self.get_config("expected_count", 3)

        # Get current task_name from kwargs if available
        current_task = kwargs.get("task_name", "unknown")
        if isinstance(current_task, list) and current_task:
            current_task = current_task[-1]  # Get last one

        print(
            f"Aggregator received message from {current_task}. "
            f"Total received: {received_count}/{expected_count}"
        )

        # Check if we've received all expected messages
        if received_count >= expected_count:
            print(f"✅ All {expected_count} messages received! Processing aggregated results...")

            # Process all accumulated data (kwargs contains the merged data)
            self._process_aggregated_results(kwargs)

            # Reset for next aggregation (optional)
            self.input_slot._data = {}
        else:
            print(f"⏳ Waiting for more messages ({received_count}/{expected_count})...")

    def _process_aggregated_results(self, accumulated_data: dict):
        """Process all aggregated results and emit.

        Args:
            accumulated_data: Dictionary with accumulated data. With append strategy,
                values are lists containing all received values.
        """
        # Track operation
        self._track_operation("aggregations", success=True)

        # Extract all results
        all_results = []
        queries = []
        task_names = []

        if "results" in accumulated_data:
            # results is a list of lists (each search task's results)
            results_list = accumulated_data["results"]
            if isinstance(results_list, list):
                for result_list in results_list:
                    if isinstance(result_list, list):
                        all_results.extend(result_list)
                    else:
                        all_results.append(result_list)

        if "query" in accumulated_data:
            query_list = accumulated_data["query"]
            queries = query_list if isinstance(query_list, list) else [query_list]

        if "task_name" in accumulated_data:
            task_name_list = accumulated_data["task_name"]
            task_names = task_name_list if isinstance(task_name_list, list) else [task_name_list]

        print(f"📊 Aggregated {len(all_results)} results from {len(task_names)} search tasks")

        # Emit aggregated result
        self.emit(
            "aggregated", all_results=all_results, total_count=len(all_results), queries=queries
        )


def demo_aggregator():
    """Demonstrate aggregator routine."""
    print("=" * 70)
    print("Aggregator Routine Demo")
    print("=" * 70)

    # Create flow
    flow = Flow(flow_id="aggregator_demo")

    # Create search tasks
    search1 = SearchTask()
    search1.set_config(task_name="SearchEngine1")
    search2 = SearchTask()
    search2.set_config(task_name="SearchEngine2")
    search3 = SearchTask()
    search3.set_config(task_name="SearchEngine3")

    # Create aggregator (expects 3 results)
    aggregator = ResultAggregator()
    aggregator.set_config(expected_count=3)

    # Add to flow
    id1 = flow.add_routine(search1, "search1")
    id2 = flow.add_routine(search2, "search2")
    id3 = flow.add_routine(search3, "search3")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # Connect all search tasks to aggregator
    flow.connect(id1, "result", agg_id, "input")
    flow.connect(id2, "result", agg_id, "input")
    flow.connect(id3, "result", agg_id, "input")

    # Create a consumer to receive aggregated results
    class ResultConsumer(Routine):
        def __init__(self):
            super().__init__()
            self.received_results = []
            self.input_slot = self.define_slot("input", handler=self._handle_input)

        def _handle_input(self, all_results: list = None, total_count: int = None, **kwargs):
            self.received_results.append({"results": all_results, "count": total_count})
            print(f"📦 Consumer received aggregated result: {total_count} total results")

    consumer = ResultConsumer()
    consumer_id = flow.add_routine(consumer, "consumer")
    flow.connect(agg_id, "aggregated", consumer_id, "input")

    # Create a multi-source trigger routine that triggers all search tasks
    class MultiSourceTrigger(Routine):
        """Trigger routine that emits to multiple search tasks in a single execute()"""

        def __init__(self):
            super().__init__()
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("trigger_search", ["query"])

        def _handle_trigger(self, query: str = None, **kwargs):
            """Trigger all search tasks in a single execute() call"""
            query = query or kwargs.get("query", "test query")
            # Emit to all connected search tasks - they all share the same execution
            # Flow is automatically detected from routine context
            self.emit("trigger_search", query=query)

    trigger = MultiSourceTrigger()
    trigger_id = flow.add_routine(trigger, "trigger")

    # Connect trigger to all search tasks
    flow.connect(trigger_id, "trigger_search", id1, "trigger")
    flow.connect(trigger_id, "trigger_search", id2, "trigger")
    flow.connect(trigger_id, "trigger_search", id3, "trigger")

    print("\nFlow structure:")
    print("  trigger -> search1 -> aggregator -> consumer")
    print("  trigger -> search2 -> aggregator")
    print("  trigger -> search3 -> aggregator")
    print("\nAggregator expects: 3 messages")

    # Execute once - all search tasks will be triggered in the same execution
    print("\n🚀 Executing all search tasks (single execute, multiple emits)...")
    job_state = flow.execute(trigger_id, entry_params={"query": "test query"})

    # Wait for all async tasks to complete
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print("\n" + "=" * 70)
    print("Results:")
    # Execution state is tracked in the returned job_state, not in routine._stats
    print(f"  Consumer received: {len(consumer.received_results)} aggregated result(s)")
    if consumer.received_results:
        print(f"  Total results in aggregation: {consumer.received_results[0]['count']}")
    print("=" * 70)


if __name__ == "__main__":
    demo_aggregator()

Key Points

Handler is Called for Each Message

The handler is called immediately after each message is received. You check the count inside the handler to decide when to process.

Append Strategy Behavior

With merge_strategy="append":

  • First message: kwargs = {"data": ["data1"], "source_id": ["source1"]}

  • Second message: kwargs = {"data": ["data1", "data2"], "source_id": ["source1", "source2"]}

  • Third message: kwargs = {"data": ["data1", "data2", "data3"], "source_id": ["source1", "source2", "source3"]}

Counting Messages

Use any field that appears in every message to count:

if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
    count = len(kwargs["source_id"])
elif "data" in kwargs and isinstance(kwargs["data"], list):
    count = len(kwargs["data"])

Preventing Duplicate Processing

Use a flag to ensure processing only happens once:

if count >= expected_count and not self.processed:
    self.processed = True
    # Process and emit

Resetting for Next Cycle

After processing, optionally reset the slot data:

self.input_slot._data = {}

Concurrent Execution

The aggregation pattern works the same way in concurrent execution mode. However, be aware that:

  • Handler calls may be interleaved across threads

  • Use thread-safe operations if handlers share state

  • The total number of calls remains the same as sequential mode

Example with concurrent execution:

flow = Flow(execution_strategy="concurrent", max_workers=5)

# Same setup as before
# ...

# Execute sources concurrently, keeping the job state returned by each call
job_states = [
    flow.execute(id1, entry_params={"data": "data1"}),
    flow.execute(id2, entry_params={"data": "data2"}),
    flow.execute(id3, entry_params={"data": "data3"}),
]

# Wait for completion
from routilux.job_state import JobState
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=10.0)

# Aggregator will still process when all 3 messages are received

Best Practices

  1. Use Configuration: Store expected_count in _config for flexibility:

    self.set_config(expected_count=expected_count)
    
  2. Check List Type: Always check if a value is a list before using len():

    if "field" in kwargs and isinstance(kwargs["field"], list):
        count = len(kwargs["field"])
    
  3. Prevent Duplicate Processing: Use a flag to ensure processing happens only once:

    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
    
  4. Reset After Processing: Clear slot data after processing to prepare for the next aggregation cycle:

    self.input_slot._data = {}
    
  5. Handle Edge Cases: Consider what happens if:

       • Fewer messages arrive than expected (timeout handling)

       • More messages arrive than expected

       • Messages arrive out of order

  6. Thread Safety: In concurrent mode, if you need to share state between handlers, use thread-safe operations (locks, atomic operations).
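For the thread-safety point, one possible approach is to guard the check-and-set of the processed flag with a lock, so that two handler calls running at the same time cannot both decide to process. The sketch below uses only the standard library; `ProcessOnceGate` is a hypothetical helper for illustration, not part of routilux.

```python
import threading
from typing import List


class ProcessOnceGate:
    """Hypothetical helper: lets exactly one caller through per cycle."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self._lock = threading.Lock()
        self._processed = False

    def should_process(self, received_count: int) -> bool:
        # The check and the flag update happen atomically under the lock
        with self._lock:
            if received_count >= self.expected_count and not self._processed:
                self._processed = True
                return True
            return False


gate = ProcessOnceGate(expected_count=3)
decisions: List[bool] = []


def worker() -> None:
    # Every thread reports a complete count; only one may win
    decisions.append(gate.should_process(received_count=3))


threads = [threading.Thread(target=worker) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(decisions.count(True))  # 1
```

In an aggregator handler, the body of the `if count >= expected_count and not self.processed` branch would run only when a call like `gate.should_process(count)` returns True.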

Common Patterns

Timeout Handling

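Add timeout logic to process even if not all messages arrive. Because the check runs inside the handler, it is only evaluated when a message arrives; if no further message comes in after the deadline, processing is not triggered by this logic alone: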

import time

def __init__(self, expected_count: int = 3, timeout: float = 10.0):
    # ...
    self.start_time = None
    self.set_config(timeout=timeout)

def _handle_input(self, **kwargs):
    if self.start_time is None:
        self.start_time = time.time()

    # ... count messages ...

    timeout = self.get_config("timeout", 10.0)
    elapsed = time.time() - self.start_time

    if (count >= expected_count) or (elapsed >= timeout):
        # Process with available data
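If processing must happen even when no further message arrives, one option is a timer started on the first message. The following is a standalone sketch using `threading.Timer` from the standard library; `TimeoutAggregator` and its `on_done` callback are hypothetical names for illustration and do not use the routilux API.

```python
import threading
import time
from typing import Any, Callable, List, Optional


class TimeoutAggregator:
    """Hypothetical sketch: flush on completion or when the timer expires."""

    def __init__(
        self, expected_count: int, timeout: float, on_done: Callable[[List[Any]], None]
    ) -> None:
        self.expected_count = expected_count
        self.timeout = timeout
        self.on_done = on_done
        self._items: List[Any] = []
        self._done = False
        self._lock = threading.Lock()
        self._timer: Optional[threading.Timer] = None

    def receive(self, item: Any) -> None:
        with self._lock:
            if self._done:
                return  # Late message after the cycle was flushed
            self._items.append(item)
            if self._timer is None:
                # Start the deadline when the first message arrives
                self._timer = threading.Timer(self.timeout, self._flush)
                self._timer.daemon = True
                self._timer.start()
            complete = len(self._items) >= self.expected_count
        if complete:
            self._flush()

    def _flush(self) -> None:
        with self._lock:
            if self._done:
                return  # Already flushed by the other path
            self._done = True
            items = list(self._items)
            if self._timer is not None:
                self._timer.cancel()
        self.on_done(items)


emitted: List[List[Any]] = []
agg = TimeoutAggregator(expected_count=3, timeout=0.05, on_done=emitted.append)
agg.receive("data1")
agg.receive("data2")
time.sleep(0.3)  # The third message never arrives; the timer fires instead

print(emitted)  # [['data1', 'data2']]
```

The lock makes the completion path and the timer path mutually exclusive, so the callback runs once per cycle regardless of which one gets there first.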

Dynamic Expected Count

Allow the expected count to be set dynamically:

def set_expected_count(self, count: int):
    self.set_config(expected_count=count)
    self.expected_count = count

Multiple Aggregation Cycles

Reset properly to support multiple aggregation cycles:

def _handle_input(self, **kwargs):
    # ... check count ...
    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
        self.input_slot._data = {}
        self.processed = False  # Reset for next cycle
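
The effect of resetting after each cycle can be seen in a small standalone sketch. `CyclingAggregator` is a hypothetical class for illustration only; it shows that with a reset, a stream of messages is split into consecutive batches, and messages beyond a full batch simply start the next cycle.

```python
from typing import Any, List


class CyclingAggregator:
    """Hypothetical sketch: emits one batch per `expected_count` messages."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self.pending: List[Any] = []
        self.batches: List[List[Any]] = []

    def receive(self, item: Any) -> None:
        self.pending.append(item)
        if len(self.pending) >= self.expected_count:
            # "Emit" the completed batch, then reset for the next cycle
            self.batches.append(self.pending)
            self.pending = []


cycler = CyclingAggregator(expected_count=3)
for i in range(7):
    cycler.receive(i)

print(cycler.batches)  # [[0, 1, 2], [3, 4, 5]]
print(cycler.pending)  # [6]
```

Seven messages with an expected count of 3 produce two complete batches, and the seventh message waits in the accumulator for the next cycle.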

See Also