Slot API
Slot class.
Input slot for receiving data from other routines.
- class routilux.slot.Slot(name='', routine=None, handler=None, merge_strategy='override')
Bases: Serializable
Input slot for receiving data from other routines.
A Slot represents an input point in a Routine that can receive data from connected Events in other routines. Slots enable many-to-many data reception: a slot can connect to multiple events, and an event can connect to multiple slots. When data arrives, it’s merged with existing data and passed to a handler function.
- Key Concepts:
Slots are defined in routines using define_slot()
Slots connect to events via Flow.connect()
Data is received automatically when connected events are emitted
Data merging follows the configured merge_strategy
Handler functions process the merged data
- Merge Strategy Behavior:
“override” (default): Each new data completely replaces the previous data. The handler receives only the latest data. Best for stateless processing where only the most recent value matters. Example: Latest sensor reading, current configuration
“append”: New data values are appended to lists. If a key doesn’t exist, it’s initialized as an empty list. If the existing value is not a list, it’s converted to a list first. The handler receives the accumulated data each time. Best for aggregation scenarios. Example: Collecting multiple data points, building arrays
Custom function: A callable(old_data, new_data) -> merged_data. Allows custom merge logic like deep merging, averaging, or other domain-specific operations. Provides full control over merge behavior.
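The three strategies above can be sketched as a standalone function. This is only an illustration of the documented behavior, not routilux's implementation; `merge_data` and `MergeStrategy` are hypothetical names introduced here.

```python
from typing import Any, Callable, Dict, Union

# Hypothetical type alias: a strategy is a name or a callable(old, new) -> merged.
MergeStrategy = Union[str, Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]]]


def merge_data(old: Dict[str, Any], new: Dict[str, Any],
               strategy: MergeStrategy = "override") -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented merge strategies."""
    if callable(strategy):
        # Custom function: the callable decides the merged result.
        return strategy(old, new)
    if strategy == "override":
        # New data completely replaces the previous data.
        return dict(new)
    if strategy == "append":
        # Existing non-list values are converted to lists first.
        merged = {k: (list(v) if isinstance(v, list) else [v]) for k, v in old.items()}
        for key, value in new.items():
            # Missing keys start as an empty list, then the new value is appended.
            merged.setdefault(key, []).append(value)
        return merged
    raise ValueError(f"unknown merge strategy: {strategy!r}")


state: Dict[str, Any] = {}
state = merge_data(state, {"value": 1}, "append")
state = merge_data(state, {"value": 2}, "append")
print(state)  # {'value': [1, 2]}
```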
- Handler Function:
The handler can accept data in two ways (auto-detected):
1. handler(data) - receives merged data as a dictionary
2. handler(**data) - receives unpacked keyword arguments
Handler errors are caught and logged to JobState execution history, but don’t stop flow execution (slot handlers are always error-tolerant).
- Important Notes:
The merge_strategy affects both what data is stored in self._data and what data is passed to the handler.
In concurrent execution, merge operations are not atomic. If multiple events send data simultaneously, race conditions may occur.
The handler is called immediately after each receive() with the merged data, not deferred until all data is collected.
Parameter mapping (from Flow.connect()) is applied before merging.
Examples
- Override strategy (default):
>>> slot = routine.define_slot("input", handler=process, merge_strategy="override")
>>> # Event emits {"value": 1} -> handler receives {"value": 1}
>>> # Event emits {"value": 2} -> handler receives {"value": 2}
>>> # slot._data is {"value": 2} (previous data replaced)
- Append strategy:
>>> slot = routine.define_slot("input", handler=aggregate, merge_strategy="append")
>>> # Event emits {"value": 1} -> handler receives {"value": [1]}
>>> # Event emits {"value": 2} -> handler receives {"value": [1, 2]}
>>> # slot._data is {"value": [1, 2]} (values accumulated)
- Custom merge function:
>>> import time
>>> def custom_merge(old, new):
...     return {**old, **new, "merged_at": time.time()}
>>> slot = routine.define_slot("input", handler=process, merge_strategy=custom_merge)
>>> # Custom logic: shallow merge with timestamp
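A custom merge function only needs to honor the callable(old_data, new_data) -> merged_data contract. As one illustration, a recursive deep merge might look like the sketch below. `deep_merge` is a hypothetical name, and the sample dictionaries are made up for the example.

```python
from typing import Any, Dict


def deep_merge(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `new` into a copy of `old` (hypothetical example)."""
    merged = dict(old)
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Otherwise the new value wins.
            merged[key] = value
    return merged


old = {"config": {"host": "localhost", "port": 8080}, "count": 1}
new = {"config": {"port": 9090}, "count": 2}
print(deep_merge(old, new))
# {'config': {'host': 'localhost', 'port': 9090}, 'count': 2}
```

Such a function would be passed as merge_strategy=deep_merge when defining the slot, in the same way as custom_merge.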
- __init__(name='', routine=None, handler=None, merge_strategy='override')
Initialize Slot.
- Parameters:
name (str) – Slot name. Used to identify the slot within its parent routine.
routine (Routine | None) – Parent Routine object that owns this slot.
handler (Callable | None) –
Handler function called when data is received. The function signature can be flexible:
If it accepts **kwargs, all merged data is passed as keyword arguments
If it accepts a single ‘data’ parameter, the entire merged dict is passed
If it accepts a single parameter with a different name, the matching value from merged data is passed, or the entire dict if no match
If it accepts multiple parameters, matching values are passed as kwargs
If None, no handler is called when data is received.
merge_strategy (str | Callable) –
Strategy for merging new data with existing data. Possible values:
“override” (default): New data completely replaces old data. Each receive() call passes only the new data to the handler. Use this when you only need the latest data.
“append”: New values are appended to lists. Existing non-list values are converted to lists first. The handler receives accumulated data each time. Use this for aggregation scenarios.
Callable: A function(old_data: Dict, new_data: Dict) -> Dict that implements custom merge logic. The function should return the merged result. Use this for complex merge requirements like deep merging, averaging, or domain-specific operations.
Note
The merge_strategy determines how data accumulates in self._data and what data is passed to the handler. See the class docstring for detailed examples and behavior descriptions.
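The handler signature rules listed under the handler parameter can be sketched with the standard inspect module. This is a guess at how such auto-detection could work, not the library's code; `invoke_handler` is a hypothetical name, and the behavior when a multi-parameter handler has parameters missing from the data is not specified by the documentation.

```python
import inspect
from typing import Any, Callable, Dict


def invoke_handler(handler: Callable[..., Any], data: Dict[str, Any]) -> Any:
    """Hypothetical dispatcher following the documented signature rules."""
    params = list(inspect.signature(handler).parameters.values())

    # Rule 1: a **kwargs handler receives all merged data as keyword arguments.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return handler(**data)

    if len(params) == 1:
        name = params[0].name
        # Rule 3: a single, differently named parameter gets the matching value...
        if name != "data" and name in data:
            return handler(data[name])
        # Rule 2 (and the rule 3 fallback): pass the entire merged dict.
        return handler(data)

    # Rule 4: several parameters receive the matching values as kwargs.
    # Assumption: parameters without a match must have defaults, else TypeError.
    matching = {p.name: data[p.name] for p in params if p.name in data}
    return handler(**matching)


payload = {"value": 3, "count": 2}
print(invoke_handler(lambda **kw: sorted(kw), payload))             # ['count', 'value']
print(invoke_handler(lambda data: data, payload))                   # {'value': 3, 'count': 2}
print(invoke_handler(lambda value: value, payload))                 # 3
print(invoke_handler(lambda value, count: value * count, payload))  # 6
```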
- disconnect(event)
Disconnect from an event.
- Parameters:
event (Event) – Event object to disconnect from.
- receive(data, job_state=None, flow=None)
Receive data, merge with existing data, and call handler.
This method is called automatically when a connected event is emitted. You typically don’t call this directly - it’s invoked by the event emission mechanism. However, you may call it directly for testing or manual data injection.
- Processing Steps:
Merge new data with existing slot data according to merge_strategy
Update slot’s internal _data dictionary with merged result
Call the handler function (if defined) with the merged data
Handler receives data either as dict or unpacked kwargs (auto-detected)
Handler Invocation:
The handler is called immediately after merging. If the handler accepts **kwargs, data is unpacked; otherwise it’s passed as a dict. Errors in the handler are caught and logged to JobState execution history, but don’t stop flow execution (slot handler errors are always tolerated).
- Parameters:
data (dict[str, Any]) – Dictionary of data to receive. This is typically the data emitted by a connected event, possibly transformed by parameter mapping from the Connection. Example: {"result": "success", "count": 42}
Examples
- Manual data injection (for testing):
>>> slot = routine.define_slot("input", handler=process_data)
>>> slot.receive({"value": "test", "count": 1})
>>> # Handler is called with merged data
- Automatic reception (normal usage):
>>> # When connected event emits, receive() is called automatically
>>> event.emit(flow=my_flow, value="data", count=5)
>>> # Slot's receive() is called internally with {"value": "data", "count": 5}
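The processing steps of receive() (merge, store, then call the handler) can be illustrated with a toy stand-in. `MiniSlot` is a hypothetical class written for this sketch and is not the routilux Slot; it leaves out signature detection, error handling, job_state, and flow.

```python
from typing import Any, Callable, Dict, List, Optional

MergeFn = Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]]


class MiniSlot:
    """Toy stand-in for a slot; not the routilux Slot class."""

    def __init__(self, handler: Optional[Callable[[Dict[str, Any]], Any]] = None,
                 merge: Optional[MergeFn] = None) -> None:
        self._data: Dict[str, Any] = {}
        self.handler = handler
        # Default mimics "override": new data replaces old data.
        self.merge: MergeFn = merge or (lambda old, new: dict(new))

    def receive(self, data: Dict[str, Any]) -> None:
        merged = self.merge(self._data, data)  # step 1: merge with existing data
        self._data = merged                    # step 2: store the merged result
        if self.handler is not None:           # step 3: call the handler right away
            self.handler(merged)


seen: List[Dict[str, Any]] = []
slot = MiniSlot(handler=seen.append, merge=lambda old, new: {**old, **new})
slot.receive({"value": "test"})
slot.receive({"count": 1})
print(seen)  # [{'value': 'test'}, {'value': 'test', 'count': 1}]
```

The handler runs once per receive() call with whatever has been merged so far, rather than waiting for all inputs to arrive.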
- call_handler(data, propagate_exceptions=False)
Call handler with data, optionally propagating exceptions.
This method is used for entry routine trigger slots where exceptions need to propagate to Flow’s error handling logic. It merges data according to merge_strategy and calls the handler with appropriate parameters.
- Parameters:
data (dict[str, Any]) – Data dictionary to pass to handler. This will be merged with existing slot data according to merge_strategy.
propagate_exceptions (bool) – If True, exceptions propagate to caller; if False, they are caught and logged (default: False). For entry routine trigger slots, set to True to allow Flow’s error handling strategies to work.
- Raises:
Exception – If propagate_exceptions is True and handler raises an exception, it will be re-raised.
Examples
- Entry routine trigger slot (exceptions propagate):
>>> trigger_slot.call_handler(entry_params, propagate_exceptions=True)
- Normal slot handler (exceptions caught):
>>> slot.call_handler(data, propagate_exceptions=False)
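The effect of propagate_exceptions can be sketched as a small wrapper around a handler call. This is an assumption-laden illustration: `call_with_tolerance` is a hypothetical function, the standard logging module stands in for the JobState execution history, and the boolean return value is added only to make the outcome visible.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("slot_sketch")  # stand-in for JobState history


def call_with_tolerance(handler: Callable[[Dict[str, Any]], Any],
                        data: Dict[str, Any],
                        propagate_exceptions: bool = False) -> bool:
    """Hypothetical wrapper: return True on success, False on a tolerated error."""
    try:
        handler(data)
        return True
    except Exception:
        if propagate_exceptions:
            # Entry-trigger style: let the caller's error handling decide.
            raise
        # Normal slot style: record the failure and keep going.
        logger.exception("handler failed; continuing")
        return False


def failing_handler(data: Dict[str, Any]) -> None:
    raise ValueError("bad input")


print(call_with_tolerance(failing_handler, {"value": 1}))  # False
try:
    call_with_tolerance(failing_handler, {"value": 1}, propagate_exceptions=True)
except ValueError as exc:
    print(f"propagated: {exc}")  # propagated: bad input
```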