Error Handling
In this tutorial, you’ll learn how to build resilient workflows that handle errors gracefully using Routilux’s error handling strategies.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Understand different error handling strategies (STOP, CONTINUE, RETRY, SKIP)
Configure error handlers at flow and routine levels
Use retry mechanisms with exponential backoff
Mark routines as critical or optional
Build fault-tolerant workflows
Step 1: Understanding Error Strategies
Routilux provides four error handling strategies:
STOP (default): Stop execution immediately on error
CONTINUE: Log error but continue execution
RETRY: Automatically retry failed routines
SKIP: Skip failed routine and continue
Let’s see each strategy in action, starting with the default, STOP:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class UnreliableRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        if self.call_count < 3:
            raise ValueError(f"Error on attempt {self.call_count}")
        self.emit("output", data=f"Success after {self.call_count} attempts")

class SuccessRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)
        self.executed = False

    def receive(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        self.executed = True
        print(f"Success routine received: {data_value}")

# Test with STOP strategy (default)
print("=== STOP Strategy (default) ===")
flow1 = Flow(flow_id="stop_test")
unreliable1 = UnreliableRoutine()
success1 = SuccessRoutine()

u1_id = flow1.add_routine(unreliable1, "unreliable")
s1_id = flow1.add_routine(success1, "success")

flow1.connect(u1_id, "output", s1_id, "input")

# No error handler set - uses default STOP
job_state1 = flow1.execute(u1_id)
print(f"Status: {job_state1.status}")
print(f"Success executed: {success1.executed}")
Expected Output:
=== STOP Strategy (default) ===
Status: failed
Success executed: False
Key Points:
STOP is the default strategy
Execution stops immediately on error
Downstream routines don’t execute
Flow status is set to “failed”
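Before looking at the other strategies in detail, the difference between all four can be sketched without Routilux at all. The toy model below is not Routilux's implementation: `Strategy`, `run_step`, `make_flaky`, and the rule that `max_retries` counts retries after the first attempt are all assumptions made for illustration. It only reports the status a single failing step would produce under each strategy.

```python
from enum import Enum


class Strategy(Enum):
    """Hypothetical stand-in for the four strategies described above."""
    STOP = "stop"
    CONTINUE = "continue"
    RETRY = "retry"
    SKIP = "skip"


def run_step(step, strategy, max_retries=3):
    """Call `step` once (or more, for RETRY) and return (status, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            step()
            return "completed", attempts
        except Exception:
            # RETRY: try again until the retry budget is used up (no delay here).
            if strategy is Strategy.RETRY and attempts <= max_retries:
                continue
            # STOP, or RETRY with no retries left: the run fails.
            if strategy in (Strategy.STOP, Strategy.RETRY):
                return "failed", attempts
            # CONTINUE and SKIP: the error is tolerated and the run completes.
            return "completed", attempts


def make_flaky(failures):
    """Build a callable that raises ValueError on its first `failures` calls."""
    state = {"calls": 0}

    def step():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ValueError(f"Error on attempt {state['calls']}")

    return step


for strategy in Strategy:
    status, attempts = run_step(make_flaky(failures=2), strategy)
    print(f"{strategy.name}: status={status}, attempts={attempts}")
```

In this model only RETRY calls the step more than once. CONTINUE and SKIP both end as "completed" here; the tutorial distinguishes them by what is recorded for the failing routine (a logged error versus a "skipped" mark), which this sketch does not model.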
Step 2: Using CONTINUE Strategy
CONTINUE strategy logs errors but allows execution to continue:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class FailingRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("This error will be logged but execution continues")

class SuccessRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, data=None, **kwargs):
        print("Success routine executed despite upstream error")

flow = Flow(flow_id="continue_test")

failing = FailingRoutine()
success = SuccessRoutine()

failing_id = flow.add_routine(failing, "failing")
success_id = flow.add_routine(success, "success")

flow.connect(failing_id, "output", success_id, "input")

# Set CONTINUE strategy
error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
flow.set_error_handler(error_handler)

job_state = flow.execute(failing_id)
print(f"Status: {job_state.status}")  # Still "completed" despite error
Expected Output:
Status: completed
Key Points:
CONTINUE logs errors but doesn’t stop execution
Flow status remains “completed” (not “failed”)
Useful for non-critical operations
Downstream routines still execute
Step 3: Using RETRY Strategy
RETRY strategy automatically retries failed routines:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class UnreliableRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        print(f"Attempt {self.call_count}")

        if self.call_count < 3:
            raise ValueError(f"Error on attempt {self.call_count}")

        self.emit("output", data=f"Success after {self.call_count} attempts")

flow = Flow(flow_id="retry_test")

unreliable = UnreliableRoutine()
unreliable_id = flow.add_routine(unreliable, "unreliable")

# Set RETRY strategy with configuration
error_handler = ErrorHandler(
    strategy=ErrorStrategy.RETRY,
    max_retries=5,
    retry_delay=0.1,  # Initial delay
    retry_backoff=2.0  # Exponential backoff multiplier
)
flow.set_error_handler(error_handler)

job_state = flow.execute(unreliable_id)
print(f"Status: {job_state.status}")
print(f"Total attempts: {unreliable.call_count}")
Expected Output:
Attempt 1
Attempt 2
Attempt 3
Status: completed
Total attempts: 3
Key Points:
RETRY automatically retries failed routines
Uses exponential backoff: delay = retry_delay * retry_backoff ** (retry_count - 1), so the first retry waits retry_delay
Retries up to max_retries times
Only retries retryable exceptions (ValueError, RuntimeError by default)
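To make the backoff formula concrete, the sketch below evaluates it for the settings used in this step. `backoff_delays` is a hypothetical helper written for this illustration, not part of Routilux, and it assumes the delay follows the formula exactly, with no jitter and no upper cap.

```python
def backoff_delays(retry_delay, retry_backoff, max_retries):
    """Return the wait (in seconds) before each retry, first retry first."""
    return [
        retry_delay * retry_backoff ** (retry_count - 1)
        for retry_count in range(1, max_retries + 1)
    ]


# Same settings as the ErrorHandler in this step.
delays = backoff_delays(retry_delay=0.1, retry_backoff=2.0, max_retries=5)
for retry_count, delay in enumerate(delays, start=1):
    print(f"retry {retry_count}: wait {delay:.1f}s")
```

With these settings the waits are 0.1, 0.2, 0.4, 0.8 and 1.6 seconds. The example routine succeeds on its third attempt, so only the first two waits would actually be used.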
Step 4: Using SKIP Strategy
SKIP strategy skips failed routines and continues:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("This routine will be skipped")

class RequiredRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)
        self.executed = False

    def receive(self, data=None, **kwargs):
        self.executed = True
        print("Required routine executed")

flow = Flow(flow_id="skip_test")

optional = OptionalRoutine()
required = RequiredRoutine()

opt_id = flow.add_routine(optional, "optional")
req_id = flow.add_routine(required, "required")

flow.connect(opt_id, "output", req_id, "input")

# Set SKIP strategy for optional routine
skip_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
optional.set_error_handler(skip_handler)

job_state = flow.execute(opt_id)
print(f"Status: {job_state.status}")
print(f"Required executed: {required.executed}")
Expected Output:
Status: completed
Required executed: False
Key Points:
SKIP marks routine as “skipped” and continues
Flow status remains “completed”
Useful for optional processing steps
Downstream routines don’t receive data from skipped routine
Step 5: Routine-Level Error Handlers
You can set error handlers at the routine level to override flow-level handlers:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class CriticalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Critical error")

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Optional error")

flow = Flow(flow_id="routine_level_test")

critical = CriticalRoutine()
optional = OptionalRoutine()

crit_id = flow.add_routine(critical, "critical")
opt_id = flow.add_routine(optional, "optional")

# Flow-level: CONTINUE (non-critical default)
flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

# Routine-level: STOP for critical routine
critical.set_error_handler(ErrorHandler(strategy=ErrorStrategy.STOP))

# Routine-level: SKIP for optional routine
optional.set_error_handler(ErrorHandler(strategy=ErrorStrategy.SKIP))

# Test critical (should fail)
job_state1 = flow.execute(crit_id)
print(f"Critical status: {job_state1.status}")

# Test optional (should complete)
job_state2 = flow.execute(opt_id)
print(f"Optional status: {job_state2.status}")
Expected Output:
Critical status: failed
Optional status: completed
Key Points:
Routine-level handlers override flow-level handlers
Priority: Routine-level > Flow-level > Default (STOP)
Use routine-level handlers for special cases
Use flow-level handlers for default behavior
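The priority rule can be written as a small lookup. This is only a sketch of the rule as stated above, not Routilux's internal code: `effective_strategy` and `DEFAULT_STRATEGY` are hypothetical names, and strategies are represented as plain strings to keep the example self-contained.

```python
DEFAULT_STRATEGY = "STOP"


def effective_strategy(routine_strategy=None, flow_strategy=None):
    """Pick the strategy that applies: routine-level, then flow-level, then default."""
    if routine_strategy is not None:
        return routine_strategy
    if flow_strategy is not None:
        return flow_strategy
    return DEFAULT_STRATEGY


# Mirrors the setup in this step: the flow uses CONTINUE, two routines override it.
print(effective_strategy("STOP", "CONTINUE"))  # critical routine
print(effective_strategy("SKIP", "CONTINUE"))  # optional routine
print(effective_strategy(None, "CONTINUE"))    # routine without its own handler
print(effective_strategy())                    # nothing configured
```

The first two calls return the routine's own strategy, the third falls back to the flow's CONTINUE, and the last falls back to STOP.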
Step 6: Critical and Optional Routines
You can mark routines as critical (must succeed) or optional using convenience methods:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class CriticalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        if self.call_count < 3:
            raise ValueError("Critical error")
        self.emit("output", data="Critical success")

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Optional error - will be skipped")

flow = Flow(flow_id="critical_test")

critical = CriticalRoutine()
optional = OptionalRoutine()

crit_id = flow.add_routine(critical, "critical")
opt_id = flow.add_routine(optional, "optional")

# Mark as critical with retry
critical.set_as_critical(max_retries=5, retry_delay=0.1)

# Mark as optional with skip
optional.set_as_optional()

# Test critical
job_state1 = flow.execute(crit_id)
print(f"Critical status: {job_state1.status}")
print(f"Critical attempts: {critical.call_count}")

# Test optional
job_state2 = flow.execute(opt_id)
print(f"Optional status: {job_state2.status}")
Expected Output:
Critical status: completed
Critical attempts: 3
Optional status: completed
Key Points:
set_as_critical() marks routine as critical with retry
set_as_optional() marks routine as optional with skip
Critical routines that fail after retries cause flow to fail
Optional routines that fail are skipped
Step 7: Complete Example - Resilient Workflow
Here’s a complete example combining error handling strategies:
from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, **kwargs):
        self.emit("output", data="test_data")

class UnreliableValidator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])
        self.call_count = 0

    def validate(self, data=None, **kwargs):
        self.call_count += 1
        data_value = data or kwargs.get("data", "")

        if self.call_count < 2:
            raise ValueError("Validation failed")

        self.emit("output", data=data_value, valid=True)

class Processor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, valid=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        is_valid = valid if valid is not None else kwargs.get("valid", False)

        if is_valid:
            result = f"Processed: {data_value}"
            self.emit("output", result=result)
        else:
            raise ValueError("Cannot process invalid data")

class Sink(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        print(f"Final result: {result_value}")

def main():
    flow = Flow(flow_id="resilient_workflow")

    source = DataSource()
    validator = UnreliableValidator()
    processor = Processor()
    sink = Sink()

    source_id = flow.add_routine(source, "source")
    validator_id = flow.add_routine(validator, "validator")
    processor_id = flow.add_routine(processor, "processor")
    sink_id = flow.add_routine(sink, "sink")

    flow.connect(source_id, "output", validator_id, "input")
    flow.connect(validator_id, "output", processor_id, "input")
    flow.connect(processor_id, "output", sink_id, "input")

    # Set retry for validator (transient failures)
    validator.set_as_critical(max_retries=3, retry_delay=0.1)

    # Set continue for processor (non-critical)
    processor.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

    job_state = flow.execute(source_id)

    print(f"\nExecution status: {job_state.status}")
    print(f"Validator attempts: {validator.call_count}")

if __name__ == "__main__":
    main()
Expected Output:
Final result: Processed: test_data
Execution status: completed
Validator attempts: 2
Key Points:
Combine different strategies for different routines
Use retry for transient failures
Use continue for non-critical operations
Build resilient workflows that handle errors gracefully
Common Pitfalls
Pitfall 1: Not setting error handlers
# No error handler - uses default STOP
flow.execute(routine_id) # Fails immediately on any error
Solution: Always set appropriate error handlers for production workflows.
Pitfall 2: Too many retries
# Too many retries can cause long delays
ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=100)
Solution: Use reasonable retry counts (3-5) with appropriate delays.
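To see why the retry count matters, the sketch below adds up the worst-case waiting time when every attempt fails. It is an estimate under stated assumptions, not a measurement of Routilux: `total_backoff_wait` is a hypothetical helper, the delay values are the ones from Step 3 (`retry_delay=0.1`, `retry_backoff=2.0`), and the backoff formula from Step 3 is assumed to apply with no upper cap on the delay.

```python
def total_backoff_wait(retry_delay, retry_backoff, max_retries):
    """Sum the waits before every retry, assuming all attempts fail."""
    return sum(
        retry_delay * retry_backoff ** (retry_count - 1)
        for retry_count in range(1, max_retries + 1)
    )


for max_retries in (3, 5, 10, 20):
    wait = total_backoff_wait(
        retry_delay=0.1, retry_backoff=2.0, max_retries=max_retries
    )
    print(f"max_retries={max_retries}: up to {wait:.1f}s of waiting")
```

Under these assumptions five retries add about 3 seconds of waiting, ten add about 102 seconds, and twenty add more than a day, because each extra retry doubles the previous wait. The time spent inside the routine itself comes on top of that.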
Pitfall 3: Not handling retry exhaustion
# If all retries fail, flow still fails
# Need to handle this case
Solution: Use is_critical=True to control behavior when retries are exhausted.
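Whatever strategy is configured, the calling code can also check the final status itself instead of assuming success. The sketch below shows one way to do that. `require_completed` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for the value returned by `flow.execute()`; the only thing assumed about that value is that its `status` formats as "completed" or "failed", as in the expected outputs throughout this tutorial.

```python
from types import SimpleNamespace


def require_completed(job_state):
    """Return job_state if it completed; otherwise raise so the caller can react."""
    status = f"{job_state.status}"  # formatted the same way the tutorial prints it
    if status != "completed":
        raise RuntimeError(f"Flow did not complete (status: {status})")
    return job_state


# Stand-in objects; in real code this would be the value returned by flow.execute().
ok = SimpleNamespace(status="completed")
exhausted = SimpleNamespace(status="failed")

require_completed(ok)
try:
    require_completed(exhausted)
except RuntimeError as exc:
    print(f"Handling failure: {exc}")
```

Raising an exception at this point is a design choice: it forces the caller to decide what happens after retries are exhausted (alerting, falling back to cached data, or aborting) rather than letting a failed run pass unnoticed.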
Best Practices
Set error handlers for all workflows: Don’t rely on default STOP behavior
Use RETRY for transient failures: Network, timeouts, temporary issues
Use CONTINUE for non-critical operations: Logging, optional processing
Use SKIP for optional steps: Steps that can be safely skipped
Mark critical routines: Use set_as_critical() for must-succeed operations
Use reasonable retry counts: 3-5 retries with exponential backoff
Test error scenarios: Verify error handling works as expected
Next Steps
Now that you understand error handling, let’s move on to Concurrent Execution to learn how to execute independent routines in parallel for better performance.