Error Handling

In this tutorial, you’ll learn how to build resilient workflows that handle errors gracefully using Routilux’s error handling strategies.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Understand different error handling strategies (STOP, CONTINUE, RETRY, SKIP)

  • Configure error handlers at flow and routine levels

  • Use retry mechanisms with exponential backoff

  • Mark routines as critical or optional

  • Build fault-tolerant workflows

Step 1: Understanding Error Strategies

Routilux provides four error handling strategies:

  1. STOP (default): Stop execution immediately on error

  2. CONTINUE: Log error but continue execution

  3. RETRY: Automatically retry failed routines

  4. SKIP: Skip failed routine and continue
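
As a rough mental model, the four strategies differ only in what happens after a step raises. The sketch below is plain Python and does not use Routilux: the Strategy enum, run_step, and make_flaky are hypothetical names invented for illustration, and the status strings are assumptions that mirror the descriptions above rather than the library's internals.

```python
from enum import Enum


class Strategy(Enum):
    """Toy stand-in for the four strategies (not Routilux's ErrorStrategy)."""
    STOP = "stop"
    CONTINUE = "continue"
    RETRY = "retry"
    SKIP = "skip"


def run_step(step, strategy, max_retries=3):
    """Run one callable under a strategy and return (status, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            step()
            return "completed", attempts
        except Exception:
            if strategy is Strategy.RETRY and attempts <= max_retries:
                continue  # run the same step again
            if strategy is Strategy.CONTINUE:
                return "completed", attempts  # error is recorded, run goes on
            if strategy is Strategy.SKIP:
                return "skipped", attempts  # step is marked as skipped
            return "failed", attempts  # STOP, or RETRY with no retries left


def make_flaky(failures):
    """Return a callable that raises `failures` times, then succeeds."""
    state = {"calls": 0}

    def step():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ValueError(f"Error on attempt {state['calls']}")

    return step


for strategy in Strategy:
    print(strategy.name, run_step(make_flaky(2), strategy))
# STOP ('failed', 1)
# CONTINUE ('completed', 1)
# RETRY ('completed', 3)
# SKIP ('skipped', 1)
```

Only RETRY calls the step more than once; the other three differ in the status they report after a single failed attempt.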

Let’s start with the default, STOP:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class UnreliableRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        if self.call_count < 3:
            raise ValueError(f"Error on attempt {self.call_count}")
        self.emit("output", data=f"Success after {self.call_count} attempts")

class SuccessRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)
        self.executed = False

    def receive(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        self.executed = True
        print(f"Success routine received: {data_value}")

# Test with STOP strategy (default)
print("=== STOP Strategy (default) ===")
flow1 = Flow(flow_id="stop_test")
unreliable1 = UnreliableRoutine()
success1 = SuccessRoutine()

u1_id = flow1.add_routine(unreliable1, "unreliable")
s1_id = flow1.add_routine(success1, "success")

flow1.connect(u1_id, "output", s1_id, "input")

# No error handler set - uses default STOP
job_state1 = flow1.execute(u1_id)
print(f"Status: {job_state1.status}")
print(f"Success executed: {success1.executed}")

Expected Output:

=== STOP Strategy (default) ===
Status: failed
Success executed: False

Key Points:

  • STOP is the default strategy

  • Execution stops immediately on error

  • Downstream routines don’t execute

  • Flow status is set to “failed”

Step 2: Using CONTINUE Strategy

CONTINUE strategy logs errors but allows execution to continue:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class FailingRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("This error will be logged but execution continues")

class SuccessRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, data=None, **kwargs):
        print("Success routine executed despite upstream error")

flow = Flow(flow_id="continue_test")

failing = FailingRoutine()
success = SuccessRoutine()

failing_id = flow.add_routine(failing, "failing")
success_id = flow.add_routine(success, "success")

flow.connect(failing_id, "output", success_id, "input")

# Set CONTINUE strategy
error_handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE)
flow.set_error_handler(error_handler)

job_state = flow.execute(failing_id)
print(f"Status: {job_state.status}")  # Still "completed" despite error

Expected Output:

Status: completed

Key Points:

  • CONTINUE logs errors but doesn’t stop execution

  • Flow status remains “completed” (not “failed”)

  • Useful for non-critical operations

  • Downstream routines run only if they receive an event; here FailingRoutine raises before emitting, so SuccessRoutine prints nothing

Step 3: Using RETRY Strategy

RETRY strategy automatically retries failed routines:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy
import time

class UnreliableRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        print(f"Attempt {self.call_count}")

        if self.call_count < 3:
            raise ValueError(f"Error on attempt {self.call_count}")

        self.emit("output", data=f"Success after {self.call_count} attempts")

flow = Flow(flow_id="retry_test")

unreliable = UnreliableRoutine()
unreliable_id = flow.add_routine(unreliable, "unreliable")

# Set RETRY strategy with configuration
error_handler = ErrorHandler(
    strategy=ErrorStrategy.RETRY,
    max_retries=5,
    retry_delay=0.1,  # Initial delay
    retry_backoff=2.0  # Exponential backoff multiplier
)
flow.set_error_handler(error_handler)

job_state = flow.execute(unreliable_id)
print(f"Status: {job_state.status}")
print(f"Total attempts: {unreliable.call_count}")

Expected Output:

Attempt 1
Attempt 2
Attempt 3
Status: completed
Total attempts: 3

Key Points:

  • RETRY automatically retries failed routines

  • Uses exponential backoff: delay = retry_delay * retry_backoff ** (retry_count - 1), so each retry waits retry_backoff times longer than the previous one

  • Retries up to max_retries times

  • Only retries retryable exceptions (ValueError, RuntimeError by default)
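
To make the backoff formula concrete, the sketch below evaluates it for the handler configured in this step (retry_delay=0.1, retry_backoff=2.0, max_retries=5). backoff_delays is a hypothetical helper written for this illustration, not a Routilux function, and it assumes retry_count starts at 1 for the first retry.

```python
def backoff_delays(retry_delay, retry_backoff, max_retries):
    """Delay before each retry: retry_delay * retry_backoff ** (retry_count - 1)."""
    return [
        retry_delay * retry_backoff ** (retry_count - 1)
        for retry_count in range(1, max_retries + 1)
    ]


delays = backoff_delays(retry_delay=0.1, retry_backoff=2.0, max_retries=5)
print([round(d, 3) for d in delays])  # [0.1, 0.2, 0.4, 0.8, 1.6]
```

Under that assumption, the example above succeeds on its third attempt, so only the first two delays (0.1 s and 0.2 s) would actually be waited.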

Step 4: Using SKIP Strategy

SKIP strategy skips failed routines and continues:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("This routine will be skipped")

class RequiredRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)
        self.executed = False

    def receive(self, data=None, **kwargs):
        self.executed = True
        print("Required routine executed")

flow = Flow(flow_id="skip_test")

optional = OptionalRoutine()
required = RequiredRoutine()

opt_id = flow.add_routine(optional, "optional")
req_id = flow.add_routine(required, "required")

flow.connect(opt_id, "output", req_id, "input")

# Set SKIP strategy for optional routine
skip_handler = ErrorHandler(strategy=ErrorStrategy.SKIP)
optional.set_error_handler(skip_handler)

job_state = flow.execute(opt_id)
print(f"Status: {job_state.status}")
print(f"Required executed: {required.executed}")

Expected Output:

Status: completed
Required executed: False

Key Points:

  • SKIP marks routine as “skipped” and continues

  • Flow status remains “completed”

  • Useful for optional processing steps

  • Downstream routines don’t receive data from skipped routine
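
The last point explains why the example prints "Required executed: False": a routine that raises before it emits has nothing to pass on. The plain-Python sketch below illustrates that idea without using Routilux; run_skippable, failing_upstream, and healthy_upstream are hypothetical names, and the status strings are assumptions chosen to echo the wording above.

```python
def run_skippable(upstream, downstream):
    """Run upstream, forward whatever it emitted, and report a status string."""
    emitted = []
    status = "completed"
    try:
        upstream(emitted.append)  # upstream calls emit(value) zero or more times
    except Exception:
        status = "skipped"  # the error is swallowed, as a SKIP-like policy would
    for value in emitted:
        downstream(value)
    return status


def failing_upstream(emit):
    raise ValueError("This routine will be skipped")  # raises before emitting


def healthy_upstream(emit):
    emit("payload")


received = []
print(run_skippable(failing_upstream, received.append), received)  # skipped []
print(run_skippable(healthy_upstream, received.append), received)  # completed ['payload']
```

If a downstream step must always run, give it an input that does not depend on the optional step.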

Step 5: Routine-Level Error Handlers

You can set error handlers at the routine level to override flow-level handlers:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class CriticalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Critical error")

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Optional error")

flow = Flow(flow_id="routine_level_test")

critical = CriticalRoutine()
optional = OptionalRoutine()

crit_id = flow.add_routine(critical, "critical")
opt_id = flow.add_routine(optional, "optional")

# Flow-level: CONTINUE (non-critical default)
flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

# Routine-level: STOP for critical routine
critical.set_error_handler(ErrorHandler(strategy=ErrorStrategy.STOP))

# Routine-level: SKIP for optional routine
optional.set_error_handler(ErrorHandler(strategy=ErrorStrategy.SKIP))

# Test critical (should fail)
job_state1 = flow.execute(crit_id)
print(f"Critical status: {job_state1.status}")

# Test optional (should complete)
job_state2 = flow.execute(opt_id)
print(f"Optional status: {job_state2.status}")

Expected Output:

Critical status: failed
Optional status: completed

Key Points:

  • Routine-level handlers override flow-level handlers

  • Priority: Routine-level > Flow-level > Default (STOP)

  • Use routine-level handlers for special cases

  • Use flow-level handlers for default behavior
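
The priority rule can be read as a simple fallback chain. The following sketch is a hypothetical, library-free illustration of that lookup order; resolve_strategy and DEFAULT_STRATEGY are made-up names, and strategies are represented as plain strings rather than ErrorStrategy members.

```python
DEFAULT_STRATEGY = "STOP"


def resolve_strategy(routine_level=None, flow_level=None):
    """Return the most specific strategy that has been set."""
    if routine_level is not None:
        return routine_level  # routine-level handler wins
    if flow_level is not None:
        return flow_level  # otherwise fall back to the flow-level handler
    return DEFAULT_STRATEGY  # nothing set: default STOP


print(resolve_strategy("SKIP", "CONTINUE"))  # SKIP
print(resolve_strategy(None, "CONTINUE"))    # CONTINUE
print(resolve_strategy())                    # STOP
```

In the example above, both routines carry their own handler, so the flow-level CONTINUE would apply only to a third routine added without one.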

Step 6: Critical and Optional Routines

You can mark routines as critical (must succeed) or optional using convenience methods:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class CriticalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])
        self.call_count = 0

    def process(self, **kwargs):
        self.call_count += 1
        if self.call_count < 3:
            raise ValueError("Critical error")
        self.emit("output", data="Critical success")

class OptionalRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.process)
        self.output_event = self.define_event("output", ["data"])

    def process(self, **kwargs):
        raise ValueError("Optional error - will be skipped")

flow = Flow(flow_id="critical_test")

critical = CriticalRoutine()
optional = OptionalRoutine()

crit_id = flow.add_routine(critical, "critical")
opt_id = flow.add_routine(optional, "optional")

# Mark as critical with retry
critical.set_as_critical(max_retries=5, retry_delay=0.1)

# Mark as optional with skip
optional.set_as_optional()

# Test critical
job_state1 = flow.execute(crit_id)
print(f"Critical status: {job_state1.status}")
print(f"Critical attempts: {critical.call_count}")

# Test optional
job_state2 = flow.execute(opt_id)
print(f"Optional status: {job_state2.status}")

Expected Output:

Critical status: completed
Critical attempts: 3
Optional status: completed

Key Points:

  • set_as_critical() marks routine as critical with retry

  • set_as_optional() marks routine as optional with skip

  • Critical routines that fail after retries cause flow to fail

  • Optional routines that fail are skipped

Step 7: Complete Example - Resilient Workflow

Here’s a complete example combining error handling strategies:

from routilux import Flow, Routine, ErrorHandler, ErrorStrategy

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, **kwargs):
        self.emit("output", data="test_data")

class UnreliableValidator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])
        self.call_count = 0

    def validate(self, data=None, **kwargs):
        self.call_count += 1
        data_value = data or kwargs.get("data", "")

        if self.call_count < 2:
            raise ValueError("Validation failed")

        self.emit("output", data=data_value, valid=True)

class Processor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, valid=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        is_valid = valid if valid is not None else kwargs.get("valid", False)

        if is_valid:
            result = f"Processed: {data_value}"
            self.emit("output", result=result)
        else:
            raise ValueError("Cannot process invalid data")

class Sink(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, result=None, **kwargs):
        result_value = result or kwargs.get("result", "")
        print(f"Final result: {result_value}")

def main():
    flow = Flow(flow_id="resilient_workflow")

    source = DataSource()
    validator = UnreliableValidator()
    processor = Processor()
    sink = Sink()

    source_id = flow.add_routine(source, "source")
    validator_id = flow.add_routine(validator, "validator")
    processor_id = flow.add_routine(processor, "processor")
    sink_id = flow.add_routine(sink, "sink")

    flow.connect(source_id, "output", validator_id, "input")
    flow.connect(validator_id, "output", processor_id, "input")
    flow.connect(processor_id, "output", sink_id, "input")

    # Set retry for validator (transient failures)
    validator.set_as_critical(max_retries=3, retry_delay=0.1)

    # Set continue for processor (non-critical)
    processor.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))

    job_state = flow.execute(source_id)

    print(f"\nExecution status: {job_state.status}")
    print(f"Validator attempts: {validator.call_count}")

if __name__ == "__main__":
    main()

Expected Output:

Final result: Processed: test_data

Execution status: completed
Validator attempts: 2

Key Points:

  • Combine different strategies for different routines

  • Use retry for transient failures

  • Use continue for non-critical operations

  • Build resilient workflows that handle errors gracefully

Common Pitfalls

Pitfall 1: Not setting error handlers

# No error handler - uses default STOP
flow.execute(routine_id)  # Fails immediately on any error

Solution: Always set appropriate error handlers for production workflows.

Pitfall 2: Too many retries

# Too many retries can cause long delays
ErrorHandler(strategy=ErrorStrategy.RETRY, max_retries=100)

Solution: Use reasonable retry counts (3-5) with appropriate delays.
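
To see why the retry count matters, it helps to add up the waits in the worst case, where every retry is used. The sketch below is a plain-Python estimate, not a Routilux API: worst_case_wait is a hypothetical helper, the delay and backoff values are assumed for illustration, and the time spent running the routine itself is ignored.

```python
def worst_case_wait(retry_delay, retry_backoff, max_retries):
    """Total sleep time in seconds if all max_retries retries are used."""
    return sum(retry_delay * retry_backoff ** n for n in range(max_retries))


# 5 retries, 0.1 s initial delay, doubling each time
print(round(worst_case_wait(0.1, 2.0, 5), 3))  # 3.1

# 100 retries with a constant 1 s delay (backoff multiplier 1.0)
print(worst_case_wait(1.0, 1.0, 100))  # 100.0

# 100 retries with doubling: far beyond any practical timeout
print(worst_case_wait(0.1, 2.0, 100) > 1e29)  # True
```

With a backoff multiplier above 1, the total grows geometrically, so a small retry count bounds the wait much more effectively than a small initial delay.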

Pitfall 3: Not handling retry exhaustion

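# If every retry fails, the flow still fails
# and the caller has to handle that case

Solution: Decide up front what should happen when retries run out. Use is_critical=True to control that behavior, and check job_state.status after execute() so an exhausted retry does not go unnoticed.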

Best Practices

  1. Set error handlers for all workflows: Don’t rely on default STOP behavior

  2. Use RETRY for transient failures: Network, timeouts, temporary issues

  3. Use CONTINUE for non-critical operations: Logging, optional processing

  4. Use SKIP for optional steps: Steps that can be safely skipped

  5. Mark critical routines: Use set_as_critical() for must-succeed operations

  6. Use reasonable retry counts: 3-5 retries with exponential backoff

  7. Test error scenarios: Verify error handling works as expected

Next Steps

Now that you understand error handling, let’s move on to Concurrent Execution to learn how to execute independent routines in parallel for better performance.