State Management Example

Examples demonstrating JobState and ExecutionTracker usage.

Example Code

  1#!/usr/bin/env python
  2"""
  3State Management Example: Demonstrating JobState and ExecutionTracker
  4
  5This example demonstrates:
  6- JobState for tracking execution state
  7- ExecutionTracker for performance monitoring
  8- State persistence and recovery
  9"""
 10
 11import json
 12
 13from routilux import Flow, Routine
 14
 15
 16class ProcessingRoutine(Routine):
 17    """A routine that processes data"""
 18
 19    def __init__(self, is_entry: bool = False):
 20        super().__init__()
 21        # Entry routine needs trigger slot
 22        if is_entry:
 23            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 24        self.input_slot = self.define_slot("input", handler=self.process)
 25        self.output_event = self.define_event("output", ["result"])
 26
 27    def _handle_trigger(self, data=None, **kwargs):
 28        """Handle trigger for entry routine"""
 29        data = data or kwargs.get("data", "default")
 30        self.process(data)
 31
 32    def process(self, data):
 33        """Process the data"""
 34        if isinstance(data, dict):
 35            data_value = data.get("data", data)
 36        else:
 37            data_value = data
 38
 39        result = f"Processed: {data_value}"
 40        # Execution state should be stored in JobState, not routine._stats
 41        self.emit("output", result=result)
 42
 43
 44def main():
 45    """Main function"""
 46    # Create a flow
 47    flow = Flow(flow_id="state_management_example")
 48
 49    # Create routine instances
 50    # processor1 is the entry routine, so it needs trigger slot
 51    processor1 = ProcessingRoutine(is_entry=True)
 52    processor2 = ProcessingRoutine()
 53
 54    # Add routines to the flow
 55    id1 = flow.add_routine(processor1, "processor1")
 56    id2 = flow.add_routine(processor2, "processor2")
 57
 58    # Connect routines
 59    flow.connect(id1, "output", id2, "input")
 60
 61    # Execute the flow
 62    print("Executing flow...")
 63    job_state = flow.execute(id1, entry_params={"data": "test"})
 64
 65    # Check JobState
 66    print("\nJobState Information:")
 67    print(f"  Status: {job_state.status}")
 68    print(f"  Flow ID: {job_state.flow_id}")
 69    print(f"  Job ID: {job_state.job_id}")
 70    print(f"  Execution History Count: {len(job_state.execution_history)}")
 71
 72    # Check routine states
 73    print("\nRoutine States:")
 74    for routine_id in job_state.routine_states:
 75        state = job_state.get_routine_state(routine_id)
 76        print(f"  {routine_id}: {state}")
 77
 78    # Check ExecutionTracker
 79    if flow.execution_tracker:
 80        print("\nExecutionTracker Information:")
 81        print(f"  Flow ID: {flow.execution_tracker.flow_id}")
 82        print(f"  Routines Executed: {len(flow.execution_tracker.routine_executions)}")
 83        print(f"  Events Recorded: {len(flow.execution_tracker.event_flow)}")
 84
 85        # Get performance metrics
 86        for routine_id in flow.execution_tracker.routine_executions:
 87            perf = flow.execution_tracker.get_routine_performance(routine_id)
 88            if perf:
 89                print(f"\n  Performance for {routine_id}:")
 90                print(f"    Total Executions: {perf['total_executions']}")
 91                print(f"    Success Rate: {perf['success_rate']:.2%}")
 92                print(f"    Avg Execution Time: {perf['avg_execution_time']:.4f}s")
 93
 94        flow_perf = flow.execution_tracker.get_flow_performance()
 95        print("\n  Overall Flow Performance:")
 96        print(f"    Total Routines: {flow_perf['total_routines']}")
 97        print(f"    Total Events: {flow_perf['total_events']}")
 98        print(f"    Total Execution Time: {flow_perf['total_execution_time']:.4f}s")
 99
100    # Demonstrate serialization
101    print("\nSerialization Example:")
102    flow_data = flow.serialize()
103    print(f"  Serialized Flow: {len(flow_data)} fields")
104
105    job_state_data = job_state.serialize()
106    print(f"  Serialized JobState: {len(job_state_data)} fields")
107
108    # Save to JSON (example)
109    with open("/tmp/flow_example.json", "w") as f:
110        json.dump(flow_data, f, indent=2, default=str)
111    print("  Saved flow to /tmp/flow_example.json")
112
113    assert job_state.status == "completed"
114    print("\n✓ State management example completed!")
115
116
117if __name__ == "__main__":
118    main()

This example demonstrates:

  • JobState for execution tracking

  • ExecutionTracker for performance monitoring

  • State serialization and persistence

  • Pending tasks serialization in pause/resume

  • Event queue state management

Key Features:

  • Pending Tasks Serialization: Pending tasks are serialized when pausing

  • Event Queue State: JobState tracks tasks in the event queue

  • Automatic Flow Detection: State management works seamlessly with automatic flow detection