State Management¶
In this tutorial, you’ll learn how to track execution state and monitor your workflows using Routilux’s built-in state management features.
Learning Objectives¶
By the end of this tutorial, you’ll be able to:
Track execution state using JobState
Store routine-specific state in JobState
Access execution history
Monitor workflow performance
Use shared data areas for execution-wide state
Step 1: Understanding JobState¶
JobState is the central mechanism for tracking execution state. Each execution creates a unique JobState that tracks:
Execution status (pending, running, completed, failed, etc.)
Execution history (all routine executions)
Routine states (per-routine execution information)
Shared data (execution-wide data storage)
1from routilux import Flow, Routine
2
class SimpleProcessor(Routine):
    """Routine that formats incoming data and records what it handled in JobState."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        """Format ``data`` and emit the result on the ``output`` event.

        Args:
            data: Payload delivered to the ``input`` slot. ``None`` is
                treated as an empty string.
            **kwargs: Additional slot parameters; not used by this handler.
        """
        # Fall back to "" only when no payload was supplied. The previous
        # `data or kwargs.get("data", "")` also discarded valid falsy payloads
        # (0, False, ...), and its kwargs lookup could never succeed: a "data"
        # keyword always binds to the named parameter, never to **kwargs.
        data_value = "" if data is None else data
        result = f"Processed: {data_value}"

        # Store execution state in JobState using get_execution_context()
        # instead of on the routine instance.
        ctx = self.get_execution_context()
        if ctx:
            ctx.job_state.update_routine_state(
                ctx.routine_id, {"last_processed": data_value, "processed": True}
            )

        self.emit("output", result=result)
21
class DataSource(Routine):
    """Entry routine: when triggered, publishes one fixed payload."""

    def __init__(self):
        super().__init__()
        # "trigger" starts the routine; "output" carries a single "data" field.
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        """Emit the constant payload ``"test"`` on the ``output`` event."""
        payload = "test"
        self.emit("output", data=payload)
30
from routilux.job_state import JobState

# Wire the pipeline: source.output -> processor.input
flow = Flow(flow_id="jobstate_flow")

source = DataSource()
processor = SimpleProcessor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

job_state = flow.execute(source_id)
# Wait before inspecting results, as the other examples on this page do.
# NOTE(review): presumably execute() can return before all routines have
# finished; without the wait the status/history below may be incomplete.
JobState.wait_for_completion(flow, job_state, timeout=2.0)

# Access JobState information
print(f"Status: {job_state.status}")
print(f"Flow ID: {job_state.flow_id}")
print(f"Created at: {job_state.created_at}")

# Get execution history
history = job_state.get_execution_history()
print(f"Execution records: {len(history)}")

# Check routine states
processor_state = job_state.get_routine_state(processor_id)
print(f"Processor state: {processor_state}")
Expected Output:
Status: completed
Flow ID: jobstate_flow
Created at: 2024-01-01 12:00:00
Execution records: 2
Processor state: {'last_processed': 'test', 'processed': True}
Key Points:
JobState is created for each execution
Execution state should be stored in JobState, not routine instance variables
update_routine_state() stores routine-specific state
get_routine_state() retrieves routine state
Execution history records all routine executions
Step 2: Storing Routine State¶
Routines should store execution-specific state in JobState:
1from routilux import Flow, Routine
2
class StatefulProcessor(Routine):
    """Upper-cases incoming text and keeps per-execution counters in JobState."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        """Upper-case ``data``, emit it, and record the outcome in JobState.

        Args:
            data: Payload delivered to the ``input`` slot. ``None`` is
                treated as an empty string.
            **kwargs: Additional slot parameters; not used by this handler.

        Raises:
            Exception: Any error raised while processing or emitting is
                recorded in the routine state and then re-raised.
        """
        # Fall back to "" only when no payload was supplied. `data or ...`
        # silently turned falsy non-string payloads (e.g. 0) into "" instead
        # of sending them down the error path like any other bad payload; the
        # kwargs lookup was dead code because "data" is a named parameter.
        data_value = "" if data is None else data

        # Get execution context; without one there is nowhere to store state.
        ctx = self.get_execution_context()
        if not ctx:
            return

        # Get current state (may be empty on the first call of this execution)
        current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}
        processed_count = current_state.get("processed_count", 0) + 1

        try:
            # Process data
            result = data_value.upper()

            # Update state in JobState
            ctx.job_state.update_routine_state(
                ctx.routine_id,
                {
                    "processed_count": processed_count,
                    "last_processed": data_value,
                    "last_result": result,
                    "status": "success",
                },
            )

            self.emit("output", result=result)

        except Exception as e:
            # Record the failure in JobState, then let the error propagate.
            error_count = current_state.get("error_count", 0) + 1
            ctx.job_state.update_routine_state(
                ctx.routine_id,
                {
                    "processed_count": processed_count,
                    "error_count": error_count,
                    "last_error": str(e),
                    "status": "error",
                },
            )
            raise
51
class DataSource(Routine):
    """Entry routine: when triggered, publishes three words one by one."""

    def __init__(self):
        super().__init__()
        # "trigger" starts the routine; "output" carries a single "data" field.
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        """Emit each sample word on the ``output`` event, in order."""
        words = ("hello", "world", "routilux")
        for word in words:
            self.emit("output", data=word)
61
from routilux.job_state import JobState

# Wire the pipeline: source.output -> processor.input
flow = Flow(flow_id="stateful_flow")

source = DataSource()
processor = StatefulProcessor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

job_state = flow.execute(source_id)
JobState.wait_for_completion(flow, job_state, timeout=2.0)

# Check routine state. Fall back to an empty dict so the .get() calls below
# still work if nothing was recorded for the processor (handlers on this page
# read state with the same `or {}` guard).
processor_state = job_state.get_routine_state(processor_id) or {}
print(f"Processed: {processor_state.get('processed_count', 0)}")
print(f"Errors: {processor_state.get('error_count', 0)}")
print(f"Last processed: {processor_state.get('last_processed', 'none')}")
Expected Output:
Processed: 3
Errors: 0
Last processed: routilux
Key Points:
Always call get_execution_context() at the start of the handler and return early if it is None
Use get_routine_state() to get the current state
Use update_routine_state() to update state
State is execution-specific and isolated per execution
Step 4: Accessing Execution History¶
Execution history records all routine executions:
1from routilux import Flow, Routine
2
class SimpleProcessor(Routine):
    """Routine that formats incoming data and re-emits it."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        """Format ``data`` and emit the result on the ``output`` event.

        Args:
            data: Payload delivered to the ``input`` slot. ``None`` is
                treated as an empty string.
            **kwargs: Additional slot parameters; not used by this handler.
        """
        # Fall back to "" only when no payload was supplied. `data or ...`
        # also discarded valid falsy payloads (0, False, ...), and the old
        # kwargs.get("data") lookup was dead code because "data" always binds
        # to the named parameter.
        data_value = "" if data is None else data
        result = f"Processed: {data_value}"
        self.emit("output", result=result)
13
class DataSource(Routine):
    """Entry routine: when triggered, publishes the letters a, b and c."""

    def __init__(self):
        super().__init__()
        # "trigger" starts the routine; "output" carries a single "data" field.
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        """Emit each sample letter on the ``output`` event, in order."""
        letters = ("a", "b", "c")
        for letter in letters:
            self.emit("output", data=letter)
23
# Build the two-routine pipeline: source.output -> processor.input
flow = Flow(flow_id="history_flow")
source, processor = DataSource(), SimpleProcessor()
source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")
flow.connect(source_id, "output", processor_id, "input")

from routilux.job_state import JobState

# Run the flow and wait for it to finish before reading the history.
job_state = flow.execute(source_id)
JobState.wait_for_completion(flow, job_state, timeout=2.0)

# Full history across every routine in this execution
history = job_state.get_execution_history()
total_records = len(history)
print(f"Total records: {total_records}")

# History restricted to the processor routine
processor_history = job_state.get_execution_history(processor_id)
processor_runs = len(processor_history)
print(f"Processor executions: {processor_runs}")

# One line per record: who emitted which event, and when
for record in history:
    line = f" {record.routine_id}: {record.event_name} at {record.timestamp}"
    print(line)
Expected Output:
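Total records: 6
Processor executions: 3
source: output at 2024-01-01 12:00:00
processor: output at 2024-01-01 12:00:00.001
source: output at 2024-01-01 12:00:00.002
processor: output at 2024-01-01 12:00:00.003
source: output at 2024-01-01 12:00:00.004
processor: output at 2024-01-01 12:00:00.005
(Timestamps and the interleaving of records will vary between runs.)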
Key Points:
get_execution_history() returns all execution records
get_execution_history(routine_id) filters by routine
Each record contains routine_id, event_name, data, and timestamp
History is automatically recorded for all event emissions
Step 5: Complete Example - Stateful Workflow¶
Here’s a complete example combining state management features:
1from routilux import Flow, Routine
2
class DataSource(Routine):
    """Entry routine that generates ``count`` numbered items."""

    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, count=3, **kwargs):
        """Emit ``item_0`` .. ``item_{count-1}`` on the ``output`` event.

        Args:
            count: Number of items to emit. ``None`` falls back to 3; an
                explicit 0 emits nothing.
            **kwargs: Additional entry parameters; not used by this handler.
        """
        # Fall back to the default only for None. `count or ...` turned an
        # explicit count=0 into 3, and the kwargs.get("count") lookup was dead
        # code because "count" always binds to the named parameter.
        count = 3 if count is None else count

        # Use get_execution_context() for convenient access
        ctx = self.get_execution_context()
        if ctx:
            ctx.job_state.update_routine_state(ctx.routine_id, {"generated_count": count})

        for i in range(count):
            self.emit("output", data=f"item_{i}")
19
class Validator(Routine):
    """Flags each item as valid or invalid and counts both in JobState."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])

    def validate(self, data=None, **kwargs):
        """Validate ``data`` and emit it together with the verdict.

        Args:
            data: Payload delivered to the ``input`` slot. ``None`` is
                treated as an empty string.
            **kwargs: Additional slot parameters; not used by this handler.
        """
        # Fall back to "" only when no payload was supplied. `data or ...`
        # also replaced valid falsy payloads (0, False, ...), so the invalid
        # item recorded and forwarded was "" rather than what was received.
        # The kwargs lookup was dead code because "data" is a named parameter.
        data_value = "" if data is None else data

        # Use get_execution_context() for convenient access
        ctx = self.get_execution_context()
        if not ctx:
            return

        current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}

        # Simple validation: must contain "item"
        is_valid = "item" in str(data_value)

        if is_valid:
            valid_count = current_state.get("valid_count", 0) + 1
            ctx.job_state.update_routine_state(
                ctx.routine_id, {"valid_count": valid_count, "last_valid": data_value}
            )
        else:
            invalid_count = current_state.get("invalid_count", 0) + 1
            ctx.job_state.update_routine_state(
                ctx.routine_id, {"invalid_count": invalid_count, "last_invalid": data_value}
            )

        self.emit("output", data=data_value, valid=is_valid)
51
class Aggregator(Routine):
    """Terminal routine that tallies received and valid items in JobState."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.aggregate)

    def aggregate(self, data=None, valid=None, **kwargs):
        """Count one received item and print it.

        Args:
            data: Item delivered to the ``input`` slot. ``None`` is treated
                as an empty string.
            valid: Validation verdict for the item. ``None`` is treated as
                ``False``.
            **kwargs: Additional slot parameters; not used by this handler.
        """
        # Substitute defaults only for missing values. `data or ...` also
        # replaced valid falsy payloads, and both kwargs lookups were dead
        # code: "data" and "valid" always bind to the named parameters.
        data_value = "" if data is None else data
        is_valid = False if valid is None else valid

        # Use get_execution_context() for convenient access
        ctx = self.get_execution_context()
        if ctx:
            current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}

            total_received = current_state.get("total_received", 0) + 1
            total_valid = current_state.get("total_valid", 0)
            if is_valid:
                total_valid += 1

            ctx.job_state.update_routine_state(
                ctx.routine_id, {"total_received": total_received, "total_valid": total_valid}
            )

        print(f"Aggregated: {data_value} (valid={is_valid})")
76
def main():
    """Build the source -> validator -> aggregator flow, run it, and report state."""
    flow = Flow(flow_id="stateful_workflow")

    source = DataSource()
    validator = Validator()
    aggregator = Aggregator()

    source_id = flow.add_routine(source, "source")
    validator_id = flow.add_routine(validator, "validator")
    agg_id = flow.add_routine(aggregator, "aggregator")

    flow.connect(source_id, "output", validator_id, "input")
    flow.connect(validator_id, "output", agg_id, "input")

    from routilux.job_state import JobState
    job_state = flow.execute(source_id, entry_params={"count": 5})
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print("\n=== Execution State ===")
    # Fall back to an empty dict so the .get() calls below still work if a
    # routine recorded nothing (handlers on this page read state with the
    # same `or {}` guard).
    source_state = job_state.get_routine_state(source_id) or {}
    validator_state = job_state.get_routine_state(validator_id) or {}
    aggregator_state = job_state.get_routine_state(agg_id) or {}

    print(f"Source generated: {source_state.get('generated_count', 0)}")
    print(
        f"Validator - Valid: {validator_state.get('valid_count', 0)}, "
        f"Invalid: {validator_state.get('invalid_count', 0)}"
    )
    print(
        f"Aggregator - Total: {aggregator_state.get('total_received', 0)}, "
        f"Valid: {aggregator_state.get('total_valid', 0)}"
    )
    print(f"\nExecution status: {job_state.status}")


if __name__ == "__main__":
    main()
Expected Output:
Aggregated: item_0 (valid=True)
Aggregated: item_1 (valid=True)
Aggregated: item_2 (valid=True)
Aggregated: item_3 (valid=True)
Aggregated: item_4 (valid=True)
=== Execution State ===
Source generated: 5
Validator - Valid: 5, Invalid: 0
Aggregator - Total: 5, Valid: 5
Execution status: completed
Key Points:
Store all execution state in JobState
Use update_routine_state() to update state
Use get_routine_state() to retrieve state
State is execution-specific and isolated per execution
Common Pitfalls¶
Pitfall 1: Modifying routine instance variables during execution
def process(self, **kwargs):
# ❌ Don't do this - breaks execution isolation
self.counter += 1
self.data.append(kwargs)
Solution: Store execution state in JobState:
def process(self, **kwargs):
    """Increment a per-execution counter kept in JobState."""
    ctx = self.get_execution_context()
    if not ctx:
        return  # no active execution, nothing to record

    # Read the previous value (0 on first use) and write back the new one.
    previous = ctx.job_state.get_routine_state(ctx.routine_id) or {}
    next_value = previous.get("counter", 0) + 1
    ctx.job_state.update_routine_state(ctx.routine_id, {"counter": next_value})
Pitfall 2: Not checking for flow and job_state
def process(self, **kwargs):
# ❌ Don't do this - reads a private attribute directly, and the result may be None
job_state = getattr(self, "_current_job_state", None) # Set by slot.receive()
Solution: Use get_execution_context() which handles all checks:
def process(self, **kwargs):
    """Mark this routine as processed for the current execution."""
    ctx = self.get_execution_context()
    if not ctx:
        return  # no active execution, so there is no job_state to touch

    # ctx is set, so ctx.job_state is safe to use here
    marker = {"processed": True}
    ctx.job_state.update_routine_state(ctx.routine_id, marker)
Pitfall 3: Using wrong routine_id
def process(self, **kwargs):
# ❌ Wrong - self._id is memory address, not routine_id in flow
job_state.update_routine_state(self._id, {"processed": True})
Solution: Use get_execution_context() which provides the correct routine_id:
def process(self, **kwargs):
    """Mark this routine as processed, keyed by its routine_id in the flow."""
    ctx = self.get_execution_context()
    if not ctx:
        return  # no active execution, nothing to update

    # Take the id from the context: it is the routine_id registered in the flow.
    routine_id = ctx.routine_id
    ctx.job_state.update_routine_state(routine_id, {"processed": True})
Best Practices¶
Always store execution state in JobState: Never modify routine instance variables during execution
Use get_execution_context(): Use get_execution_context() for convenient access to flow, job_state, and routine_id
Use send_output() for output: Use send_output() to send execution-specific output (not events)
Use emit_deferred_event() for pause/resume: Use emit_deferred_event() to emit events that will be processed on resume
Use get_routine_state() first: Get current state before updating to avoid overwriting
Use shared data for execution-wide state: Use update_shared_data() and append_to_shared_log()
Access execution history: Use get_execution_history() to track all executions
Next Steps¶
Now that you understand state management, let’s move on to Error Handling to learn how to build resilient workflows that handle errors gracefully.