Error Handling Example

Examples demonstrating different error handling strategies.

Example Code

  1#!/usr/bin/env python
  2"""
  3Error Handling Example: Demonstrating different error handling strategies
  4
  5This example demonstrates:
  6- Error handling strategies (STOP, CONTINUE, RETRY, SKIP)
  7- Error handler configuration
  8- Error recovery
  9"""
 10
 11from routilux import ErrorHandler, ErrorStrategy, Flow, Routine
 12
 13
 14class UnreliableRoutine(Routine):
 15    """A routine that may fail"""
 16
 17    def __init__(self):
 18        super().__init__()
 19        # Define trigger slot for entry routine
 20        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 21        self.output_event = self.define_event("output", ["data"])
 22        self.call_count = 0
 23
 24    def _handle_trigger(self, **kwargs):
 25        """May fail on first few calls"""
 26        self.call_count += 1
 27        if self.call_count < 3:
 28            raise ValueError(f"Simulated error (attempt {self.call_count})")
 29        self.emit("output", data=f"Success after {self.call_count} attempts")
 30
 31
 32class SuccessRoutine(Routine):
 33    """A routine that always succeeds"""
 34
 35    def __init__(self):
 36        super().__init__()
 37        self.input_slot = self.define_slot("input", handler=self.process)
 38        self.executed = False
 39
 40    def process(self, data):
 41        """Process the data"""
 42        self.executed = True
 43        print(f"Success routine received: {data}")
 44
 45
 46def test_retry_strategy():
 47    """Test RETRY strategy"""
 48    print("\n" + "=" * 50)
 49    print("Testing RETRY Strategy")
 50    print("=" * 50)
 51
 52    flow = Flow(flow_id="retry_test")
 53
 54    unreliable = UnreliableRoutine()
 55    success = SuccessRoutine()
 56
 57    unreliable_id = flow.add_routine(unreliable, "unreliable")
 58    success_id = flow.add_routine(success, "success")
 59
 60    flow.connect(unreliable_id, "output", success_id, "input")
 61
 62    # Set retry strategy
 63    error_handler = ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=5, retry_delay=0.1)
 64    flow.set_error_handler(error_handler)
 65
 66    # Execute
 67    job_state = flow.execute(unreliable_id)
 68
 69    from routilux.job_state import JobState
 70
 71    JobState.wait_for_completion(flow, job_state, timeout=2.0)
 72
 73    print(f"Job Status: {job_state.status}")
 74    print(f"Call Count: {unreliable.call_count}")
 75    print(f"Success Routine Executed: {success.executed}")
 76
 77    assert job_state.status == "completed"
 78    assert unreliable.call_count == 3
 79
 80
 81def test_continue_strategy():
 82    """Test CONTINUE strategy"""
 83    print("\n" + "=" * 50)
 84    print("Testing CONTINUE Strategy")
 85    print("=" * 50)
 86
 87    flow = Flow(flow_id="continue_test")
 88
 89    class FailingRoutine(Routine):
 90        def __init__(self):
 91            super().__init__()
 92            # Define trigger slot for entry routine
 93            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 94
 95        def _handle_trigger(self, **kwargs):
 96            raise ValueError("This will be logged but execution continues")
 97
 98    failing = FailingRoutine()
 99    failing_id = flow.add_routine(failing, "failing")
100
101    # Set continue strategy
102    error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
103    flow.set_error_handler(error_handler)
104
105    # Execute
106    job_state = flow.execute(failing_id)
107
108    print(f"Job Status: {job_state.status}")
109    print(f"Execution History: {len(job_state.execution_history)}")
110
111    assert job_state.status == "completed"
112
113
def test_skip_strategy():
    """Test SKIP strategy.

    Executes a routine whose trigger handler always raises ``ValueError``
    with ``ErrorStrategy.SKIP`` installed on the flow, and asserts that the
    job ends as "completed" while the routine's state is "skipped".
    """
    from routilux.job_state import JobState

    print("\n" + "=" * 50)
    print("Testing SKIP Strategy")
    print("=" * 50)

    flow = Flow(flow_id="skip_test")

    class FailingRoutine(Routine):
        """Entry routine whose trigger handler always raises."""

        def __init__(self):
            super().__init__()
            # Define trigger slot for entry routine
            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
            self.output_event = self.define_event("output", ["data"])

        def _handle_trigger(self, **kwargs):
            raise ValueError("This routine will be skipped")

    failing = FailingRoutine()
    failing_id = flow.add_routine(failing, "failing")

    # Set skip strategy
    error_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
    flow.set_error_handler(error_handler)

    # Execute
    job_state = flow.execute(failing_id)

    # Wait before inspecting the job, as test_retry_strategy does, so the
    # status and routine state are not read while the job may still be running.
    JobState.wait_for_completion(flow, job_state, timeout=2.0)

    print(f"Job Status: {job_state.status}")
    routine_state = job_state.get_routine_state("failing")
    print(f"Routine State: {routine_state}")

    assert job_state.status == "completed"
    assert routine_state.get("status") == "skipped"
148
149
def main():
    """Run each strategy demo in order, then print a closing banner."""
    for demo in (test_retry_strategy, test_continue_strategy, test_skip_strategy):
        demo()

    rule = "=" * 50
    print("\n" + rule)
    print("All error handling examples completed!")
    print(rule)


if __name__ == "__main__":
    main()

This example demonstrates:

  • RETRY strategy with retry configuration

  • CONTINUE strategy for logging errors while execution continues

  • SKIP strategy for fault tolerance

  • Error handling in the event queue architecture

  • Automatic flow detection in error scenarios

Key Features:

  • Task-level Error Handling: Errors are handled at the task level in the event queue

  • Automatic Flow Detection: emit() calls automatically detect flow context

  • Non-blocking Error Recovery: Error handling doesn’t block the event loop