Error Handling Example

Examples demonstrating different error handling strategies.

Example Code

  1#!/usr/bin/env python
  2"""
  3Error Handling Example: Demonstrating different error handling strategies
  4
  5This example demonstrates:
  6- Error handling strategies (STOP, CONTINUE, RETRY, SKIP)
  7- Error handler configuration
  8- Error recovery
  9"""
 10from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
 11
 12
 13class UnreliableRoutine(Routine):
 14    """A routine that may fail"""
 15
 16    def __init__(self):
 17        super().__init__()
 18        # Define trigger slot for entry routine
 19        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 20        self.output_event = self.define_event("output", ["data"])
 21        self.call_count = 0
 22
 23    def _handle_trigger(self, **kwargs):
 24        """May fail on first few calls"""
 25        self.call_count += 1
 26        if self.call_count < 3:
 27            raise ValueError(f"Simulated error (attempt {self.call_count})")
 28        self.emit("output", data=f"Success after {self.call_count} attempts")
 29
 30
 31class SuccessRoutine(Routine):
 32    """A routine that always succeeds"""
 33
 34    def __init__(self):
 35        super().__init__()
 36        self.input_slot = self.define_slot("input", handler=self.process)
 37        self.executed = False
 38
 39    def process(self, data):
 40        """Process the data"""
 41        self.executed = True
 42        print(f"Success routine received: {data}")
 43
 44
 45def test_retry_strategy():
 46    """Test RETRY strategy"""
 47    print("\n" + "=" * 50)
 48    print("Testing RETRY Strategy")
 49    print("=" * 50)
 50
 51    flow = Flow(flow_id="retry_test")
 52
 53    unreliable = UnreliableRoutine()
 54    success = SuccessRoutine()
 55
 56    unreliable_id = flow.add_routine(unreliable, "unreliable")
 57    success_id = flow.add_routine(success, "success")
 58
 59    flow.connect(unreliable_id, "output", success_id, "input")
 60
 61    # Set retry strategy
 62    error_handler = ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=5, retry_delay=0.1)
 63    flow.set_error_handler(error_handler)
 64
 65    # Execute
 66    job_state = flow.execute(unreliable_id)
 67
 68    from routilux.job_state import JobState
 69
 70    JobState.wait_for_completion(flow, job_state, timeout=2.0)
 71
 72    print(f"Job Status: {job_state.status}")
 73    print(f"Call Count: {unreliable.call_count}")
 74    print(f"Success Routine Executed: {success.executed}")
 75
 76    assert job_state.status == "completed"
 77    assert unreliable.call_count == 3
 78
 79
 80def test_continue_strategy():
 81    """Test CONTINUE strategy"""
 82    print("\n" + "=" * 50)
 83    print("Testing CONTINUE Strategy")
 84    print("=" * 50)
 85
 86    flow = Flow(flow_id="continue_test")
 87
 88    class FailingRoutine(Routine):
 89        def __init__(self):
 90            super().__init__()
 91            # Define trigger slot for entry routine
 92            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 93
 94        def _handle_trigger(self, **kwargs):
 95            raise ValueError("This will be logged but execution continues")
 96
 97    failing = FailingRoutine()
 98    failing_id = flow.add_routine(failing, "failing")
 99
100    # Set continue strategy
101    error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
102    flow.set_error_handler(error_handler)
103
104    # Execute
105    job_state = flow.execute(failing_id)
106
107    print(f"Job Status: {job_state.status}")
108    print(f"Execution History: {len(job_state.execution_history)}")
109
110    assert job_state.status == "completed"
111
112
113def test_skip_strategy():
114    """Test SKIP strategy"""
115    print("\n" + "=" * 50)
116    print("Testing SKIP Strategy")
117    print("=" * 50)
118
119    flow = Flow(flow_id="skip_test")
120
121    class FailingRoutine(Routine):
122        def __init__(self):
123            super().__init__()
124            # Define trigger slot for entry routine
125            self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
126            self.output_event = self.define_event("output", ["data"])
127
128        def _handle_trigger(self, **kwargs):
129            raise ValueError("This routine will be skipped")
130
131    failing = FailingRoutine()
132    failing_id = flow.add_routine(failing, "failing")
133
134    # Set skip strategy
135    error_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
136    flow.set_error_handler(error_handler)
137
138    # Execute
139    job_state = flow.execute(failing_id)
140
141    print(f"Job Status: {job_state.status}")
142    routine_state = job_state.get_routine_state("failing")
143    print(f"Routine State: {routine_state}")
144
145    assert job_state.status == "completed"
146    assert routine_state.get("status") == "skipped"
147
148
149def main():
150    """Main function"""
151    test_retry_strategy()
152    test_continue_strategy()
153    test_skip_strategy()
154    print("\n" + "=" * 50)
155    print("All error handling examples completed!")
156    print("=" * 50)
157
158
159if __name__ == "__main__":
160    main()

This example demonstrates:

  • RETRY strategy with retry configuration

  • CONTINUE strategy for error logging

  • SKIP strategy for fault tolerance

  • Error handling in event queue architecture

  • Automatic flow detection in error scenarios

Key Features:

  • Task-level Error Handling: Errors are handled at the task level in the event queue

  • Automatic Flow Detection: emit() calls automatically detect flow context

  • Non-blocking Error Recovery: Error handling doesn’t block the event loop