State Management Example¶
Examples demonstrating JobState and ExecutionTracker usage.
Example Code¶
1#!/usr/bin/env python
2"""
3State Management Example: Demonstrating JobState and ExecutionTracker
4
5This example demonstrates:
6- JobState for tracking execution state
7- ExecutionTracker for performance monitoring
8- State persistence and recovery
9"""
10
11import json
12
13from routilux import Flow, Routine
14
15
16class ProcessingRoutine(Routine):
17 """A routine that processes data"""
18
19 def __init__(self, is_entry: bool = False):
20 super().__init__()
21 # Entry routine needs trigger slot
22 if is_entry:
23 self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
24 self.input_slot = self.define_slot("input", handler=self.process)
25 self.output_event = self.define_event("output", ["result"])
26
27 def _handle_trigger(self, data=None, **kwargs):
28 """Handle trigger for entry routine"""
29 data = data or kwargs.get("data", "default")
30 self.process(data)
31
32 def process(self, data):
33 """Process the data"""
34 if isinstance(data, dict):
35 data_value = data.get("data", data)
36 else:
37 data_value = data
38
39 result = f"Processed: {data_value}"
40 # Execution state should be stored in JobState, not routine._stats
41 self.emit("output", result=result)
42
43
44def main():
45 """Main function"""
46 # Create a flow
47 flow = Flow(flow_id="state_management_example")
48
49 # Create routine instances
50 # processor1 is the entry routine, so it needs trigger slot
51 processor1 = ProcessingRoutine(is_entry=True)
52 processor2 = ProcessingRoutine()
53
54 # Add routines to the flow
55 id1 = flow.add_routine(processor1, "processor1")
56 id2 = flow.add_routine(processor2, "processor2")
57
58 # Connect routines
59 flow.connect(id1, "output", id2, "input")
60
61 # Execute the flow
62 print("Executing flow...")
63 job_state = flow.execute(id1, entry_params={"data": "test"})
64
65 # Check JobState
66 print("\nJobState Information:")
67 print(f" Status: {job_state.status}")
68 print(f" Flow ID: {job_state.flow_id}")
69 print(f" Job ID: {job_state.job_id}")
70 print(f" Execution History Count: {len(job_state.execution_history)}")
71
72 # Check routine states
73 print("\nRoutine States:")
74 for routine_id in job_state.routine_states:
75 state = job_state.get_routine_state(routine_id)
76 print(f" {routine_id}: {state}")
77
78 # Check ExecutionTracker
79 if flow.execution_tracker:
80 print("\nExecutionTracker Information:")
81 print(f" Flow ID: {flow.execution_tracker.flow_id}")
82 print(f" Routines Executed: {len(flow.execution_tracker.routine_executions)}")
83 print(f" Events Recorded: {len(flow.execution_tracker.event_flow)}")
84
85 # Get performance metrics
86 for routine_id in flow.execution_tracker.routine_executions:
87 perf = flow.execution_tracker.get_routine_performance(routine_id)
88 if perf:
89 print(f"\n Performance for {routine_id}:")
90 print(f" Total Executions: {perf['total_executions']}")
91 print(f" Success Rate: {perf['success_rate']:.2%}")
92 print(f" Avg Execution Time: {perf['avg_execution_time']:.4f}s")
93
94 flow_perf = flow.execution_tracker.get_flow_performance()
95 print("\n Overall Flow Performance:")
96 print(f" Total Routines: {flow_perf['total_routines']}")
97 print(f" Total Events: {flow_perf['total_events']}")
98 print(f" Total Execution Time: {flow_perf['total_execution_time']:.4f}s")
99
100 # Demonstrate serialization
101 print("\nSerialization Example:")
102 flow_data = flow.serialize()
103 print(f" Serialized Flow: {len(flow_data)} fields")
104
105 job_state_data = job_state.serialize()
106 print(f" Serialized JobState: {len(job_state_data)} fields")
107
108 # Save to JSON (example)
109 with open("/tmp/flow_example.json", "w") as f:
110 json.dump(flow_data, f, indent=2, default=str)
111 print(" Saved flow to /tmp/flow_example.json")
112
113 assert job_state.status == "completed"
114 print("\n✓ State management example completed!")
115
116
117if __name__ == "__main__":
118 main()
This example demonstrates:
JobState for execution tracking
ExecutionTracker for performance monitoring
State serialization and persistence
Pending tasks serialization in pause/resume
Event queue state management
Key Features:
Pending Tasks Serialization: Pending tasks are serialized when pausing
Event Queue State: JobState tracks tasks in the event queue
Automatic Flow Detection: State management works seamlessly with automatic flow detection