Error Handling Example
Examples demonstrating different error handling strategies.
Example Code
1#!/usr/bin/env python
2"""
3Error Handling Example: Demonstrating different error handling strategies
4
5This example demonstrates:
6- Error handling strategies (STOP, CONTINUE, RETRY, SKIP)
7- Error handler configuration
8- Error recovery
9"""
10from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
11
12
class UnreliableRoutine(Routine):
    """Entry routine whose trigger handler raises on its first two calls.

    Each invocation of the trigger handler is counted in ``call_count``;
    from the third call onward the handler emits the ``output`` event
    instead of raising.
    """

    def __init__(self):
        super().__init__()
        # The "trigger" slot is what makes this routine usable as a flow entry point.
        self.trigger_slot = self.define_slot(
            "trigger",
            handler=self._handle_trigger,
        )
        self.output_event = self.define_event("output", ["data"])
        # Number of times the trigger handler has been invoked so far.
        self.call_count = 0

    def _handle_trigger(self, **kwargs):
        """Count the call, then either fail (calls 1-2) or emit ``output``."""
        self.call_count += 1
        if self.call_count >= 3:
            self.emit("output", data=f"Success after {self.call_count} attempts")
            return
        raise ValueError(f"Simulated error (attempt {self.call_count})")
29
30
class SuccessRoutine(Routine):
    """Downstream routine that records that it ran and prints what it received."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        # Flipped to True the first time ``process`` is called.
        self.executed = False

    def process(self, data):
        """Mark the routine as executed and print the incoming ``data``."""
        self.executed = True
        message = f"Success routine received: {data}"
        print(message)
43
44
def test_retry_strategy():
    """Demonstrate the RETRY strategy on a routine that fails twice.

    Wires ``UnreliableRoutine`` -> ``SuccessRoutine``, installs a RETRY
    error handler (up to 5 retries, 0.1 delay), runs the flow, waits for
    it to complete, and asserts that the job completed after exactly
    three handler calls.
    """
    from routilux.job_state import JobState

    rule = "=" * 50
    print("\n" + rule)
    print("Testing RETRY Strategy")
    print(rule)

    flow = Flow(flow_id="retry_test")

    flaky = UnreliableRoutine()
    sink = SuccessRoutine()
    flaky_id = flow.add_routine(flaky, "unreliable")
    sink_id = flow.add_routine(sink, "success")
    flow.connect(flaky_id, "output", sink_id, "input")

    # Install the retry policy on the whole flow.
    flow.set_error_handler(
        ErrorHandler(
            strategy=ErrorStrategy.RETRY,
            max_retries=5,
            retry_delay=0.1,
        )
    )

    # Kick off execution from the unreliable routine, then wait for the job
    # to finish before inspecting any state.
    job_state = flow.execute(flaky_id)
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    print(f"Call Count: {flaky.call_count}")
    print(f"Success Routine Executed: {sink.executed}")

    assert job_state.status == "completed"
    assert flaky.call_count == 3
78
79
def test_continue_strategy():
    """Demonstrate the CONTINUE strategy on a routine that always raises.

    Builds a single-routine flow whose trigger handler raises
    ``ValueError``, installs a CONTINUE error handler, runs the flow, and
    asserts that the job still ends up with status ``"completed"``.
    """
    from routilux.job_state import JobState

    print("\n" + "=" * 50)
    print("Testing CONTINUE Strategy")
    print("=" * 50)

    flow = Flow(flow_id="continue_test")

    class FailingRoutine(Routine):
        """Entry routine whose trigger handler unconditionally raises."""

        def __init__(self):
            super().__init__()
            # Define trigger slot for entry routine
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)

        def _handle_trigger(self, **kwargs):
            raise ValueError("This will be logged but execution continues")

    failing = FailingRoutine()
    failing_id = flow.add_routine(failing, "failing")

    # Set continue strategy
    error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
    flow.set_error_handler(error_handler)

    # Execute
    job_state = flow.execute(failing_id)

    # Wait for the job to settle before reading its status, exactly as
    # test_retry_strategy does. NOTE(review): presumably execute() can return
    # before queued work has finished -- confirm against the routilux docs.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    print(f"Execution History: {len(job_state.execution_history)}")

    assert job_state.status == "completed"
111
112
def test_skip_strategy():
    """Demonstrate the SKIP strategy on a routine that always raises.

    Builds a single-routine flow whose trigger handler raises
    ``ValueError``, installs a SKIP error handler, runs the flow, and
    asserts that the job status is ``"completed"`` and that the state
    recorded for the ``"failing"`` routine has status ``"skipped"``.
    """
    from routilux.job_state import JobState

    print("\n" + "=" * 50)
    print("Testing SKIP Strategy")
    print("=" * 50)

    flow = Flow(flow_id="skip_test")

    class FailingRoutine(Routine):
        """Entry routine whose trigger handler unconditionally raises."""

        def __init__(self):
            super().__init__()
            # Define trigger slot for entry routine
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("output", ["data"])

        def _handle_trigger(self, **kwargs):
            raise ValueError("This routine will be skipped")

    failing = FailingRoutine()
    failing_id = flow.add_routine(failing, "failing")

    # Set skip strategy
    error_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
    flow.set_error_handler(error_handler)

    # Execute
    job_state = flow.execute(failing_id)

    # Wait for the job to settle before reading job/routine state, exactly as
    # test_retry_strategy does. NOTE(review): presumably execute() can return
    # before queued work has finished -- confirm against the routilux docs.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    routine_state = job_state.get_routine_state("failing")
    print(f"Routine State: {routine_state}")

    assert job_state.status == "completed"
    assert routine_state.get("status") == "skipped"
147
148
def main():
    """Run every strategy demonstration in order, then print a closing banner."""
    test_retry_strategy()
    test_continue_strategy()
    test_skip_strategy()

    rule = "=" * 50
    closing_lines = (
        "\n" + rule,
        "All error handling examples completed!",
        rule,
    )
    for line in closing_lines:
        print(line)
157
158
159if __name__ == "__main__":
160 main()
This example demonstrates:
RETRY strategy with retry configuration
CONTINUE strategy for error logging
SKIP strategy for fault tolerance
Error handling in event queue architecture
Automatic flow detection in error scenarios
Key Features:
Task-level Error Handling: Errors are handled at the task level in the event queue
Automatic Flow Detection: emit() calls automatically detect flow context
Non-blocking Error Recovery: Error handling doesn’t block the event loop