Source code for routilux.error_handler

"""
Error handling strategies.

Defines error handling strategies and retry mechanisms.
"""

from __future__ import annotations

import logging
import time
from enum import Enum
from typing import TYPE_CHECKING, Any

from serilux import Serializable, register_serializable

if TYPE_CHECKING:
    from routilux.flow import Flow
    from routilux.routine import Routine

logger = logging.getLogger(__name__)


[docs] class ErrorStrategy(Enum): """Error handling strategy enumeration. Defines how errors in routine execution should be handled. Each strategy has different behavior regarding flow continuation, error logging, and retry mechanisms. Available strategies: - STOP: Immediately stop flow execution when an error occurs. Flow status set to "failed", execution stops immediately, no retry attempts. Use for critical workflows where any error is unacceptable. - CONTINUE: Log error but continue flow execution. Flow status set to "completed" (not "failed"), routine marked as "error_continued", execution continues to downstream routines. Use for non-critical operations where failures are acceptable. - RETRY: Automatically retry the failed routine. Retries up to max_retries times, uses exponential backoff for delays, only retries retryable exceptions. If all retries fail: stops (or fails flow if is_critical=True). Use for transient failures (network, timeouts). - SKIP: Skip the failed routine and continue. Routine marked as "skipped", flow status set to "completed", execution continues to downstream routines. Use for optional processing steps. """ STOP = "stop" # Stop execution immediately CONTINUE = "continue" # Continue to next routine RETRY = "retry" # Retry the operation SKIP = "skip" # Skip the routine
[docs] @register_serializable class ErrorHandler(Serializable): """Error handler for managing error handling strategies and retry mechanisms. An ErrorHandler defines how errors in routine execution should be handled. It can be set at the Flow level (default for all routines) or at the Routine level (override for specific routines). Routine-level handlers take priority. Key Features: - Multiple Strategies: STOP, CONTINUE, RETRY, SKIP - Configurable Retry: Max retries, delays, exponential backoff - Exception Filtering: Only retry specific exception types - Critical Flag: Mark routines as critical (must succeed) Retry Mechanism: When using RETRY strategy, retries only occur for exceptions in retryable_exceptions tuple. Delay between retries is calculated as: ``retry_delay * (retry_backoff ** (retry_count - 1))``. Example with retry_delay=1.0, retry_backoff=2.0: Retry 1: 1.0s delay, Retry 2: 2.0s delay, Retry 3: 4.0s delay. If max_retries exceeded: is_critical=True causes flow to fail, is_critical=False stops execution. Priority System: Error handlers are checked in this order: 1. Routine-level handler (if set via routine.set_error_handler()) 2. Flow-level handler (if set via flow.set_error_handler()) 3. Default behavior (STOP) Examples: Basic error handler: >>> handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE) >>> flow.set_error_handler(handler) Retry handler with custom config: >>> handler = ErrorHandler( ... strategy=ErrorStrategy.RETRY, ... max_retries=5, ... retry_delay=2.0, ... retry_backoff=1.5, ... retryable_exceptions=(ConnectionError, TimeoutError) ... ) >>> routine.set_error_handler(handler) Critical routine handler: >>> handler = ErrorHandler( ... strategy=ErrorStrategy.RETRY, ... max_retries=3, ... is_critical=True # Flow fails if all retries fail ... ) >>> critical_routine.set_error_handler(handler) """
[docs] def __init__( self, strategy: str = "stop", max_retries: int = 3, retry_delay: float = 1.0, retry_backoff: float = 2.0, retryable_exceptions: tuple | None = None, is_critical: bool = False, ): """Initialize ErrorHandler with configuration. Args: strategy: Error handling strategy. Can be: - String: "stop", "continue", "retry", "skip" - ErrorStrategy enum: ErrorStrategy.STOP, etc. Default: "stop" max_retries: Maximum number of retry attempts for RETRY strategy. Only used when strategy is RETRY. Default: 3 retry_delay: Initial delay in seconds before first retry. Subsequent retries use: retry_delay * (retry_backoff ** retry_count) Default: 1.0 seconds retry_backoff: Multiplier for exponential backoff between retries. Each retry delay is multiplied by this value. Example: delay=1.0, backoff=2.0 → delays: 1s, 2s, 4s, 8s... Default: 2.0 retryable_exceptions: Tuple of exception types that should be retried. If an exception is not in this tuple, it won't be retried (stops immediately). If None, defaults to (Exception,) - retries all exceptions. Example: (ConnectionError, TimeoutError, OSError) is_critical: If True, marks the routine as critical. For RETRY strategy: If all retries fail, the flow will fail. For other strategies: This flag has no effect (for future use). Default: False Examples: Stop on any error: >>> handler = ErrorHandler(strategy="stop") Continue on error: >>> handler = ErrorHandler(strategy=ErrorStrategy.CONTINUE) Retry with exponential backoff: >>> handler = ErrorHandler( ... strategy="retry", ... max_retries=5, ... retry_delay=1.0, ... retry_backoff=2.0 ... ) Retry only network errors: >>> handler = ErrorHandler( ... strategy="retry", ... retryable_exceptions=(ConnectionError, TimeoutError) ... ) """ super().__init__() # Support both string and enum if isinstance(strategy, str): self.strategy: ErrorStrategy = ErrorStrategy(strategy) else: self.strategy: ErrorStrategy = strategy self.max_retries: int = max_retries self.retry_delay: float = retry_delay self.retry_backoff: float = retry_backoff self.retryable_exceptions: tuple = retryable_exceptions or (Exception,) self.retry_count: int = 0 self.is_critical: bool = is_critical # Register serializable fields self.add_serializable_fields( [ "strategy", "max_retries", "retry_delay", "retry_backoff", "retry_count", "is_critical", ] )
[docs] def handle_error( self, error: Exception, routine: Routine, routine_id: str, flow: Flow, job_state=None, context: dict[str, Any] | None = None, ) -> bool: """Handle an error according to the configured strategy. This method is called by the Flow when a routine execution fails. It implements the error handling logic based on the configured strategy and returns whether execution should continue. Strategy Behaviors: STOP: Logs error and returns False (stop execution) CONTINUE: Logs warning, records error, returns True (continue) SKIP: Logs warning, marks routine as skipped, returns True (continue) RETRY: - If exception not retryable: Returns False (stop immediately) - If retries remaining: Increments count, sleeps, returns True (retry) - If max retries exceeded: - is_critical=True: Returns False (flow fails) - is_critical=False: Returns False (stop execution) Args: error: Exception object that occurred during routine execution. The exception type is checked against retryable_exceptions for RETRY strategy. routine: Routine instance where the error occurred. Used for accessing routine state and statistics. routine_id: Unique identifier of the routine in the flow. Used for logging and state tracking. flow: Flow object managing the execution. Used for accessing job_state and recording execution history. context: Optional context dictionary with additional information. Currently not used but reserved for future extensions. Returns: bool: True if execution should continue (retry or proceed), False if execution should stop (error or max retries). The Flow uses this return value to decide whether to: - Continue execution (True): Retry routine or proceed to next - Stop execution (False): Mark flow as failed and stop Side Effects: - Logs error/warning messages - Updates retry_count for RETRY strategy - Records error in job_state for CONTINUE/SKIP strategies - Sleeps for retry delay in RETRY strategy Examples: The method is called automatically by Flow: >>> # Flow detects error in routine execution >>> should_continue = error_handler.handle_error( ... error=ValueError("Something went wrong"), ... routine=my_routine, ... routine_id="my_routine", ... flow=my_flow ... ) >>> # Returns True/False based on strategy """ context = context or {} if self.strategy == ErrorStrategy.STOP: logger.error(f"Error in routine {routine_id}: {error}. Stopping execution.") return False elif self.strategy == ErrorStrategy.CONTINUE: logger.warning(f"Error in routine {routine_id}: {error}. Continuing execution.") # Record error but continue execution if job_state: job_state.record_execution( routine_id, "error_continued", {"error": str(error), "error_type": type(error).__name__}, ) return True elif self.strategy == ErrorStrategy.RETRY: # Check if exception is retryable if not isinstance(error, self.retryable_exceptions): logger.error( f"Error in routine {routine_id}: {error}. " f"Exception type {type(error).__name__} is not retryable. Stopping." ) # Non-retryable exceptions should stop immediately (don't enter retry loop) # The is_critical flag only affects behavior after retries are exhausted return False if self.retry_count < self.max_retries: self.retry_count += 1 delay = self.retry_delay * (self.retry_backoff ** (self.retry_count - 1)) logger.warning( f"Error in routine {routine_id}: {error}. " f"Retrying ({self.retry_count}/{self.max_retries}) after {delay}s..." ) time.sleep(delay) return True # Return True to indicate retry should occur else: # Max retries exceeded if self.is_critical: logger.error( f"Error in routine {routine_id}: {error}. " f"Critical routine failed after {self.max_retries} retries. Flow will fail." ) else: logger.error( f"Error in routine {routine_id}: {error}. " f"Max retries ({self.max_retries}) exceeded. Stopping." ) # For critical routines, retry failure means flow must fail return not self.is_critical elif self.strategy == ErrorStrategy.SKIP: logger.warning(f"Error in routine {routine_id}: {error}. Skipping routine.") # Mark as skipped if job_state: job_state.update_routine_state( routine_id, {"status": "skipped", "error": str(error)} ) return True return False
[docs] def reset(self) -> None: """Reset the retry count.""" self.retry_count = 0
[docs] def serialize(self) -> dict[str, Any]: """Serialize the ErrorHandler. Returns: Serialized dictionary containing error handler configuration. """ data = super().serialize() # ErrorStrategy enum needs to be converted to string if isinstance(data.get("strategy"), ErrorStrategy): data["strategy"] = data["strategy"].value return data
[docs] def deserialize(self, data: dict[str, Any], registry: Any | None = None) -> None: """Deserialize the ErrorHandler. Args: data: Serialized data dictionary. registry: Optional ObjectRegistry for deserializing callables. """ # ErrorStrategy needs to be converted from string to enum if "strategy" in data and isinstance(data["strategy"], str): data["strategy"] = ErrorStrategy(data["strategy"]) super().deserialize(data, registry=registry)