Source code for routilux.job_state

"""
JobState and ExecutionRecord classes.

Used for recording flow execution state.
"""

import json
import logging
import time
import uuid
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from serilux import Serializable, register_serializable

if TYPE_CHECKING:
    from routilux.flow.flow import Flow

logger = logging.getLogger(__name__)


@register_serializable
class ExecutionRecord(Serializable):
    """Execution record for a single routine execution.

    Captures information about when and how a routine was executed:
    the routine identifier, the event name, the transmitted data and
    the timestamp of the execution.
    """

    def __init__(
        self,
        routine_id: str = "",
        event_name: str = "",
        data: Optional[Dict[str, Any]] = None,
        timestamp: Optional[datetime] = None,
    ):
        """Initialize ExecutionRecord.

        Args:
            routine_id: Routine identifier.
            event_name: Event name.
            data: Transmitted data. ``None`` (or an empty mapping) is stored
                as a fresh empty dict.
            timestamp: Timestamp (uses current local time if None).
        """
        super().__init__()
        self.routine_id: str = routine_id
        self.event_name: str = event_name
        self.data: Dict[str, Any] = data or {}
        self.timestamp: datetime = timestamp or datetime.now()

        # Register serializable fields
        self.add_serializable_fields(["routine_id", "event_name", "data", "timestamp"])

    def __repr__(self) -> str:
        """Return string representation of the ExecutionRecord."""
        return f"ExecutionRecord[{self.routine_id}.{self.event_name}@{self.timestamp}]"

    def serialize(self) -> Dict[str, Any]:
        """Serialize, converting the ``timestamp`` datetime to an ISO string.

        Returns:
            The dictionary produced by the base class, with ``timestamp``
            replaced by its ``isoformat()`` string when it is a datetime.
        """
        data = super().serialize()
        timestamp = data.get("timestamp")
        if isinstance(timestamp, datetime):
            data["timestamp"] = timestamp.isoformat()
        return data

    def deserialize(self, data: Dict[str, Any]) -> None:
        """Deserialize, converting an ISO ``timestamp`` string to a datetime.

        The conversion is applied to a shallow copy, so the dictionary passed
        in by the caller is left untouched (and therefore stays
        JSON-serializable).

        Args:
            data: Serialized record, as produced by :meth:`serialize`.
        """
        timestamp = data.get("timestamp")
        if isinstance(timestamp, str):
            # Build a new dict instead of assigning into the caller's one.
            data = {**data, "timestamp": datetime.fromisoformat(timestamp)}
        super().deserialize(data)
