Working with Routines¶
Routines are the core building blocks of routilux. This guide explains how to create and use routines in the new event queue architecture.
Routine Identifier (routine_id)¶
Each routine in a flow has a routine_id that identifies it within the flow. When you add a routine to a flow using flow.add_routine(), you can specify a custom routine_id or use the auto-generated one.
For details on how to use routine_id, see Identifiers: job_id, flow_id, and routine_id.
Creating a Routine¶
To create a routine, inherit from Routine:
from routilux import Routine
class MyRoutine(Routine):
def __init__(self):
super().__init__()
# Define slots and events here
Defining Slots¶
Slots are input mechanisms for routines. Define a slot with a handler function:
def process_input(self, data=None, **kwargs):
# Process the input data
# Handler should accept **kwargs for flexibility
pass
self.input_slot = self.define_slot("input", handler=process_input)
You can also specify a merge strategy for slots that receive data from multiple events:
self.input_slot = self.define_slot(
"input",
handler=process_input,
merge_strategy="append" # or "override", or custom function
)
Merge Strategies:
- "override" (default): New data replaces old data
- "append": Values are accumulated in lists
- Custom function: callable(old_data, new_data) -> merged_data
Defining Events¶
Events are output mechanisms for routines. Define an event with output parameters:
self.output_event = self.define_event("output", ["result", "status"])
Emitting Events¶
Emit events to trigger connected slots. The flow context is automatically detected:
def _handle_trigger(self, **kwargs):
# Flow is automatically detected from routine context
# No need to pass flow parameter!
self.emit("output", result="success", status="completed")
Automatic Flow Detection:
When called within a Flow execution context,
emit()automatically retrieves the flow fromroutine._current_flowThe flow context is set by
Flow.execute()andFlow.resume()You don’t need to manually pass the flow parameter
Explicit Flow Parameter (optional):
You can still explicitly pass flow for backward compatibility:
flow = getattr(self, "_current_flow", None) self.emit("output", flow=flow, result="success")
Non-blocking Behavior:
- emit() returns immediately after enqueuing tasks
- Downstream execution happens asynchronously
- Don’t expect handlers to complete before emit() returns
Entry Routines¶
Routines used as entry points in a Flow must define a “trigger” slot:
class MyEntryRoutine(Routine):
def __init__(self):
super().__init__()
# Define trigger slot for entry routine
self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
self.output_event = self.define_event("output", ["result"])
def _handle_trigger(self, **kwargs):
# This will be called by Flow.execute()
data = kwargs.get("data", "default")
# Flow context is automatically set
self.emit("output", result=f"Processed: {data}")
The Flow.execute() method will automatically call the “trigger” slot with the provided
entry_params. See Working with Flows for more details on executing flows.
Slot Handlers¶
Handler Signature
Slot handlers should accept **kwargs for flexibility:
def _handle_input(self, data=None, **kwargs):
# Accept data parameter and any other kwargs
# This works with various data formats
pass
Why ``**kwargs``?
- Handlers receive data from events, which may have different parameter names
- Parameter mapping (via Flow.connect()) may transform parameter names
- **kwargs ensures handlers work with any data format
Data Extraction Helper
Use _extract_input_data() to simplify data extraction:
def process_input(self, data=None, **kwargs):
# Extract data using the helper method
extracted_data = self._extract_input_data(data, **kwargs)
# Process the extracted data
result = self.process(extracted_data)
self.emit("output", result=result)
This method handles various input patterns:
- Direct parameter: _extract_input_data("text") → "text"
- ‘data’ key: _extract_input_data(None, data="text") → "text"
- Single value: _extract_input_data(None, text="value") → "value"
- Multiple values: _extract_input_data(None, a=1, b=2) → {"a": 1, "b": 2}
Multiple Slots¶
A routine can have multiple slots, each connected to different upstream routines. When an upstream routine emits data, it triggers the handler of the connected slot.
Important: Each slot has its own handler and is triggered independently.
Example:
class TargetRoutine(Routine):
def __init__(self):
super().__init__()
# Define three slots, each with its own handler
self.slot1 = self.define_slot("input1", handler=self._handle_input1)
self.slot2 = self.define_slot("input2", handler=self._handle_input2)
self.slot3 = self.define_slot("input3", handler=self._handle_input3)
def _handle_input1(self, data1=None, **kwargs):
# This handler is called when slot1 receives data
pass
def _handle_input2(self, data2=None, **kwargs):
# This handler is called when slot2 receives data
pass
def _handle_input3(self, data3=None, **kwargs):
# This handler is called when slot3 receives data
pass
If three upstream routines each emit once: - Source1 emits → slot1 receives → handler1 is called - Source2 emits → slot2 receives → handler2 is called - Source3 emits → slot3 receives → handler3 is called
Result: The target routine’s handlers are called 3 times (once per slot).
Slot Independence:
- Each slot maintains its own _data state
- Each slot’s merge_strategy applies independently
- Each slot’s handler is called when data is received
- Each emission triggers the handler once
Execution Behavior¶
Event Queue Processing
When a routine emits an event:
1. Tasks are created for each connected slot
2. Tasks are enqueued (non-blocking)
3. emit() returns immediately
4. Event loop processes tasks asynchronously
Handler Execution
Slot handlers execute in the event loop’s thread pool:
- Sequential mode: One handler at a time (max_workers=1)
- Concurrent mode: Multiple handlers in parallel (max_workers>1)
Fair Scheduling
Tasks are processed in queue order: - Multiple message chains progress alternately - Long chains don’t block shorter ones - Fair progress for all active chains
Understanding Routine’s Role¶
Routine is responsible for function implementation:
What each node does: Define slots, events, and handler logic
Static Configuration: Store in
_configdictionary (set viaset_config())No Runtime State: Routines must not modify instance variables during execution
What Routine Does NOT Do:
❌ Store runtime execution state (that’s
JobState’s job)❌ Store business data (that’s
JobState.shared_data’s job)❌ Define workflow structure (that’s
Flow’s job)❌ Handle execution-specific output (that’s
JobState.output_handler’s job)
Critical Constraint: The same routine object can be used by multiple concurrent executions.
Modifying instance variables would cause data corruption. All execution-specific state
must be stored in JobState.
Execution State Management¶
Use get_execution_context() for convenient access to execution handles:
def process_data(self, data):
# Get execution context (flow, job_state, routine_id)
ctx = self.get_execution_context()
if ctx:
# Store execution-specific state in JobState
current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}
processed_count = current_state.get("processed_count", 0) + 1
ctx.job_state.update_routine_state(
ctx.routine_id, {"processed_count": processed_count}
)
# Store business data in JobState
ctx.job_state.update_shared_data("last_processed", data)
ctx.job_state.append_to_shared_log({"action": "process", "data": data})
# Send output via JobState
self.send_output("user_data", message="Processing", value=data)
Using get_execution_context():
The get_execution_context() method returns an ExecutionContext object containing
flow, job_state, and routine_id:
ctx = self.get_execution_context()
if ctx:
# Access flow, job_state, and routine_id
ctx.flow
ctx.job_state
ctx.routine_id
# Or unpack
flow, job_state, routine_id = ctx
Retrieve execution state after execution:
# After execution
routine_state = job_state.get_routine_state(routine_id)
print(routine_state) # {"processed_count": 1, ...}
# Access shared data
shared_data = job_state.get_shared_data("last_processed")
shared_log = job_state.get_shared_log()
Getting Slots and Events¶
Retrieve slots and events by name:
slot = routine.get_slot("input")
event = routine.get_event("output")
Error Handling¶
Routines can handle exceptions in different ways depending on the error handling strategy configured. This section explains how routine exceptions affect routine state and JobState status.
Setting Error Handlers¶
Set error handlers at the routine level:
from routilux import ErrorHandler, ErrorStrategy
routine.set_error_handler(
ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=3)
)
Error handling priority: 1. Routine-level error handler (if set) 2. Flow-level error handler (if set) 3. Default behavior (STOP)
Routine Exception Types¶
Routines can encounter exceptions in two different contexts:
Entry Routine Execution Errors: Errors raised in an entry routine’s trigger slot handler (when called by
Flow.execute()). These errors propagate to Flow’s error handling mechanisms and trigger error handling strategies (STOP, CONTINUE, RETRY, SKIP).Slot Handler Errors: Errors raised in slot handler functions when processing received data from upstream routines. These errors are always caught and logged to
JobState.execution_history, but they don’t propagate tohandle_task_errorunless the routine has a STOP strategy error handler.
Routine State After Exceptions¶
When a routine encounters an exception, its state in job_state.routine_states[routine_id]
is updated based on the error handling strategy:
Entry Routine Errors:
Strategy |
Routine Status |
JobState Status |
|---|---|---|
STOP |
failed |
failed |
CONTINUE |
error_continued |
completed |
RETRY (succeeds) |
completed |
completed |
RETRY (all fail) |
failed |
failed |
SKIP |
skipped |
completed |
Slot Handler Errors:
Error Handler |
Routine Status |
JobState Status |
|---|---|---|
None |
(not set) |
completed |
STOP |
failed |
failed* |
CONTINUE/RETRY/ SKIP |
(not set) |
completed |
- * JobState status becomes
"failed"only afterwait_for_completion()detects the routine’s
"failed"status.
Key Points:
Entry routine errors always trigger error handling strategies
Slot handler errors are always caught and logged, but only STOP strategy causes routine state to be marked as
"failed"wait_for_completion()checks routine states (not just execution history) to determine final JobState statusOnly routine state
"failed"or"error"causes JobState to be marked as"failed"Routine state
"error_continued"does NOT cause JobState to be marked as"failed"
Example: Routine Exception Handling¶
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
from routilux.job_state import JobState
class DataProcessor(Routine):
def __init__(self):
super().__init__()
self.input_slot = self.define_slot("input", handler=self.process)
self.output_event = self.define_event("output", ["result"])
def process(self, data=None, **kwargs):
if data is None:
raise ValueError("Data is required")
self.emit("output", result=data * 2)
flow = Flow()
processor = DataProcessor()
# Set STOP strategy - slot handler errors will mark routine as failed
processor.set_error_handler(ErrorHandler(strategy=ErrorStrategy.STOP))
processor_id = flow.add_routine(processor, "processor")
# Trigger with invalid data
job_state = flow.execute(processor_id, entry_params={"data": None})
# Wait for completion to detect the failure
JobState.wait_for_completion(flow, job_state, timeout=5.0)
# Check routine state
routine_state = job_state.get_routine_state("processor")
assert routine_state["status"] == "failed"
assert job_state.status == "failed"
See Error Handling and Exception Management for comprehensive details on error handling strategies, status transitions, and error detection mechanisms.
Configuration¶
Store configuration in _config dictionary:
routine.set_config(timeout=30, retries=3)
timeout = routine.get_config("timeout", default=10)
All configuration values are automatically serialized.
Best Practices¶
Always use ``**kwargs`` in handlers:
def _handle_input(self, data=None, **kwargs): # Flexible handler signature
Define trigger slot for entry routines:
self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
Don’t rely on emit() waiting:
self.emit("output", data="value") # Handler may not have executed yet! # Use wait_for_completion() if needed
Use merge_strategy=”append” for aggregation:
self.input_slot = self.define_slot("input", merge_strategy="append")
Track operations consistently:
self._track_operation("processing", success=True, items=10)