Aggregation Pattern

The aggregation pattern allows a routine to wait for all expected messages from multiple upstream routines before processing and emitting results. This is useful for scenarios like aggregating search results from multiple sources, collecting data from parallel tasks, or combining outputs from different processors.

Overview

When you need to collect data from multiple sources and process it together, you can use the append merge strategy combined with a counter check in the handler. This pattern ensures that:

  1. All incoming messages are accumulated

  2. The handler is called for each message

  3. Processing only occurs when all expected messages are received

  4. Results are emitted once after all data is collected

Key Concepts

Merge Strategy: “append”

With merge_strategy="append", each incoming message’s values are appended to lists. This allows you to accumulate data over multiple receive operations.
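
The following framework-free sketch illustrates the accumulation behaviour described above. It is not routilux code. `append_merge` is a hypothetical helper, and it assumes each message is a dict whose values are appended to a per-key list.

```python
def append_merge(accumulated: dict, message: dict) -> dict:
    """Hypothetical model of merge_strategy="append" (not the routilux implementation).

    Each value in `message` is appended to the list stored under the same key,
    and the list is created the first time a key is seen.
    """
    for key, value in message.items():
        accumulated.setdefault(key, []).append(value)
    return accumulated


slot_data: dict = {}
append_merge(slot_data, {"data": "data1", "source_id": "source1"})
append_merge(slot_data, {"data": "data2", "source_id": "source2"})
print(slot_data)  # {'data': ['data1', 'data2'], 'source_id': ['source1', 'source2']}
```

A value that is already a list is appended as a single entry, which is why accumulated `results` in the complete example below end up as a list of lists.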

Message Counting

In the handler, check the length of a list field that every message carries to determine how many messages have been received. When the count reaches the expected number, process all accumulated data.
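
The counting rule can be captured in a small helper. `count_received` is a hypothetical name, not a routilux API. It mirrors the `isinstance(..., list)` checks used in this page's examples by trying a list of field names in order, and it assumes append-style kwargs where each key maps to one list entry per message.

```python
from typing import Sequence


def count_received(kwargs: dict, fields: Sequence[str]) -> int:
    """Return the number of accumulated messages, or 0 if no list field is found.

    `fields` is checked in order; the first field whose value is a list wins.
    """
    for field in fields:
        value = kwargs.get(field)
        if isinstance(value, list):
            return len(value)
    return 0


print(count_received({"source_id": ["s1", "s2"], "data": ["d1", "d2"]}, ["source_id", "data"]))  # 2
print(count_received({"data": ["d1"]}, ["source_id", "data"]))  # 1
```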

Single Processing

Use a flag to ensure processing happens only once, even if the handler is called again after the expected count has been reached (for example, when extra messages arrive).
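
A minimal, framework-free sketch of the flag follows. `OnceAggregator` is hypothetical, and a plain list stands in for the accumulated slot data. Without the `processed` check, the fourth message would trigger a second emission containing four items.

```python
class OnceAggregator:
    """Hypothetical model of the single-processing flag (not a routilux class)."""

    def __init__(self, expected_count: int):
        self.expected_count = expected_count
        self.processed = False
        self.data: list = []     # stands in for the accumulated slot data
        self.emitted: list = []  # records every "emit" for inspection

    def handle(self, item) -> None:
        self.data.append(item)
        # Emit only the first time the expected count is reached.
        if len(self.data) >= self.expected_count and not self.processed:
            self.processed = True
            self.emitted.append(list(self.data))


agg = OnceAggregator(expected_count=3)
for item in ["a", "b", "c", "d"]:  # one more message than expected
    agg.handle(item)
print(agg.emitted)  # [['a', 'b', 'c']]
```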

Basic Example

Here’s a simple aggregator that waits for 3 messages:

from routilux import Flow, Routine

class SourceRoutine(Routine):
    def __init__(self, source_id: str):
        super().__init__()
        self.source_id = source_id
        self.output_event = self.define_event("output", ["data", "source_id"])

    def __call__(self, **kwargs):
        super().__call__(**kwargs)
        data = kwargs.get("data", f"data_from_{self.source_id}")
        self.emit("output", data=data, source_id=self.source_id)

class AggregatorRoutine(Routine):
    """Aggregator that waits for all expected messages."""

    def __init__(self, expected_count: int = 3):
        super().__init__()
        self.expected_count = expected_count
        self.set_config(expected_count=expected_count)
        self.processed = False  # Flag to ensure single processing

        # Use append strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append"  # Key: append strategy
        )
        self.output_event = self.define_event("aggregated", ["all_data", "count"])

    def _handle_input(self, **kwargs):
        """Handle input and check if all messages received."""
        # With append strategy, kwargs contains lists
        # Count messages using any list field
        received_count = 0
        if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
            received_count = len(kwargs["source_id"])
        elif "data" in kwargs and isinstance(kwargs["data"], list):
            received_count = len(kwargs["data"])

        expected_count = self.get_config("expected_count", self.expected_count)

        # Process only when all messages received and not already processed
        if received_count >= expected_count and not self.processed:
            self.processed = True

            # Extract accumulated data
            all_data = []
            if "data" in kwargs and isinstance(kwargs["data"], list):
                all_data = kwargs["data"]

            # Emit aggregated result
            self.emit("aggregated", all_data=all_data, count=len(all_data))

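            # Clear accumulated data (optional). Because self.processed stays True,
            # this aggregator handles only one cycle; see Multiple Aggregation Cycles.
            self.input_slot._data = {}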

# Create flow
flow = Flow(flow_id="aggregator_demo")

# Create sources
source1 = SourceRoutine("source1")
source2 = SourceRoutine("source2")
source3 = SourceRoutine("source3")

# Create aggregator
aggregator = AggregatorRoutine(expected_count=3)

# Add to flow
id1 = flow.add_routine(source1, "source1")
id2 = flow.add_routine(source2, "source2")
id3 = flow.add_routine(source3, "source3")
agg_id = flow.add_routine(aggregator, "aggregator")

# Connect all sources to aggregator
flow.connect(id1, "output", agg_id, "input")
flow.connect(id2, "output", agg_id, "input")
flow.connect(id3, "output", agg_id, "input")

# Execute sources
flow.execute(id1, entry_params={"data": "data1"})
flow.execute(id2, entry_params={"data": "data2"})
flow.execute(id3, entry_params={"data": "data3"})

# Aggregator will process when all 3 messages are received

How It Works

  1. Append Strategy: When merge_strategy="append" is used, each receive() call appends values to lists in slot._data.

  2. Handler Invocation: The handler is called after each receive() with the accumulated data (where values are lists).

  3. Message Counting: Check the length of a list field that every message carries to count received messages.

  4. Conditional Processing: Only process when the count reaches the expected number and the data has not already been processed (tracked with a flag).

  5. Data Extraction: Extract all accumulated data from the lists and process it together.

  6. Emission: Emit the aggregated result once.

  7. Reset (Optional): Clear slot._data to prepare for the next aggregation cycle. If the handler uses a processed flag, reset it too; otherwise the next cycle is never processed.
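
To make the steps concrete without depending on routilux internals, the sketch below models them in plain Python. `FakeAppendSlot` and `SketchAggregator` are hypothetical classes. The slot behaviour (append to lists in `_data`, then call the handler with the accumulated lists) is an assumption based on the description above, not the library's actual code. Like the basic example, it runs a single cycle.

```python
from typing import Callable


class FakeAppendSlot:
    """Hypothetical stand-in for a slot with merge_strategy="append"."""

    def __init__(self, handler: Callable[..., None]):
        self.handler = handler
        self._data: dict = {}

    def receive(self, **message) -> None:
        # Steps 1 and 2: append each value, then call the handler with the lists.
        for key, value in message.items():
            self._data.setdefault(key, []).append(value)
        self.handler(**self._data)


class SketchAggregator:
    def __init__(self, expected_count: int):
        self.expected_count = expected_count
        self.processed = False
        self.counts_seen: list = []  # received_count at each handler call
        self.emitted: list = []      # stands in for self.emit("aggregated", ...)
        self.input_slot = FakeAppendSlot(self._handle_input)

    def _handle_input(self, **kwargs) -> None:
        data = kwargs.get("data")
        received_count = len(data) if isinstance(data, list) else 0  # step 3
        self.counts_seen.append(received_count)
        if received_count >= self.expected_count and not self.processed:  # step 4
            self.processed = True
            all_data = list(data)  # step 5: copy the accumulated values
            self.emitted.append({"all_data": all_data, "count": len(all_data)})  # step 6
            self.input_slot._data = {}  # step 7: clear the accumulated lists


agg = SketchAggregator(expected_count=3)
for i in (1, 2, 3):
    agg.input_slot.receive(data=f"data{i}", source_id=f"source{i}")
print(agg.counts_seen)  # [1, 2, 3]
print(agg.emitted)      # [{'all_data': ['data1', 'data2', 'data3'], 'count': 3}]
```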

Complete Example: Search Result Aggregation

Here’s a complete example that aggregates search results from multiple search engines:

"""
Aggregator Routine Demo

Demonstrates how to create a routine that waits for all expected messages
before processing and emitting results.
"""

import time

from routilux import Flow, Routine
from routilux.job_state import JobState


class SearchTask(Routine):
    """A search task routine that simulates searching."""

    def __init__(self):
        super().__init__()
        self.set_config(task_name="default")

        # Define trigger slot
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        # Define output event
        self.output_event = self.define_event("result", ["query", "results", "task_name"])

    def _handle_trigger(self, query: str = None, **kwargs):
        """Handle search trigger."""
        query = query or kwargs.get("query", "default")
        # task_name is stored in the routine's config (set via set_config)
        task_name = self.get_config("task_name", "default")

        # Simulate search operation
        time.sleep(0.1)  # Simulate I/O delay

        # Generate mock results
        results = [
            f"{task_name}_result_1",
            f"{task_name}_result_2",
            f"{task_name}_result_3",
        ]

        # Track operation
        self._track_operation("searches", success=True, results_count=len(results))

        # Emit results
        self.emit("result", query=query, results=results, task_name=task_name)


class ResultAggregator(Routine):
    """Aggregator routine that waits for all expected messages before processing."""

    def __init__(self):
        super().__init__()
        self.set_config(expected_count=3)

        # Optional timeout (not used by this handler; see Timeout Handling below)
        self.set_config(timeout=10.0)

        # Define input slot with append strategy to collect all results
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append",  # Collect all incoming data
        )

        # Define output event
        self.output_event = self.define_event(
            "aggregated", ["all_results", "total_count", "queries"]
        )

    def _handle_input(self, **kwargs):
        """Handle input and check if we have all expected messages.

        With merge_strategy="append", each receive() call adds to the accumulated data.
        The handler receives the merged data (with lists), so we can check the length
        of a list field present in every message to count the messages received.

        Args:
            **kwargs: Merged data from slot. With append strategy, values are lists.
                For example: {'task_name': ['task1', 'task2'], 'results': [[...], [...]]}
        """
        # Count how many messages we've received
        # Use task_name list length as the count (since each message has a task_name)
        received_count = 0
        if "task_name" in kwargs and isinstance(kwargs["task_name"], list):
            received_count = len(kwargs["task_name"])
        elif "results" in kwargs and isinstance(kwargs["results"], list):
            # If task_name not available, use results list length
            received_count = len(kwargs["results"])
        elif "query" in kwargs and isinstance(kwargs["query"], list):
            received_count = len(kwargs["query"])
        else:
            # Fallback: count any non-empty list field
            for key, value in kwargs.items():
                if isinstance(value, list) and value:
                    received_count = len(value)
                    break

        expected_count = self.get_config("expected_count", 3)

        # Get current task_name from kwargs if available
        current_task = kwargs.get("task_name", "unknown")
        if isinstance(current_task, list) and current_task:
            current_task = current_task[-1]  # Most recent sender

        print(
            f"Aggregator received message from {current_task}. "
            f"Total received: {received_count}/{expected_count}"
        )

        # Check if we've received all expected messages
        if received_count >= expected_count:
            print(f"✅ All {expected_count} messages received! Processing aggregated results...")

            # Process all accumulated data (kwargs contains the merged data)
            self._process_aggregated_results(kwargs)

            # Reset for next aggregation (optional)
            self.input_slot._data = {}
        else:
            print(f"⏳ Waiting for more messages ({received_count}/{expected_count})...")

    def _process_aggregated_results(self, accumulated_data: dict):
        """Process all aggregated results and emit.

        Args:
            accumulated_data: Dictionary with accumulated data. With append strategy,
                values are lists containing all received values.
        """
        # Track operation
        self._track_operation("aggregations", success=True)

        # Extract all results
        all_results = []
        queries = []
        task_names = []

        if "results" in accumulated_data:
            # results is a list of lists (each search task's results)
            results_list = accumulated_data["results"]
            if isinstance(results_list, list):
                for result_list in results_list:
                    if isinstance(result_list, list):
                        all_results.extend(result_list)
                    else:
                        all_results.append(result_list)

        if "query" in accumulated_data:
            query_list = accumulated_data["query"]
            queries = query_list if isinstance(query_list, list) else [query_list]

        if "task_name" in accumulated_data:
            task_name_list = accumulated_data["task_name"]
            task_names = task_name_list if isinstance(task_name_list, list) else [task_name_list]

        print(f"📊 Aggregated {len(all_results)} results from {len(task_names)} search tasks")

        # Emit aggregated result
        self.emit(
            "aggregated", all_results=all_results, total_count=len(all_results), queries=queries
        )


def demo_aggregator():
    """Demonstrate aggregator routine."""
    print("=" * 70)
    print("Aggregator Routine Demo")
    print("=" * 70)

    # Create flow
    flow = Flow(flow_id="aggregator_demo")

    # Create search tasks
    search1 = SearchTask()
    search1.set_config(task_name="SearchEngine1")
    search2 = SearchTask()
    search2.set_config(task_name="SearchEngine2")
    search3 = SearchTask()
    search3.set_config(task_name="SearchEngine3")

    # Create aggregator (expects 3 results)
    aggregator = ResultAggregator()
    aggregator.set_config(expected_count=3)

    # Add to flow
    id1 = flow.add_routine(search1, "search1")
    id2 = flow.add_routine(search2, "search2")
    id3 = flow.add_routine(search3, "search3")
    agg_id = flow.add_routine(aggregator, "aggregator")

    # Connect all search tasks to aggregator
    flow.connect(id1, "result", agg_id, "input")
    flow.connect(id2, "result", agg_id, "input")
    flow.connect(id3, "result", agg_id, "input")

    # Create a consumer to receive aggregated results
    class ResultConsumer(Routine):
        def __init__(self):
            super().__init__()
            self.received_results = []
            self.input_slot = self.define_slot("input", handler=self._handle_input)

        def _handle_input(self, all_results: list = None, total_count: int = None, **kwargs):
            self.received_results.append({"results": all_results, "count": total_count})
            print(f"📦 Consumer received aggregated result: {total_count} total results")

    consumer = ResultConsumer()
    consumer_id = flow.add_routine(consumer, "consumer")
    flow.connect(agg_id, "aggregated", consumer_id, "input")

    # Create a multi-source trigger routine that triggers all search tasks
    class MultiSourceTrigger(Routine):
        """Trigger routine that emits to multiple search tasks in a single execute()"""

        def __init__(self):
            super().__init__()
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("trigger_search", ["query"])

        def _handle_trigger(self, query: str = None, **kwargs):
            """Trigger all search tasks in a single execute() call"""
            query = query or kwargs.get("query", "test query")
            # Emit to all connected search tasks - they all share the same execution
            # Flow is automatically detected from routine context
            self.emit("trigger_search", query=query)

    trigger = MultiSourceTrigger()
    trigger_id = flow.add_routine(trigger, "trigger")

    # Connect trigger to all search tasks
    flow.connect(trigger_id, "trigger_search", id1, "trigger")
    flow.connect(trigger_id, "trigger_search", id2, "trigger")
    flow.connect(trigger_id, "trigger_search", id3, "trigger")

    print("\nFlow structure:")
    print("  trigger -> search1 -> aggregator -> consumer")
    print("  trigger -> search2 -> aggregator")
    print("  trigger -> search3 -> aggregator")
    print("\nAggregator expects: 3 messages")

    # Execute once - all search tasks will be triggered in the same execution
    print("\n🚀 Executing all search tasks (single execute, multiple emits)...")
    job_state = flow.execute(trigger_id, entry_params={"query": "test query"})

    # Wait for all async tasks to complete
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print("\n" + "=" * 70)
    print("Results:")
    # Execution state for this run is tracked in job_state; here we only inspect the consumer
    print(f"  Consumer received: {len(consumer.received_results)} aggregated result(s)")
    if consumer.received_results:
        print(f"  Total results in aggregation: {consumer.received_results[0]['count']}")
    print("=" * 70)


if __name__ == "__main__":
    demo_aggregator()
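
In the complete example, `accumulated_data["results"]` holds one entry per search task, so it is a list of lists. The nested loop in `_process_aggregated_results` can be written more compactly with the standard library's `itertools.chain.from_iterable`. `flatten_results` is a hypothetical helper and the sample data is made up for illustration.

```python
from itertools import chain


def flatten_results(results_per_task: list) -> list:
    """Flatten append-style `results` (one entry per message) into a single list.

    List entries are expanded; any other entry is kept as one item, matching
    the fallback branch in _process_aggregated_results.
    """
    return list(
        chain.from_iterable(r if isinstance(r, list) else [r] for r in results_per_task)
    )


accumulated_results = [
    ["SearchEngine1_result_1", "SearchEngine1_result_2"],
    ["SearchEngine2_result_1"],
    "SearchEngine3_single_result",
]
print(flatten_results(accumulated_results))
# ['SearchEngine1_result_1', 'SearchEngine1_result_2', 'SearchEngine2_result_1', 'SearchEngine3_single_result']
```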

Key Points

Handler is Called for Each Message

The handler is called immediately after each message is received. You check the count inside the handler to decide when to process.

Append Strategy Behavior

With merge_strategy="append":

  • First message: kwargs = {"data": ["data1"], "source_id": ["source1"]}

  • Second message: kwargs = {"data": ["data1", "data2"], "source_id": ["source1", "source2"]}

  • Third message: kwargs = {"data": ["data1", "data2", "data3"], "source_id": ["source1", "source2", "source3"]}

Counting Messages

Use any field that appears in every message to count:

if "source_id" in kwargs and isinstance(kwargs["source_id"], list):
    count = len(kwargs["source_id"])
elif "data" in kwargs and isinstance(kwargs["data"], list):
    count = len(kwargs["data"])
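
The reason to count with a field that every message carries: under an append merge, a key that some messages omit ends up with a shorter list. This sketch assumes, without confirmation for routilux, that the merge only appends keys present in the incoming message. `append_merge` and the optional `score` field are hypothetical.

```python
def append_merge(accumulated: dict, message: dict) -> None:
    # Hypothetical append merge: one list entry per message that carries the key.
    for key, value in message.items():
        accumulated.setdefault(key, []).append(value)


slot_data: dict = {}
append_merge(slot_data, {"source_id": "source1", "data": "data1", "score": 0.9})
append_merge(slot_data, {"source_id": "source2", "data": "data2"})  # no "score"
append_merge(slot_data, {"source_id": "source3", "data": "data3"})  # no "score"

print(len(slot_data["source_id"]))  # 3: present in every message
print(len(slot_data["score"]))      # 1: an optional field undercounts
```
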

Preventing Duplicate Processing

Use a flag to ensure processing only happens once:

if count >= expected_count and not self.processed:
    self.processed = True
    # Process and emit

Resetting for Next Cycle

After processing, optionally reset the slot data:

self.input_slot._data = {}

Concurrent Execution

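The aggregation pattern works the same way in concurrent execution mode. However, be aware that:

  • Handler calls may be interleaved across threads

  • The count check and the processed flag are shared state: guard the check-and-set with a lock (or another thread-safe mechanism) so that only one call processes the data

  • The handler is still called once per message, so the total number of calls is the same as in sequential mode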
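
In concurrent mode, `count >= expected_count and not self.processed` followed by `self.processed = True` is a check-then-act sequence, and two threads can both pass the check before either sets the flag. A common fix is to hold a lock across the check and the flag update. The sketch below uses the standard library's `threading.Lock` on a hypothetical, framework-free aggregator. Whether routilux already serializes calls to one slot's handler is not stated on this page, so treat the lock as a defensive assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadSafeAggregator:
    """Hypothetical aggregator whose count-and-flag check is guarded by a lock."""

    def __init__(self, expected_count: int):
        self.expected_count = expected_count
        self._lock = threading.Lock()
        self._items: list = []
        self.processed = False
        self.emitted: list = []

    def handle(self, item) -> None:
        batch = None
        with self._lock:
            # Append, count and set the flag atomically, so only one thread
            # can observe "count reached and not yet processed".
            self._items.append(item)
            if len(self._items) >= self.expected_count and not self.processed:
                self.processed = True
                batch = list(self._items)
        if batch is not None:
            # Only one thread gets here; slow processing can run outside the lock.
            self.emitted.append(batch)


agg = ThreadSafeAggregator(expected_count=50)
with ThreadPoolExecutor(max_workers=8) as pool:
    pool.map(agg.handle, range(100))  # leaving the block waits for all calls
print(len(agg.emitted))     # 1
print(len(agg.emitted[0]))  # 50
```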

Example with concurrent execution:

flow = Flow(execution_strategy="concurrent", max_workers=5)

# Same setup as before
# ...

# Execute sources concurrently and keep each returned job state
job_states = [
    flow.execute(id1, entry_params={"data": "data1"}),
    flow.execute(id2, entry_params={"data": "data2"}),
    flow.execute(id3, entry_params={"data": "data3"}),
]

# Wait for completion
from routilux.job_state import JobState
for job_state in job_states:
    JobState.wait_for_completion(flow, job_state, timeout=10.0)

# Aggregator will still process when all 3 messages are received

Best Practices

  1. Use Configuration: Store expected_count in the routine's configuration with set_config, so it can be changed without editing the class:

    self.set_config(expected_count=expected_count)
    
  2. Check List Type: Always check if a value is a list before using len():

    if "field" in kwargs and isinstance(kwargs["field"], list):
        count = len(kwargs["field"])
    
  3. Prevent Duplicate Processing: Use a flag to ensure processing happens only once:

    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
    
  4. Reset After Processing: Clear slot data after processing to prepare for the next aggregation cycle:

    self.input_slot._data = {}
    
  5. Handle Edge Cases: Consider what happens if fewer messages arrive than expected (timeout handling), more messages arrive than expected, or messages arrive out of order.

  6. Thread Safety: In concurrent mode, if you need to share state between handlers, use thread-safe operations (locks, atomic operations).
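
One way to handle extra or out-of-order messages is to key the accumulated data by source instead of counting raw messages. The sketch below assumes every message carries both `source_id` and `data`, so the i-th entries of the two append-style lists belong to the same message. `collect_by_source` is a hypothetical helper, not part of routilux.

```python
def collect_by_source(kwargs: dict, expected_sources: set) -> dict:
    """Pair append-style lists by position and key them by source_id.

    Keeps the first message from each expected source, ignores duplicates and
    unknown sources, and does not depend on arrival order.
    """
    by_source: dict = {}
    for source_id, data in zip(kwargs.get("source_id", []), kwargs.get("data", [])):
        if source_id in expected_sources and source_id not in by_source:
            by_source[source_id] = data
    return by_source


kwargs = {
    "source_id": ["source3", "source1", "source3", "source2"],  # out of order, one duplicate
    "data": ["data3", "data1", "data3-retry", "data2"],
}
expected = {"source1", "source2", "source3"}
collected = collect_by_source(kwargs, expected)
print(set(collected) == expected)                  # True: every source has reported
print([collected[s] for s in sorted(collected)])  # ['data1', 'data2', 'data3']
```

With this approach, "all messages received" means every expected source has reported, which stays correct when a source sends twice.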

Common Patterns

Timeout Handling

Add timeout logic to process even if not all messages arrive:

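import time

def __init__(self, expected_count: int = 3, timeout: float = 10.0):
    # ... existing setup ...
    self.start_time = None
    self.set_config(timeout=timeout)

def _handle_input(self, **kwargs):
    # Start the clock when the first message of a cycle arrives
    if self.start_time is None:
        self.start_time = time.monotonic()

    # ... count messages into `count` and read `expected_count` ...

    timeout = self.get_config("timeout", 10.0)
    elapsed = time.monotonic() - self.start_time

    if (count >= expected_count or elapsed >= timeout) and not self.processed:
        self.processed = True
        # Process with the data available so far

Note that this check runs only when a message arrives. If the remaining messages never arrive, the handler is not called again and the timeout never triggers processing.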
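
A timer that runs independently of incoming messages avoids that limitation. The sketch below uses the standard library's `threading.Timer`, started with the first message, to flush partial data. `TimedAggregator` is hypothetical and framework-free; in a real routine the timer callback would have to emit from outside a handler, which this page does not describe, so that part is an assumption.

```python
import threading


class TimedAggregator:
    """Hypothetical aggregator that flushes partial data when a timer fires."""

    def __init__(self, expected_count: int, timeout: float):
        self.expected_count = expected_count
        self.timeout = timeout
        self._lock = threading.Lock()
        self._items: list = []
        self._timer = None  # started on the first message
        self.processed = False
        self.emitted: list = []
        self.done = threading.Event()  # set once a result has been emitted

    def handle(self, item) -> None:
        with self._lock:
            if self.processed:
                return  # late message after a flush; ignored in this sketch
            self._items.append(item)
            if self._timer is None:
                self._timer = threading.Timer(self.timeout, self._flush)
                self._timer.daemon = True
                self._timer.start()
            if len(self._items) >= self.expected_count:
                self._timer.cancel()
                self._emit_locked("complete")

    def _flush(self) -> None:
        # Runs on the timer thread when the timeout expires.
        with self._lock:
            if not self.processed:
                self._emit_locked("timeout")

    def _emit_locked(self, reason: str) -> None:
        # Caller must hold self._lock.
        self.processed = True
        self.emitted.append({"reason": reason, "items": list(self._items)})
        self.done.set()


partial = TimedAggregator(expected_count=3, timeout=0.5)
partial.handle("data1")
partial.handle("data2")  # the third message never arrives
partial.done.wait(timeout=5.0)
print(partial.emitted)  # [{'reason': 'timeout', 'items': ['data1', 'data2']}]
```
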

Dynamic Expected Count

Allow the expected count to be set dynamically:

def set_expected_count(self, count: int):
    self.set_config(expected_count=count)
    self.expected_count = count
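
Because the handler reads `expected_count` each time it runs, a new value takes effect with the next incoming message. In particular, lowering the count below the number already received does not trigger processing by itself. The framework-free sketch below shows this; all names are hypothetical, and a plain dict stands in for `set_config`/`get_config`.

```python
class ConfigurableAggregator:
    """Hypothetical aggregator that reads expected_count at handler time."""

    def __init__(self, expected_count: int):
        self.config = {"expected_count": expected_count}  # stands in for set_config
        self.items: list = []
        self.emitted: list = []

    def set_expected_count(self, count: int) -> None:
        self.config["expected_count"] = count

    def handle(self, item) -> None:
        self.items.append(item)
        # Looked up on every call, so updates apply from the next message on.
        if len(self.items) >= self.config["expected_count"] and not self.emitted:
            self.emitted.append(list(self.items))


agg = ConfigurableAggregator(expected_count=5)
agg.handle("a")
agg.handle("b")
agg.set_expected_count(2)  # already satisfied, but nothing is emitted yet
print(agg.emitted)         # []
agg.handle("c")            # the next message triggers processing
print(agg.emitted)         # [['a', 'b', 'c']]
```
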

Multiple Aggregation Cycles

Reset properly to support multiple aggregation cycles:

def _handle_input(self, **kwargs):
    # ... check count ...
    if count >= expected_count and not self.processed:
        self.processed = True
        # Process
        self.input_slot._data = {}
        self.processed = False  # Reset for next cycle
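
With the reset in place, an aggregator can process a stream in fixed-size batches. The sketch below is framework-free and hypothetical (`BatchingAggregator` is not a routilux class) and models `input_slot._data` as a plain dict. Because the reset alone restarts the count in sequential execution, the sketch omits the `processed` flag; a flag that is set and cleared within the same handler call only matters when calls can overlap.

```python
class BatchingAggregator:
    """Hypothetical aggregator that emits one batch per `expected_count` messages."""

    def __init__(self, expected_count: int):
        self.expected_count = expected_count
        self._data: dict = {}  # stands in for input_slot._data
        self.emitted: list = []

    def receive(self, **message) -> None:
        # Append merge, then run the handler with the accumulated lists.
        for key, value in message.items():
            self._data.setdefault(key, []).append(value)
        self._handle_input(**self._data)

    def _handle_input(self, **kwargs) -> None:
        data = kwargs.get("data", [])
        if len(data) >= self.expected_count:
            self.emitted.append(list(data))
            self._data = {}  # start the next cycle with an empty count


agg = BatchingAggregator(expected_count=3)
for i in range(1, 8):
    agg.receive(data=f"data{i}")
print(agg.emitted)  # [['data1', 'data2', 'data3'], ['data4', 'data5', 'data6']]
print(agg._data)    # {'data': ['data7']}
```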

See Also