@register_serializable
class JobState(Serializable):
    """Job state for tracking flow execution.

    JobState maintains comprehensive state information about flow execution,
    including routine states, execution history, pause points, and overall
    execution status. It provides a snapshot of workflow execution that can be
    serialized, persisted, and used for resumption.

    Key Responsibilities:
        - Status Tracking: Monitor overall flow execution status
        - Routine States: Track individual routine execution states
        - Execution History: Record all routine executions with timestamps
        - Pause Points: Support execution pausing and resumption
        - State Queries: Provide methods to query execution state

    Status Values:
        - "pending": Flow created but not yet started
        - "running": Flow execution in progress
        - "paused": Flow execution paused (can be resumed)
        - "completed": Flow execution completed successfully
        - "failed": Flow execution failed due to error
        - "cancelled": Flow execution cancelled by user

    Routine State Values:
        - "pending": Routine not yet executed
        - "running": Routine execution in progress
        - "completed": Routine executed successfully
        - "failed": Routine execution failed
        - "error_continued": Routine failed but execution continued
          (CONTINUE strategy)
        - "skipped": Routine was skipped (SKIP strategy)

    Examples:
        Basic usage:
            >>> job_state = flow.execute(entry_routine_id)
            >>> print(job_state.status)  # "completed" or "failed"
            >>> routine_state = job_state.get_routine_state(routine_id)
            >>> print(routine_state["status"])  # "completed"

        Query execution history:
            >>> history = job_state.get_execution_history()
            >>> for record in history:
            ...     print(f"{record.routine_id} emitted {record.event_name}")

        Check specific routine:
            >>> routine_state = job_state.get_routine_state("my_routine")
            >>> if routine_state and routine_state["status"] == "completed":
            ...     print("Routine completed successfully")
    """

    def __init__(self, flow_id: str = ""):
        """Initialize JobState.

        Args:
            flow_id: Flow identifier.
        """
        super().__init__()
        # One clock read so a fresh object has created_at == updated_at.
        now = datetime.now()

        self.flow_id: str = flow_id
        self.job_id: str = str(uuid.uuid4())
        # pending, running, paused, completed, failed, cancelled
        self.status: str = "pending"
        self.pause_points: List[Dict[str, Any]] = []
        self.current_routine_id: Optional[str] = None
        self.routine_states: Dict[str, Dict[str, Any]] = {}
        self.execution_history: List[ExecutionRecord] = []
        self.pending_tasks: List[Dict[str, Any]] = []  # Serialized pending tasks
        self.created_at: datetime = now
        self.updated_at: datetime = now

        # Deferred events to be emitted on resume
        self.deferred_events: List[Dict[str, Any]] = []

        # Shared data area for execution-wide data storage
        self.shared_data: Dict[str, Any] = {}
        self.shared_log: List[Dict[str, Any]] = []

        # Output handler for this execution (not serialized).
        # Typed as Any to avoid a circular import of OutputHandler.
        self.output_handler: Optional[Any] = None

        # Output log for persistence (optional)
        self.output_log: List[Dict[str, Any]] = []

        # Register serializable fields
        self.add_serializable_fields(
            [
                "flow_id",
                "job_id",
                "status",
                "current_routine_id",
                "routine_states",
                "execution_history",
                "created_at",
                "updated_at",
                "pause_points",
                "pending_tasks",
                "deferred_events",
                "shared_data",
                "shared_log",
                "output_log",
            ]
        )

    def __repr__(self) -> str:
        """Return string representation of the JobState."""
        return f"JobState[{self.job_id}:{self.status}]"

    # ------------------------------------------------------------------
    # Routine state and execution history
    # ------------------------------------------------------------------

    def update_routine_state(self, routine_id: str, state: Dict[str, Any]) -> None:
        """Update state for a specific routine.

        The state dictionary typically contains information like:
            - "status": Execution status ("completed", "failed", etc.)
            - "error": Error message if execution failed
            - "result": Execution result (if any)
            - Custom state information added by routines

        Args:
            routine_id: Unique identifier of the routine in the flow.
            state: Dictionary containing routine state information. A shallow
                copy is stored, so rebinding keys on the original afterwards
                does not affect the stored state (nested objects are shared).

        Side Effects:
            - Replaces ``routine_states[routine_id]`` with the new state
            - Updates ``updated_at``

        Examples:
            >>> job_state.update_routine_state("my_routine", {
            ...     "status": "completed",
            ...     "result": "success"
            ... })
            >>> job_state.update_routine_state("my_routine", {
            ...     "status": "failed",
            ...     "error": "Connection timeout"
            ... })
        """
        self.routine_states[routine_id] = state.copy()
        self.updated_at = datetime.now()

    def get_routine_state(self, routine_id: str) -> Optional[Dict[str, Any]]:
        """Get execution state for a specific routine.

        Args:
            routine_id: Unique identifier of the routine in the flow.

        Returns:
            The stored state dictionary, or None if no state has been recorded
            for ``routine_id``. Common keys:
                - "status": str - Execution status
                - "error": str - Error message (if execution failed)
                - "result": Any - Execution result (if any)
                - Custom keys added by routines

        Examples:
            >>> state = job_state.get_routine_state("my_routine")
            >>> if state and state.get("status") == "completed":
            ...     print("Routine completed successfully")
            >>> state = job_state.get_routine_state("validator")
            >>> if state and "error" in state:
            ...     print(f"Error: {state['error']}")
        """
        return self.routine_states.get(routine_id)

    def record_execution(self, routine_id: str, event_name: str, data: Dict[str, Any]) -> None:
        """Record an execution event in the execution history.

        Creates an ExecutionRecord (timestamped with the current time) and
        appends it to the execution history. The history is useful for
        debugging, monitoring, analysis and auditing of a flow run.

        Args:
            routine_id: Unique identifier of the routine that executed.
            event_name: Name of the event that was emitted, or an action name
                such as "error_continued" or "skipped".
            data: Data transmitted or associated with the execution, e.g.
                ``{"result": "success", "count": 42}``.

        Side Effects:
            - Appends a new ExecutionRecord to ``execution_history``
            - Updates ``updated_at``

        Examples:
            >>> job_state.record_execution(
            ...     "processor",
            ...     "output",
            ...     {"result": "processed", "count": 10}
            ... )
            >>> job_state.record_execution(
            ...     "optional_routine",
            ...     "error_continued",
            ...     {"error": "Service unavailable", "error_type": "ConnectionError"}
            ... )
        """
        self.execution_history.append(ExecutionRecord(routine_id, event_name, data))
        self.updated_at = datetime.now()

    def get_execution_history(self, routine_id: Optional[str] = None) -> List[ExecutionRecord]:
        """Get execution history, optionally filtered by routine.

        Args:
            routine_id: Optional routine identifier to filter history.
                If None, all execution records are returned.

        Returns:
            A new list of ExecutionRecord objects sorted by timestamp
            (chronological order). Each record exposes ``routine_id``,
            ``event_name``, ``data`` and ``timestamp``.

        Examples:
            >>> history = job_state.get_execution_history()
            >>> for record in history:
            ...     print(f"{record.timestamp}: {record.routine_id} -> {record.event_name}")
            >>> processor_history = job_state.get_execution_history("processor")
            >>> errors = [r for r in history if "error" in r.event_name]
        """
        if routine_id is None:
            history = self.execution_history
        else:
            history = [r for r in self.execution_history if r.routine_id == routine_id]
        return sorted(history, key=lambda record: record.timestamp)

    # ------------------------------------------------------------------
    # Status transitions (called by Flow)
    # ------------------------------------------------------------------

    def _set_paused(self, reason: str = "", checkpoint: Optional[Dict[str, Any]] = None) -> None:
        """Internal method: Set paused state (called by Flow).

        Appends a pause point holding the current time, the reason, the
        current routine id and the checkpoint data.

        Args:
            reason: Reason for pausing.
            checkpoint: Checkpoint data (stored as ``{}`` when falsy).
        """
        self.status = "paused"
        self.pause_points.append(
            {
                "timestamp": datetime.now().isoformat(),
                "reason": reason,
                "current_routine_id": self.current_routine_id,
                "checkpoint": checkpoint or {},
            }
        )
        self.updated_at = datetime.now()

    def _set_running(self) -> None:
        """Internal method: Set running state (called by Flow).

        Only a "paused" job is moved to "running"; any other status is left
        unchanged, including ``updated_at``.
        """
        if self.status == "paused":
            self.status = "running"
            self.updated_at = datetime.now()

    def _set_cancelled(self, reason: str = "") -> None:
        """Internal method: Set cancelled state (called by Flow).

        Args:
            reason: Reason for cancellation. When non-empty it is stored under
                ``routine_states["_cancellation"]["reason"]``.
        """
        self.status = "cancelled"
        self.updated_at = datetime.now()
        if reason:
            self.routine_states.setdefault("_cancellation", {})["reason"] = reason

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save(self, filepath: str) -> None:
        """Persist state to a JSON file.

        The parent directory is created if it does not exist.

        Args:
            filepath: File path.
        """
        import os

        # dirname is "" for a bare file name; fall back to the current directory.
        os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)

        data = self.serialize()
        # serialize() already converts these; kept as a guard in case an
        # overriding serialize() leaves datetime objects behind.
        for field in ("created_at", "updated_at"):
            value = data.get(field)
            if isinstance(value, datetime):
                data[field] = value.isoformat()

        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    @classmethod
    def load(cls, filepath: str) -> "JobState":
        """Load state from a JSON file.

        Args:
            filepath: File path.

        Returns:
            JobState object.

        Raises:
            FileNotFoundError: If file does not exist.
            ValueError: If the file is not valid JSON, its top-level value is
                not an object, or its ``_type`` is not ``"JobState"``.
        """
        import os

        if not os.path.exists(filepath):
            raise FileNotFoundError(f"JobState file not found: {filepath}")

        with open(filepath, encoding="utf-8") as f:
            data = json.load(f)

        # A non-dict payload (e.g. a JSON number) would otherwise raise
        # TypeError on the membership test instead of the documented ValueError.
        if not isinstance(data, dict) or data.get("_type") != "JobState":
            raise ValueError(f"Invalid JobState file format: {filepath}")

        job_state = cls(data.get("flow_id", ""))
        job_state.deserialize(data)

        # Guard in case the base deserializer stored the raw strings.
        if isinstance(job_state.created_at, str):
            job_state.created_at = datetime.fromisoformat(job_state.created_at)
        if isinstance(job_state.updated_at, str):
            job_state.updated_at = datetime.fromisoformat(job_state.updated_at)

        return job_state

    def serialize(self) -> Dict[str, Any]:
        """Serialize, converting ``created_at`` / ``updated_at`` to ISO strings.

        Returns:
            The dictionary produced by the base class with datetime values of
            ``created_at`` and ``updated_at`` replaced by ``isoformat()``.
        """
        data = super().serialize()
        for field in ("created_at", "updated_at"):
            value = data.get(field)
            if isinstance(value, datetime):
                data[field] = value.isoformat()
        return data

    def deserialize(self, data: Dict[str, Any]) -> None:
        """Deserialize, handling datetime and ExecutionRecord.

        ISO strings in ``created_at`` / ``updated_at`` become datetimes and
        ``execution_history`` entries are rebuilt as ExecutionRecord objects.
        All conversions happen on a shallow copy, so the caller's dictionary
        is not modified.

        Args:
            data: Serialized state, as produced by :meth:`serialize`.
        """
        payload = dict(data)

        for field in ("created_at", "updated_at"):
            value = payload.get(field)
            if isinstance(value, str):
                payload[field] = datetime.fromisoformat(value)

        history = payload.get("execution_history")
        if isinstance(history, list):
            records = []
            for item in history:
                if isinstance(item, ExecutionRecord):
                    # Already a record object: keep it instead of dropping it.
                    records.append(item)
                elif isinstance(item, dict):
                    records.append(self._record_from_dict(item))
                # Anything else cannot be interpreted and is skipped.
            payload["execution_history"] = records

        super().deserialize(payload)

    @staticmethod
    def _record_from_dict(record_data: Dict[str, Any]) -> ExecutionRecord:
        """Build an ExecutionRecord from its serialized dictionary form."""
        timestamp = record_data.get("timestamp")
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        elif not isinstance(timestamp, datetime):
            # Missing/unknown timestamp: ExecutionRecord falls back to "now".
            timestamp = None
        return ExecutionRecord(
            record_data.get("routine_id", ""),
            record_data.get("event_name", ""),
            record_data.get("data", {}),
            timestamp,
        )

    # ------------------------------------------------------------------
    # Deferred events, shared data and shared log
    # ------------------------------------------------------------------

    def add_deferred_event(self, routine_id: str, event_name: str, data: Dict[str, Any]) -> None:
        """Add a deferred event to be emitted on resume.

        Args:
            routine_id: ID of the routine that will emit the event.
            event_name: Name of the event to emit.
            data: Data to pass to the event.
        """
        self.deferred_events.append(
            {
                "routine_id": routine_id,
                "event_name": event_name,
                "data": data,
                "timestamp": datetime.now().isoformat(),
            }
        )
        self.updated_at = datetime.now()

    def update_shared_data(self, key: str, value: Any) -> None:
        """Update shared data.

        No explicit locking is performed; this relies on single dict
        assignments being atomic under CPython's GIL.

        Args:
            key: Key to update.
            value: Value to set.
        """
        self.shared_data[key] = value
        self.updated_at = datetime.now()

    def get_shared_data(self, key: str, default: Any = None) -> Any:
        """Get shared data.

        Args:
            key: Key to get.
            default: Default value if key doesn't exist.

        Returns:
            Value for key, or default if key doesn't exist.
        """
        return self.shared_data.get(key, default)

    def append_to_shared_log(self, entry: Dict[str, Any]) -> None:
        """Append entry to shared log.

        No explicit locking is performed; this relies on ``list.append``
        being atomic under CPython's GIL.

        Args:
            entry: Dictionary to append to log. If it has no "timestamp" key,
                a copy with the current ISO timestamp is stored instead, so
                the caller's dictionary is not modified.

        Raises:
            ValueError: If ``entry`` is not a dictionary.
        """
        if not isinstance(entry, dict):
            raise ValueError("Entry must be a dictionary")

        if "timestamp" not in entry:
            entry = {**entry, "timestamp": datetime.now().isoformat()}

        self.shared_log.append(entry)
        self.updated_at = datetime.now()

    def get_shared_log(
        self, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None
    ) -> List[Dict[str, Any]]:
        """Get shared log, optionally filtered.

        Args:
            filter_func: Optional function to filter entries. Should return
                True to include entry.

        Returns:
            A new list of log entries (the entries themselves are not copied).
        """
        if filter_func is None:
            return self.shared_log.copy()
        return [entry for entry in self.shared_log if filter_func(entry)]

    # ------------------------------------------------------------------
    # Output handling
    # ------------------------------------------------------------------

    def set_output_handler(self, handler: Any) -> None:
        """Set output handler for this execution.

        Args:
            handler: OutputHandler instance. Use None to disable output.
        """
        self.output_handler = handler

    def send_output(self, routine_id: str, output_type: str, data: Dict[str, Any]) -> None:
        """Send output from routine.

        The output is forwarded to the output handler (if one is set) together
        with execution context (job_id, routine_id, timestamp) and is always
        appended to ``output_log``.

        Args:
            routine_id: ID of the routine generating the output.
            output_type: Type of output (e.g., 'user_data', 'status', 'result').
            data: Output data dictionary (user-defined structure).

        Note:
            This method was renamed from emit_output() to avoid confusion with
            Routine.emit() which emits events to connected slots.
            Use Routine.send_output() for convenient access.
        """
        timestamp = datetime.now()

        # Compare against None: a handler object that happens to be falsy
        # (e.g. defines __len__) must still receive output.
        if self.output_handler is not None:
            try:
                self.output_handler.handle(
                    job_id=self.job_id,
                    routine_id=routine_id,
                    output_type=output_type,
                    data=data,
                    timestamp=timestamp,
                )
            except Exception as e:
                # Best effort: a failing handler must not break the routine.
                import warnings

                warnings.warn(f"Output handler failed: {e}")

        self.output_log.append(
            {
                "routine_id": routine_id,
                "output_type": output_type,
                "data": data,
                "timestamp": timestamp.isoformat(),
            }
        )
        self.updated_at = timestamp

    # ------------------------------------------------------------------
    # Waiting helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _progress_snapshot(flow: "Flow") -> tuple:
        """Return ``(queue_size, active_count)`` for *flow*, best effort.

        ``flow._active_tasks`` is read under ``flow._execution_lock``. A
        missing attribute yields 0 for the corresponding value.
        """
        try:
            queue_size = flow._task_queue.qsize()
        except AttributeError:
            queue_size = 0
        try:
            with flow._execution_lock:
                active_count = sum(1 for task in flow._active_tasks if not task.done())
        except AttributeError:
            active_count = 0
        return queue_size, active_count

    @staticmethod
    def _report_progress(
        flow: "Flow",
        job_state: "JobState",
        progress_callback: Optional[Callable[[int, int, str], None]],
        last_report: Optional[float],
        interval: float = 5.0,
    ) -> Optional[float]:
        """Invoke *progress_callback* if at least *interval* seconds elapsed.

        Args:
            last_report: ``time.monotonic()`` value of the previous report, or
                None if nothing was reported yet (reports immediately).

        Returns:
            The monotonic time of the most recent report (unchanged when no
            report was made).
        """
        if progress_callback is None:
            return last_report
        now = time.monotonic()
        if last_report is not None and now - last_report < interval:
            return last_report
        queue_size, active_count = JobState._progress_snapshot(flow)
        progress_callback(queue_size, active_count, job_state.status)
        return now

    @staticmethod
    def _finalize_running_status(job_state: "JobState") -> None:
        """Turn a finished "running" job into "completed" or "failed".

        A routine state whose status is "failed" or "error" is a critical
        failure. "error_continued" (CONTINUE strategy) and errors that only
        appear in the execution history are tolerated.
        """
        for routine_id, routine_state in list(job_state.routine_states.items()):
            if not isinstance(routine_state, dict):
                continue
            status = routine_state.get("status")
            if status in ("failed", "error"):
                logger.debug(
                    "Critical failure detected in routine state: %s has status %s",
                    routine_id,
                    status,
                )
                logger.warning(
                    "Execution completed but critical failures detected in routine states. "
                    "Setting status to 'failed' instead of 'completed'."
                )
                job_state.status = "failed"
                return
        job_state.status = "completed"

    @staticmethod
    def wait_for_completion(
        flow: "Flow",
        job_state: "JobState",
        timeout: Optional[float] = None,
        stability_checks: int = 5,
        check_interval: float = 0.1,
        stability_delay: float = 0.05,
        progress_callback: Optional[Callable[[int, int, str], None]] = None,
    ) -> bool:
        """Wait for flow execution to complete.

        When completion is detected while the job is still "running", the
        status is set to "failed" if any routine state is "failed"/"error",
        otherwise to "completed".

        Args:
            flow: Flow object to wait for.
            job_state: JobState object to monitor.
            timeout: Maximum time to wait in seconds. None for no timeout.
            stability_checks: Number of consecutive checks required for stability.
            check_interval: Interval between checks in seconds.
            stability_delay: Delay between stability checks in seconds.
            progress_callback: Optional callback called at most every 5 seconds
                with ``(queue_size, active_count, status)``.

        Returns:
            True if execution completed before timeout, False if timeout occurred.

        Examples:
            >>> job_state = flow.execute(entry_routine_id, entry_params)
            >>> completed = JobState.wait_for_completion(flow, job_state, timeout=300.0)
            >>> if completed:
            ...     print("Execution completed successfully")
        """
        checker = _ExecutionCompletionChecker(
            flow=flow,
            job_state=job_state,
            stability_checks=stability_checks,
            check_interval=check_interval,
            stability_delay=stability_delay,
        )

        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        start_time = time.monotonic()
        last_report: Optional[float] = None

        while True:
            if timeout is not None and time.monotonic() - start_time >= timeout:
                queue_size, active_count = JobState._progress_snapshot(flow)
                logger.warning(
                    "Execution completion wait timed out after %s seconds. "
                    "Queue size: %s, Active tasks: %s, Status: %s",
                    timeout,
                    queue_size,
                    active_count,
                    job_state.status,
                )
                return False

            if checker.check_with_stability():
                logger.debug("Execution completed successfully")
                if job_state.status == "running":
                    JobState._finalize_running_status(job_state)
                return True

            last_report = JobState._report_progress(
                flow, job_state, progress_callback, last_report
            )
            time.sleep(check_interval)

    @staticmethod
    def wait_until_condition(
        flow: "Flow",
        job_state: "JobState",
        condition: Callable[["Flow", "JobState"], bool],
        timeout: Optional[float] = None,
        check_interval: float = 0.1,
        progress_callback: Optional[Callable[[int, int, str], None]] = None,
    ) -> bool:
        """Wait until a condition is met.

        The condition is polled until it returns True, the timeout expires, or
        the job reaches a terminal status ("completed", "failed",
        "cancelled"). Exceptions raised by the condition are logged and
        treated as "not met".

        Args:
            flow: Flow object to monitor.
            job_state: JobState object to monitor.
            condition: Callable that takes (flow, job_state) and returns bool.
            timeout: Maximum time to wait in seconds. None for no timeout.
            check_interval: Interval between checks in seconds.
            progress_callback: Optional callback called at most every 5 seconds
                with ``(queue_size, active_count, status)``.

        Returns:
            True if condition was met, False on timeout or when execution
            finished without the condition being met.

        Examples:
            Wait until retry count reaches 2:
                >>> def condition(flow, job_state):
                ...     retry_count = flow.get_routine_retry_count("processor")
                ...     return retry_count is not None and retry_count >= 2
                >>> JobState.wait_until_condition(flow, job_state, condition, timeout=30.0)

            Wait until a specific routine completes:
                >>> def condition(flow, job_state):
                ...     state = job_state.get_routine_state("validator")
                ...     return state is not None and state.get("status") == "completed"
                >>> JobState.wait_until_condition(flow, job_state, condition)
        """
        start_time = time.monotonic()
        last_report: Optional[float] = None

        while True:
            if timeout is not None and time.monotonic() - start_time >= timeout:
                logger.warning(
                    "Condition wait timed out after %s seconds. Status: %s",
                    timeout,
                    job_state.status,
                )
                return False

            try:
                if condition(flow, job_state):
                    logger.debug("Condition met successfully")
                    return True
            except Exception as e:
                # Keep waiting: a transient failure in the predicate must not
                # abort the wait.
                logger.warning("Error checking condition: %s", e)

            # The condition might never be met once execution has finished.
            if job_state.status in ("completed", "failed", "cancelled"):
                logger.debug(
                    "Execution completed with status '%s' before condition was met",
                    job_state.status,
                )
                # One last look: the state may have changed between the check
                # above and the status read.
                try:
                    return bool(condition(flow, job_state))
                except Exception as e:
                    logger.debug("Final condition check failed: %s", e)
                    return False

            last_report = JobState._report_progress(
                flow, job_state, progress_callback, last_report
            )
            time.sleep(check_interval)
class _ExecutionCompletionChecker: """Execution completion checker (internal).""" def __init__( self, flow: "Flow", job_state: "JobState", stability_checks: int = 5, check_interval: float = 0.1, stability_delay: float = 0.05, ): self.flow = flow self.job_state = job_state self.stability_checks = stability_checks self.check_interval = check_interval self.stability_delay = stability_delay def is_complete(self) -> bool: """Check if execution is complete.""" if self.job_state.status in ["paused", "cancelled"]: return True if self.job_state.status == "failed": return True queue_empty = self.flow._task_queue.empty() with self.flow._execution_lock: active_tasks = [f for f in self.flow._active_tasks if not f.done()] active_count = len(active_tasks) if queue_empty and active_count == 0: if self.job_state.status in ["completed", "failed", "paused", "cancelled"]: return True if self.job_state.status == "running": return True return False def check_with_stability(self) -> bool: """Check completion with stability verification.""" for _ in range(self.stability_checks): if not self.is_complete(): return False time.sleep(self.stability_delay) return True