Event API

Event class.

Output events for sending data to other routines.

class routilux.event.Event(name='', routine=None, output_params=None)[source]

Bases: Serializable

Output event for transmitting data to other routines.

An Event represents an output point in a Routine that can transmit data to connected Slots in other routines. Events enable one-to-many data distribution: when an event is emitted, every connected slot receives the data.

Key Concepts:
  • Events are defined in routines using define_event()

  • Events are emitted using emit() or Routine.emit()

  • Events can connect to multiple slots (broadcast pattern)

  • Slots can connect to multiple events (aggregation pattern)

  • Parameter mapping can transform data during transmission

Connection Model:

Events support many-to-many connections:
  • One event can connect to many slots (broadcasting)

  • One slot can connect to many events (aggregation)

  • Connections are managed via Flow.connect()

  • Parameter mappings can rename parameters per connection
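The many-to-many model described above can be illustrated with a small self-contained sketch. ToyEvent and ToySlot are hypothetical stand-ins written for this example; they are not the routilux classes, and they ignore Flow, queuing, and parameter mapping entirely.

```python
# Toy stand-ins for illustration only; these are NOT the routilux classes.
class ToySlot:
    def __init__(self, name):
        self.name = name
        self.received = []  # every payload delivered to this slot, in order

    def receive(self, data):
        self.received.append(data)


class ToyEvent:
    def __init__(self, name):
        self.name = name
        self.slots = []  # connected slots, in connection order

    def connect(self, slot):
        if slot not in self.slots:
            self.slots.append(slot)

    def emit(self, **kwargs):
        # Broadcast: each connected slot gets its own copy of the payload.
        for slot in self.slots:
            slot.receive(dict(kwargs))


# Broadcasting: one event connected to two slots.
done = ToyEvent("done")
log_slot, store_slot = ToySlot("log"), ToySlot("store")
done.connect(log_slot)
done.connect(store_slot)

# Aggregation: a second event feeding the same slot.
failed = ToyEvent("failed")
failed.connect(log_slot)

done.emit(result="ok")
failed.emit(error="timeout")
```

After this runs, log_slot has collected payloads from both events, while store_slot has only the payload from done.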

Examples

Basic usage:
>>> class MyRoutine(Routine):
...     def __init__(self):
...         super().__init__()
...         self.output = self.define_event("output", ["result", "status"])
...
...     def __call__(self):
...         self.emit("output", result="success", status=200)
Multiple connections:
>>> # One event, multiple receivers
>>> flow.connect(source_id, "output", target1_id, "input1")
>>> flow.connect(source_id, "output", target2_id, "input2")
>>> # Both targets receive data when source emits "output"
__init__(name='', routine=None, output_params=None)[source]

Initialize an Event.

Parameters:
  • name (str) – Event name.

  • routine (Routine | None) – Parent Routine object.

  • output_params (list[str] | None) – List of output parameter names (for documentation).

serialize()[source]

Serialize the Event.

Returns:

Serialized dictionary containing event data.

Return type:

dict[str, Any]

deserialize(data, registry=None)[source]

Deserialize the Event.

Parameters:
  • data (dict[str, Any]) – Serialized data dictionary.

  • registry (Any | None) – Optional ObjectRegistry for deserializing callables.

__repr__()[source]

Return string representation of the Event.

connect(slot, param_mapping=None)[source]

Connect to a slot.

Parameters:
  • slot (Slot) – Slot object to connect to.

  • param_mapping (dict[str, str] | None) – Parameter mapping dictionary (managed by Connection, this method only establishes the connection).

disconnect(slot)[source]

Disconnect from a slot.

Parameters:

slot (Slot) – Slot object to disconnect from.

emit(flow=None, **kwargs)[source]

Emit the event and send data to all connected slots.

This method transmits data to all slots connected to this event using a queue-based mechanism. Tasks are enqueued and executed asynchronously, allowing emit() to return immediately without waiting for downstream execution.

Execution Mode:
  • All execution uses a unified queue-based mechanism

  • Sequential mode: max_workers=1, tasks execute one at a time

  • Concurrent mode: max_workers>1, tasks execute in parallel

  • When a flow is available, emit() is non-blocking and returns immediately; the legacy no-flow fallback calls slot.receive() directly
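The difference between the two modes can be sketched with the standard library's ThreadPoolExecutor. This is only an analogy under the assumption that max_workers behaves like a worker-pool size; the run() helper and the handlers are hypothetical and do not reflect how routilux implements its queue.

```python
from concurrent.futures import ThreadPoolExecutor


def run(handlers, data, max_workers):
    """Submit one task per handler and return their results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() only enqueues the task and returns a Future right away.
        futures = [pool.submit(handler, dict(data)) for handler in handlers]
        return [future.result() for future in futures]


execution_order = []


def make_handler(label):
    def handler(data):
        execution_order.append(label)  # records when the task actually ran
        return f"{label}:{data['result']}"
    return handler


handlers = [make_handler(name) for name in ("a", "b", "c")]

# Sequential mode: a single worker drains the queue one task at a time.
sequential = run(handlers, {"result": "ok"}, max_workers=1)
sequential_order = list(execution_order)

# Concurrent mode: several workers may run tasks in parallel,
# so the execution order is no longer guaranteed.
execution_order.clear()
parallel = run(handlers, {"result": "ok"}, max_workers=3)
```

With one worker the tasks run in submission order. With three workers the results are still collected per future, but handlers must not rely on running in any particular order.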

Parameter Mapping:

If a Connection has param_mapping defined (via Flow.connect()), parameter names are transformed before being sent to the slot. Unmapped parameters are passed with their original names.
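The renaming rule can be sketched as a small pure function. The helper name apply_param_mapping is hypothetical, and the mapping direction (emitted name to slot-side name) is an assumption made for this example; check Flow.connect() for the actual convention.

```python
def apply_param_mapping(data, param_mapping=None):
    """Rename keys according to param_mapping; unmapped keys pass through.

    Assumes param_mapping has the form {emitted_name: slot_name}.
    """
    if not param_mapping:
        return dict(data)
    return {param_mapping.get(key, key): value for key, value in data.items()}


emitted = {"result": "success", "status": 200}

# "result" is renamed to "payload"; "status" has no mapping and keeps its name.
mapped = apply_param_mapping(emitted, {"result": "payload"})

# Without a mapping, the payload is copied unchanged.
unmapped = apply_param_mapping(emitted)
```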

Flow Context Auto-Detection:

If the flow parameter is None, this method automatically attempts to get the flow from the routine's context (routine._current_flow). This allows simpler usage: event.emit(data="value") instead of event.emit(flow=my_flow, data="value").

The flow context is automatically set by Flow.execute() and Flow.resume().
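The lookup order described above can be sketched as follows. ToyRoutine and resolve_flow are hypothetical names used only for this example; the only identifier taken from the documentation is the _current_flow attribute, and plain strings stand in for Flow objects.

```python
class ToyRoutine:
    """Stand-in for a routine; only carries the flow context attribute."""

    def __init__(self):
        self._current_flow = None  # would be set while a flow runs the routine


def resolve_flow(routine, flow=None):
    """Prefer an explicit flow, then the routine's context, else None."""
    if flow is not None:
        return flow
    return getattr(routine, "_current_flow", None)


routine = ToyRoutine()

no_context = resolve_flow(routine)                 # nothing set: legacy path
routine._current_flow = "flow-A"                   # simulate a running flow
from_context = resolve_flow(routine)               # auto-detected
explicit = resolve_flow(routine, flow="flow-B")    # explicit argument wins
```

A result of None corresponds to the case where no flow is available and delivery would fall back to the direct call.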

Parameters:
  • flow (Flow | None) – Optional Flow object. If None, emit() automatically attempts to get the flow from routine._current_flow (set by Flow.execute() and Flow.resume()). A flow is required for:
      - Finding Connection objects to apply parameter mappings
      - Recording execution history in JobState
      - Queue-based task execution
    If no flow is available, emit() falls back to a direct slot.receive() call (legacy mode).

  • **kwargs – Data to transmit. These keyword arguments form the data dictionary sent to connected slots. All values must be serializable if the flow uses persistence. Example: emit(result="success", count=42)

Examples

Basic emission (automatic flow detection):
>>> event = routine.define_event("output", ["result", "status"])
>>> # Inside a routine handler called by Flow.execute():
>>> event.emit(result="data", status="ok")
>>> # Automatically uses routine._current_flow
Explicit flow parameter:
>>> event.emit(flow=my_flow, result="data", status="ok")
>>> # Explicitly specify flow (useful for testing or edge cases)
Without flow (legacy mode):
>>> event.emit(result="data")  # Direct call, no queue
>>> # Direct delivery is used only when no flow context is available