Event API
Event class.
Output events for sending data to other routines.
- class routilux.event.Event(name='', routine=None, output_params=None)
Bases: Serializable
Output event for transmitting data to other routines.
An Event represents an output point in a Routine that can transmit data to connected Slots in other routines. Events enable one-to-many data distribution: when an event is emitted, the data is delivered to every connected slot.
- Key Concepts:
Events are defined in routines using define_event()
Events are emitted using emit() or Routine.emit()
Events can connect to multiple slots (broadcast pattern)
Slots can connect to multiple events (aggregation pattern)
Parameter mapping can transform data during transmission
- Connection Model:
Events support many-to-many connections:
- One event can connect to many slots (broadcasting)
- One slot can connect to many events (aggregation)
- Connections are managed via Flow.connect()
- Parameter mappings can rename parameters per connection
Examples
- Basic usage:
>>> class MyRoutine(Routine):
...     def __init__(self):
...         super().__init__()
...         self.output = self.define_event("output", ["result"])
...
...     def __call__(self):
...         self.emit("output", result="success", status=200)
- Multiple connections:
>>> # One event, multiple receivers
>>> flow.connect(source_id, "output", target1_id, "input1")
>>> flow.connect(source_id, "output", target2_id, "input2")
>>> # Both targets receive data when source emits "output"
- disconnect(slot)
Disconnect from a slot.
- Parameters:
slot (Slot) – Slot object to disconnect from.
- emit(flow=None, **kwargs)
Emit the event and send data to all connected slots.
This method transmits data to all slots connected to this event using a queue-based mechanism. Tasks are enqueued and executed asynchronously, allowing emit() to return immediately without waiting for downstream execution.
- Execution Mode:
All execution uses a unified queue-based mechanism
Sequential mode: max_workers=1, tasks execute one at a time
Concurrent mode: max_workers>1, tasks execute in parallel
When a flow is available, emit() is non-blocking and returns immediately
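The queue-based behaviour described above can be illustrated with a small standalone sketch. It does not use routilux; ToyEvent and make_handler are hypothetical names, and the standard-library ThreadPoolExecutor stands in for whatever queue the library actually uses. The point is only that emit() hands work to a pool and returns, while max_workers decides whether tasks run one at a time or in parallel.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyEvent:
    """Hypothetical stand-in for an event; not the routilux implementation."""

    def __init__(self, executor):
        self._executor = executor
        self._handlers = []

    def connect(self, handler):
        # Each connected handler plays the role of a slot.
        self._handlers.append(handler)

    def emit(self, **kwargs):
        # Enqueue one task per connected handler and return without waiting.
        for handler in self._handlers:
            self._executor.submit(handler, dict(kwargs))


received = []
lock = threading.Lock()


def make_handler(name):
    def handler(data):
        with lock:
            received.append((name, data["result"]))
    return handler


# max_workers=1 mimics sequential mode; a larger value mimics concurrent mode.
with ThreadPoolExecutor(max_workers=1) as executor:
    event = ToyEvent(executor)
    event.connect(make_handler("target1"))
    event.connect(make_handler("target2"))
    event.emit(result="success")
# Leaving the with-block waits for all queued tasks to finish.

print(sorted(received))
```

Each handler receives its own copy of the data dictionary, so one receiver cannot mutate what another sees; whether routilux copies the data in the same way is not stated in this reference.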
- Parameter Mapping:
If a Connection has param_mapping defined (via Flow.connect()), parameter names are transformed before being sent to the slot. Unmapped parameters are passed with their original names.
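As a rough illustration of the renaming rule above, the following standalone sketch applies a mapping to an emitted data dictionary. apply_param_mapping is a hypothetical helper, not a routilux function, and it assumes the mapping goes from the event's parameter name to the slot's parameter name.

```python
def apply_param_mapping(data, param_mapping=None):
    """Rename keys of ``data`` according to ``param_mapping``.

    Assumption: ``param_mapping`` maps source names to target names.
    Keys without an entry in the mapping keep their original names.
    """
    if not param_mapping:
        return dict(data)
    return {param_mapping.get(name, name): value for name, value in data.items()}


emitted = {"result": "success", "status": 200}
mapped = apply_param_mapping(emitted, {"result": "payload"})
print(mapped)  # {'payload': 'success', 'status': 200}
```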
- Flow Context Auto-Detection:
If the flow parameter is None, this method automatically attempts to get the flow from the routine’s context (routine._current_flow). This allows simpler usage: event.emit(data="value") instead of event.emit(flow=my_flow, data="value").
The flow context is automatically set by Flow.execute() and Flow.resume().
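The lookup order described above can be sketched as a small helper. ToyRoutine and resolve_flow are hypothetical names used only for illustration; the one detail taken from this reference is the _current_flow attribute. An explicit flow argument wins, otherwise the routine's context is used, and None signals that no flow is available.

```python
class ToyRoutine:
    """Hypothetical routine holding only the context attribute named above."""

    def __init__(self):
        self._current_flow = None


def resolve_flow(routine, flow=None):
    # An explicitly passed flow takes precedence over the routine's context.
    if flow is not None:
        return flow
    # Otherwise fall back to the context; None means no flow is available.
    return getattr(routine, "_current_flow", None)


routine = ToyRoutine()
context_flow = object()   # placeholder standing in for a Flow instance
explicit_flow = object()  # placeholder standing in for another Flow instance

no_flow = resolve_flow(routine)
routine._current_flow = context_flow
from_context = resolve_flow(routine)
from_argument = resolve_flow(routine, flow=explicit_flow)
```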
- Parameters:
flow (Flow | None) – Optional Flow object. If None, emit() attempts to get the flow from routine._current_flow (set by Flow.execute() and Flow.resume()). A flow is required for:
- Finding Connection objects to apply parameter mappings
- Recording execution history in JobState
- Queue-based task execution
If no flow is available, emit() falls back to a direct slot.receive() call (legacy mode).
**kwargs – Data to transmit. These keyword arguments form the data dictionary sent to connected slots. All values must be serializable if the flow uses persistence. Example: emit(result="success", count=42)
Examples
- Basic emission (automatic flow detection):
>>> event = routine.define_event("output", ["result"])
>>> # Inside a routine handler called by Flow.execute():
>>> event.emit(result="data", status="ok")
>>> # Automatically uses routine._current_flow
- Explicit flow parameter:
>>> event.emit(flow=my_flow, result="data", status="ok")
>>> # Explicitly specify flow (useful for testing or edge cases)
- Without flow (legacy mode):
>>> event.emit(result="data")  # Direct call, no queue
>>> # Only works if no flow context available