Error Handling Example
Examples demonstrating different error handling strategies.
Example Code
1#!/usr/bin/env python
2"""
3Error Handling Example: Demonstrating different error handling strategies
4
5This example demonstrates:
6- Error handling strategies (STOP, CONTINUE, RETRY, SKIP)
7- Error handler configuration
8- Error recovery
9"""
10
11from routilux import ErrorHandler, ErrorStrategy, Flow, Routine
12
13
class UnreliableRoutine(Routine):
    """Routine whose trigger handler fails on its first two invocations.

    Every call to the ``trigger`` slot handler increments ``call_count``.
    The first and second calls raise ``ValueError``; from the third call on,
    an ``output`` event carrying a success message is emitted instead.
    """

    def __init__(self):
        super().__init__()
        # Slot used to start this routine when it is the flow's entry routine.
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
        # Event with a single "data" field, emitted once a call succeeds.
        self.output_event = self.define_event("output", ["data"])
        # Number of times the trigger handler has been invoked so far.
        self.call_count = 0

    def _handle_trigger(self, **kwargs):
        """Raise on attempts 1 and 2; emit ``output`` from attempt 3 onward.

        Raises:
            ValueError: While fewer than three calls have been made.
        """
        self.call_count += 1
        attempt = self.call_count
        if attempt >= 3:
            self.emit("output", data=f"Success after {attempt} attempts")
            return
        raise ValueError(f"Simulated error (attempt {attempt})")
30
31
class SuccessRoutine(Routine):
    """Routine that records that it ran and prints the data it received."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        # Set to True the first time process() is called.
        self.executed = False

    def process(self, data):
        """Mark the routine as executed and print the received ``data``."""
        self.executed = True
        print("Success routine received: {}".format(data))
44
45
def test_retry_strategy():
    """Run an unreliable routine under the RETRY strategy and verify the result.

    Builds a flow in which ``UnreliableRoutine`` feeds ``SuccessRoutine``,
    installs a RETRY error handler (up to 5 retries, 0.1 delay), executes the
    flow, waits for it to finish, and asserts that the job completed after
    exactly three handler calls.
    """
    divider = "=" * 50
    print("\n" + divider)
    print("Testing RETRY Strategy")
    print(divider)

    flow = Flow(flow_id="retry_test")

    # Wire the flaky producer's "output" event into the consumer's "input" slot.
    flaky = UnreliableRoutine()
    sink = SuccessRoutine()
    flaky_id = flow.add_routine(flaky, "unreliable")
    sink_id = flow.add_routine(sink, "success")
    flow.connect(flaky_id, "output", sink_id, "input")

    # Install the retry policy on the whole flow.
    flow.set_error_handler(
        ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=5, retry_delay=0.1)
    )

    job_state = flow.execute(flaky_id)

    from routilux.job_state import JobState

    # Give the job up to two seconds to finish before inspecting its state.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    print(f"Call Count: {flaky.call_count}")
    print(f"Success Routine Executed: {sink.executed}")

    # Two simulated failures followed by one success.
    assert job_state.status == "completed"
    assert flaky.call_count == 3
79
80
def test_continue_strategy():
    """Run an always-failing routine under the CONTINUE strategy.

    Builds a single-routine flow whose trigger handler always raises
    ``ValueError``, installs a CONTINUE error handler, executes the flow,
    waits for it to finish, and asserts that the job still ends up
    ``"completed"``.
    """
    print("\n" + "=" * 50)
    print("Testing CONTINUE Strategy")
    print("=" * 50)

    flow = Flow(flow_id="continue_test")

    class FailingRoutine(Routine):
        """Entry routine whose trigger handler always raises."""

        def __init__(self):
            super().__init__()
            # Define trigger slot for entry routine
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        def _handle_trigger(self, **kwargs):
            raise ValueError("This will be logged but execution continues")

    failing = FailingRoutine()
    failing_id = flow.add_routine(failing, "failing")

    # Set continue strategy
    error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
    flow.set_error_handler(error_handler)

    # Execute
    job_state = flow.execute(failing_id)

    from routilux.job_state import JobState

    # Wait for the job to settle before reading its status, exactly as
    # test_retry_strategy does. NOTE(review): execute() presumably returns
    # before the event queue has drained, so asserting immediately could
    # race - confirm against the routilux documentation.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    print(f"Execution History: {len(job_state.execution_history)}")

    assert job_state.status == "completed"
112
113
def test_skip_strategy():
    """Run an always-failing routine under the SKIP strategy.

    Builds a single-routine flow whose trigger handler always raises
    ``ValueError``, installs a SKIP error handler, executes the flow, waits
    for it to finish, and asserts that the job is ``"completed"`` while the
    routine's recorded state has status ``"skipped"``.
    """
    print("\n" + "=" * 50)
    print("Testing SKIP Strategy")
    print("=" * 50)

    flow = Flow(flow_id="skip_test")

    class FailingRoutine(Routine):
        """Entry routine that declares an output event but always raises."""

        def __init__(self):
            super().__init__()
            # Define trigger slot for entry routine
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("output", ["data"])

        def _handle_trigger(self, **kwargs):
            raise ValueError("This routine will be skipped")

    failing = FailingRoutine()
    failing_id = flow.add_routine(failing, "failing")

    # Set skip strategy
    error_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
    flow.set_error_handler(error_handler)

    # Execute
    job_state = flow.execute(failing_id)

    from routilux.job_state import JobState

    # Wait for the job to settle before reading its status and routine state,
    # exactly as test_retry_strategy does. NOTE(review): execute() presumably
    # returns before the event queue has drained, so inspecting the state
    # immediately could race - confirm against the routilux documentation.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    # Routine state is looked up by the id the routine was registered under.
    routine_state = job_state.get_routine_state("failing")
    print(f"Routine State: {routine_state}")

    assert job_state.status == "completed"
    assert routine_state.get("status") == "skipped"
148
149
def main():
    """Run each error-handling demonstration in order, then print a footer."""
    for demo in (test_retry_strategy, test_continue_strategy, test_skip_strategy):
        demo()

    rule = "=" * 50
    print("\n" + rule)
    print("All error handling examples completed!")
    print(rule)


# Only run the demonstrations when executed as a script, not on import.
if __name__ == "__main__":
    main()
This example demonstrates:
RETRY strategy with retry configuration
CONTINUE strategy for error logging
SKIP strategy for fault tolerance
Error handling in event queue architecture
Automatic flow detection in error scenarios
Key Features:
Task-level Error Handling: Errors are handled at the task level in the event queue
Automatic Flow Detection: emit() calls automatically detect flow context
Non-blocking Error Recovery: Error handling doesn’t block the event loop