JobState API
JobState and ExecutionRecord classes.
Used for recording flow execution state.
- class routilux.job_state.ExecutionRecord(routine_id='', event_name='', data=None, timestamp=None)
Bases: Serializable
Execution record for a single routine execution.
Captures information about when and how a routine was executed, including parameters, timestamp, and event information.
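To make the shape of such a record concrete, here is a minimal sketch built on a plain dataclass. RecordSketch and its to_dict() method are hypothetical stand-ins that only mirror the documented constructor arguments; the defaulting of data and timestamp is an assumption, and none of this is routilux's actual implementation.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class RecordSketch:
    """Hypothetical stand-in mirroring the documented constructor arguments."""

    routine_id: str = ""
    event_name: str = ""
    data: Optional[Dict[str, Any]] = None
    timestamp: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Assumption: missing values fall back to an empty dict and "now".
        if self.data is None:
            self.data = {}
        if self.timestamp is None:
            self.timestamp = datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        # Convert the timestamp to ISO-8601 text so the result is JSON-friendly.
        return {
            "routine_id": self.routine_id,
            "event_name": self.event_name,
            "data": dict(self.data),
            "timestamp": self.timestamp.isoformat(),
        }


record = RecordSketch("processor", "output", {"count": 10})
print(record.to_dict()["event_name"])
```

The real class derives from Serializable, so it presumably ships its own serialization; the to_dict() above is only there to show which fields a record carries.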
- class routilux.job_state.JobState(flow_id='')
Bases: Serializable
Job state for tracking flow execution.
JobState maintains comprehensive state information about flow execution, including routine states, execution history, pause points, and overall execution status. It provides a complete snapshot of workflow execution that can be serialized, persisted, and used for resumption.
- Key Responsibilities:
Status Tracking: Monitor overall flow execution status
Routine States: Track individual routine execution states
Execution History: Record all routine executions with timestamps
Pause Points: Support execution pausing and resumption
State Queries: Provide methods to query execution state
- Status Values:
“pending”: Flow created but not yet started
“running”: Flow execution in progress
“paused”: Flow execution paused (can be resumed)
“completed”: Flow execution completed successfully
“failed”: Flow execution failed due to error
“cancelled”: Flow execution cancelled by user
- Routine State Values:
“pending”: Routine not yet executed
“running”: Routine execution in progress
“completed”: Routine executed successfully
“failed”: Routine execution failed
“error_continued”: Routine failed but execution continued (CONTINUE strategy)
“skipped”: Routine was skipped (SKIP strategy)
Examples
- Basic usage:
>>> job_state = flow.execute(entry_routine_id)
>>> print(job_state.status)  # "completed" or "failed"
>>> routine_state = job_state.get_routine_state(routine_id)
>>> print(routine_state["status"])  # "completed"
- Query execution history:
>>> history = job_state.get_execution_history()
>>> for record in history:
...     print(f"{record.routine_id} emitted {record.event_name}")
- Check specific routine:
>>> routine_state = job_state.get_routine_state("my_routine")
>>> if routine_state and routine_state["status"] == "completed":
...     print("Routine completed successfully")
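The routine state values listed above lend themselves to a quick per-status tally. The sketch below works on a plain dictionary of routine states; summarize_statuses and the sample data are hypothetical names introduced for illustration, and the "unknown" bucket is an assumption about how one might handle states without a recognized status.

```python
from collections import Counter
from typing import Any, Dict

# Routine state values as listed in this section.
ROUTINE_STATUSES = (
    "pending", "running", "completed", "failed", "error_continued", "skipped",
)


def summarize_statuses(routine_states: Dict[str, Dict[str, Any]]) -> Dict[str, int]:
    """Count routines per status; missing or unrecognized statuses go to 'unknown'."""
    counts: Counter = Counter()
    for state in routine_states.values():
        status = state.get("status")
        counts[status if status in ROUTINE_STATUSES else "unknown"] += 1
    return dict(counts)


example_states = {
    "loader": {"status": "completed"},
    "validator": {"status": "error_continued", "error": "Service unavailable"},
    "reporter": {"status": "skipped"},
    "notifier": {},  # no status recorded
}
summary = summarize_statuses(example_states)
print(summary)
```

In practice the input would be whatever mapping of routine IDs to state dictionaries the job state holds.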
- update_routine_state(routine_id, state)
Update state for a specific routine.
This method updates or sets the execution state for a routine. The state dictionary typically contains information like:
“status”: Execution status (“completed”, “failed”, etc.)
“error”: Error message if execution failed
“result”: Execution result (if any)
Custom state information added by routines
- Parameters:
routine_id (str) – Unique identifier of the routine in the flow. Must match the ID used when adding the routine to the flow.
state (Dict[str, Any]) – Dictionary containing routine state information. Common keys:
“status”: str - Execution status
“error”: str - Error message (if failed)
“result”: Any - Execution result
Custom keys as needed
The dictionary is copied, so modifications to the original won’t affect the stored state.
- Side Effects:
Updates routine_states[routine_id] with the new state
Updates updated_at timestamp
Examples
- Update routine status:
>>> job_state.update_routine_state("my_routine", {
...     "status": "completed",
...     "result": "success"
... })
- Mark routine as failed:
>>> job_state.update_routine_state("my_routine", {
...     "status": "failed",
...     "error": "Connection timeout"
... })
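The copy behaviour described for the state parameter can be illustrated with a small stand-in. StateStoreSketch is a hypothetical class, not routilux's JobState, and the use of a shallow copy is an assumption: the documentation only says the dictionary is copied, not how deeply.

```python
from datetime import datetime
from typing import Any, Dict


class StateStoreSketch:
    """Hypothetical stand-in; not routilux's JobState."""

    def __init__(self) -> None:
        self.routine_states: Dict[str, Dict[str, Any]] = {}
        self.updated_at = datetime.now()

    def update_routine_state(self, routine_id: str, state: Dict[str, Any]) -> None:
        # Store a shallow copy so later changes to the caller's dict do not leak in.
        self.routine_states[routine_id] = dict(state)
        # Mirror the documented side effect of refreshing updated_at.
        self.updated_at = datetime.now()


store = StateStoreSketch()
payload = {"status": "running"}
store.update_routine_state("my_routine", payload)
payload["status"] = "failed"  # mutate the original after the call
print(store.routine_states["my_routine"]["status"])  # prints "running"
```

With a shallow copy, nested mutable values (for example a list stored under "result") would still be shared with the caller, so it is safest not to mutate them after the call.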
- get_routine_state(routine_id)
Get execution state for a specific routine.
This method retrieves the current execution state of a routine. Returns None if the routine hasn’t been executed or doesn’t exist.
- Parameters:
routine_id (str) – Unique identifier of the routine in the flow. Must match the ID used when adding the routine to the flow.
- Returns:
Dictionary containing routine state information, or None if:
Routine hasn’t been executed yet
Routine ID doesn’t exist in the flow
State hasn’t been set yet
Common keys in returned dictionary:
“status”: str - Execution status (“completed”, “failed”, etc.)
“error”: str - Error message (if execution failed)
“result”: Any - Execution result (if any)
Custom keys added by routines
- Return type:
Dict[str, Any] | None
Examples
- Check if routine completed:
>>> state = job_state.get_routine_state("my_routine")
>>> if state and state.get("status") == "completed":
...     print("Routine completed successfully")
- Get execution result:
>>> state = job_state.get_routine_state("processor")
>>> if state:
...     result = state.get("result")
...     print(f"Result: {result}")
- Check for errors:
>>> state = job_state.get_routine_state("validator")
>>> if state and "error" in state:
...     print(f"Error: {state['error']}")
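The same None-aware pattern extends to several routines at once. The helper below is a sketch that relies only on the documented get_routine_state() method; collect_errors and the _FakeJobState test double are hypothetical names introduced here so the example runs without a real flow.

```python
from typing import Any, Dict, Iterable


def collect_errors(job_state: Any, routine_ids: Iterable[str]) -> Dict[str, str]:
    """Map routine ID to error message for every routine whose state has an 'error' key."""
    errors: Dict[str, str] = {}
    for routine_id in routine_ids:
        state = job_state.get_routine_state(routine_id)
        if state is None:
            continue  # not executed, unknown ID, or state not set yet
        if "error" in state:
            errors[routine_id] = str(state["error"])
    return errors


class _FakeJobState:
    """Test double exposing only get_routine_state()."""

    def __init__(self, states: Dict[str, Dict[str, Any]]) -> None:
        self._states = states

    def get_routine_state(self, routine_id: str):
        return self._states.get(routine_id)


fake = _FakeJobState({
    "validator": {"status": "failed", "error": "Connection timeout"},
    "processor": {"status": "completed", "result": "success"},
})
errors = collect_errors(fake, ["validator", "processor", "never_ran"])
print(errors)
```

Passing a real JobState in place of the test double should work the same way, since only get_routine_state() is called.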
- record_execution(routine_id, event_name, data)
Record an execution event in the execution history.
This method creates an ExecutionRecord and adds it to the execution history. The history provides a chronological log of all routine executions, event emissions, and data transmissions.
Execution history is useful for:
Debugging: Trace data flow through the workflow
Monitoring: Track which routines executed and when
Analysis: Understand execution patterns and performance
Auditing: Maintain a record of all operations
- Parameters:
routine_id (str) – Unique identifier of the routine that executed. This is the routine that emitted the event or completed execution.
event_name (str) – Name of the event that was emitted, or action name. Common values: event names like “output”, “result”, or special actions like “error_continued”, “skipped”.
data (Dict[str, Any]) – Dictionary of data transmitted or associated with the execution. This typically contains the parameters passed to the event, or error information for error records. Example: {“result”: “success”, “count”: 42}
- Side Effects:
Creates a new ExecutionRecord with current timestamp
Appends record to execution_history list
Updates updated_at timestamp
Examples
- Record event emission:
>>> job_state.record_execution(
...     "processor",
...     "output",
...     {"result": "processed", "count": 10}
... )
- Record error continuation:
>>> job_state.record_execution(
...     "optional_routine",
...     "error_continued",
...     {"error": "Service unavailable", "error_type": "ConnectionError"}
... )
- get_execution_history(routine_id=None)
Get execution history, optionally filtered by routine.
This method returns the execution history, which is a chronological list of all routine executions and event emissions. You can filter to get history for a specific routine.
- Parameters:
routine_id (str | None) – Optional routine identifier to filter history. If provided, returns only ExecutionRecords for this routine. If None, returns all execution records.
- Returns:
List of ExecutionRecord objects, sorted by timestamp (chronological order). Each ExecutionRecord contains:
routine_id: str - Routine that executed
event_name: str - Event emitted or action name
data: Dict[str, Any] - Data transmitted
timestamp: datetime - When the execution occurred
- Return type:
List[ExecutionRecord]
Examples
- Get all execution history:
>>> history = job_state.get_execution_history()
>>> for record in history:
...     print(f"{record.timestamp}: {record.routine_id} -> {record.event_name}")
- Get history for specific routine:
>>> processor_history = job_state.get_execution_history("processor")
>>> for record in processor_history:
...     print(f"Event: {record.event_name}, Data: {record.data}")
- Find error records:
>>> history = job_state.get_execution_history()
>>> errors = [r for r in history if "error" in r.event_name]
>>> print(f"Found {len(errors)} error records")
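For analysis it is often handy to regroup the chronological history per routine. The sketch below uses only the documented record attributes (routine_id, event_name, timestamp); events_by_routine is a hypothetical helper, and the SimpleNamespace objects are stand-ins for ExecutionRecord so the example runs on its own.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def events_by_routine(history: Iterable[Any]) -> Dict[str, List[str]]:
    """Group event names by routine_id, preserving the order of the input."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for record in history:
        grouped[record.routine_id].append(record.event_name)
    return dict(grouped)


# Stand-in records; real code would pass job_state.get_execution_history().
start = datetime(2024, 1, 1, 12, 0, 0)
sample_history = [
    SimpleNamespace(routine_id="loader", event_name="output",
                    data={"rows": 3}, timestamp=start),
    SimpleNamespace(routine_id="processor", event_name="output",
                    data={"count": 3}, timestamp=start + timedelta(seconds=1)),
    SimpleNamespace(routine_id="processor", event_name="error_continued",
                    data={"error": "Service unavailable"},
                    timestamp=start + timedelta(seconds=2)),
]

grouped = events_by_routine(sample_history)
# Because the history is chronological, first and last records bound the run.
elapsed = sample_history[-1].timestamp - sample_history[0].timestamp
print(grouped)
print(elapsed.total_seconds())
```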
- classmethod load(filepath)
Load state from file.
- Parameters:
filepath (str) – File path.
- Returns:
JobState object.
- Raises:
FileNotFoundError – If file does not exist.
ValueError – If file format is incorrect.
- Return type:
JobState
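Callers that want to fall back to a fresh run when no usable saved state exists can wrap the two documented exceptions. The sketch below takes the loader as a parameter so it runs without routilux installed; load_or_none and the three stand-in loaders are hypothetical, and in real use one would presumably pass JobState.load.

```python
from typing import Any, Callable, Optional


def load_or_none(loader: Callable[[str], Any], filepath: str) -> Optional[Any]:
    """Call loader(filepath); return None for the two documented failure modes."""
    try:
        return loader(filepath)
    except FileNotFoundError:
        print(f"No saved state at {filepath}")
    except ValueError as exc:
        print(f"Saved state at {filepath} is not readable: {exc}")
    return None


# Stand-in loaders for demonstration only.
def _missing(path: str) -> Any:
    raise FileNotFoundError(path)


def _corrupt(path: str) -> Any:
    raise ValueError("unexpected format")


def _ok(path: str) -> Any:
    return {"flow_id": "demo"}


missing_result = load_or_none(_missing, "state.json")
corrupt_result = load_or_none(_corrupt, "state.json")
ok_result = load_or_none(_ok, "state.json")
print(missing_result, corrupt_result, ok_result)
```

Any other exception type is deliberately left to propagate, since only FileNotFoundError and ValueError are documented.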
- add_deferred_event(routine_id, event_name, data)
Add a deferred event to be emitted on resume.
Update shared data.
Thread-safe in CPython (GIL protected).
Get shared data.
Append entry to shared log.
Thread-safe in CPython (GIL protected).
Get shared log, optionally filtered.
- set_output_handler(handler)
Set output handler for this execution.
- Parameters:
handler (Any) – OutputHandler instance. Use None to disable output.
- send_output(routine_id, output_type, data)
Send output from routine.
This method is called by routines to send execution-specific data. The output includes execution context (job_id, routine_id, timestamp).
- Parameters:
Note
This method was renamed from emit_output() to avoid confusion with Routine.emit() which emits events to connected slots. Use Routine.send_output() for convenient access.
- static wait_for_completion(flow, job_state, timeout=None, stability_checks=5, check_interval=0.1, stability_delay=0.05, progress_callback=None)
Wait for flow execution to complete.
- Parameters:
flow (Flow) – Flow object to wait for.
job_state (JobState) – JobState object to monitor.
timeout (float | None) – Maximum time to wait in seconds. None for no timeout.
stability_checks (int) – Number of consecutive checks required for stability.
check_interval (float) – Interval between checks in seconds.
stability_delay (float) – Delay between stability checks in seconds.
progress_callback (Callable[[int, int, str], None] | None) – Optional callback function called periodically with three arguments: queue_size, active_count, and status.
- Returns:
True if execution completed before timeout, False if timeout occurred.
- Return type:
bool
Examples
- Basic usage:
>>> job_state = flow.execute(entry_routine_id, entry_params)
>>> completed = JobState.wait_for_completion(flow, job_state, timeout=300.0)
>>> if completed:
...     print("Execution completed successfully")
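The stability parameters suggest a polling loop that only reports completion after several consecutive idle observations. The sketch below is one plausible reading of that idea, not routilux's implementation: wait_until_stable and the is_idle callable are hypothetical, and how the library actually decides that a flow is idle is not stated here.

```python
import time
from typing import Callable, Optional


def wait_until_stable(
    is_idle: Callable[[], bool],
    timeout: Optional[float] = None,
    stability_checks: int = 5,
    check_interval: float = 0.1,
    stability_delay: float = 0.05,
) -> bool:
    """Return True once is_idle() holds for `stability_checks` consecutive checks."""
    deadline = None if timeout is None else time.monotonic() + timeout
    consecutive = 0
    while True:
        if is_idle():
            consecutive += 1
            if consecutive >= stability_checks:
                return True
            delay = stability_delay  # re-check quickly while confirming stability
        else:
            consecutive = 0  # any busy observation resets the streak
            delay = check_interval
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(delay)


# Demo: busy for the first three polls, idle afterwards.
calls = {"n": 0}


def flaky_idle() -> bool:
    calls["n"] += 1
    return calls["n"] > 3


stable = wait_until_stable(flaky_idle, timeout=2.0, stability_checks=3,
                           check_interval=0.01, stability_delay=0.01)
timed_out = wait_until_stable(lambda: False, timeout=0.05, check_interval=0.01)
print(stable, calls["n"], timed_out)
```

Requiring several consecutive idle checks guards against declaring completion in the brief gap between one routine finishing and the next one being scheduled.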
- static wait_until_condition(flow, job_state, condition, timeout=None, check_interval=0.1, progress_callback=None)
Wait until a condition is met.
This method continuously checks a condition function until it returns True, or until a timeout occurs. Useful for waiting for specific states like retry count reaching a threshold.
- Parameters:
flow (Flow) – Flow object to monitor.
job_state (JobState) – JobState object to monitor.
condition (Callable[[Flow, JobState], bool]) – Callable that takes (flow, job_state) and returns bool. Should return True when the condition is met.
timeout (float | None) – Maximum time to wait in seconds. None for no timeout.
check_interval (float) – Interval between checks in seconds.
progress_callback (Callable[[int, int, str], None] | None) – Optional callback function called periodically with three arguments: queue_size, active_count, and status.
- Returns:
True if condition was met before timeout, False if timeout occurred.
- Return type:
bool
Examples
- Wait until retry count reaches 2:
>>> def condition(flow, job_state):
...     retry_count = flow.get_routine_retry_count("processor")
...     return retry_count is not None and retry_count >= 2
>>> JobState.wait_until_condition(flow, job_state, condition, timeout=30.0)
- Wait until a specific routine completes:
>>> def condition(flow, job_state):
...     state = job_state.get_routine_state("validator")
...     return state is not None and state.get("status") == "completed"
>>> JobState.wait_until_condition(flow, job_state, condition)
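Conditions like the ones above can be generated rather than written by hand. The sketch below builds a condition covering several routines and pairs it with a progress callback matching the documented Callable[[int, int, str], None] shape; all_completed, log_progress, and the _FakeJobState test double are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict, Iterable


def all_completed(routine_ids: Iterable[str]) -> Callable[[Any, Any], bool]:
    """Build a condition(flow, job_state) that is True once every routine is 'completed'."""
    ids = list(routine_ids)

    def condition(flow: Any, job_state: Any) -> bool:
        for routine_id in ids:
            state = job_state.get_routine_state(routine_id)
            if state is None or state.get("status") != "completed":
                return False
        return True

    return condition


def log_progress(queue_size: int, active_count: int, status: str) -> None:
    """Progress callback taking the three documented arguments."""
    print(f"queue={queue_size} active={active_count} status={status}")


class _FakeJobState:
    """Test double exposing only get_routine_state()."""

    def __init__(self, states: Dict[str, Dict[str, Any]]) -> None:
        self.states = states

    def get_routine_state(self, routine_id: str):
        return self.states.get(routine_id)


fake = _FakeJobState({"validator": {"status": "completed"}})
condition = all_completed(["validator", "processor"])
before = condition(None, fake)  # "processor" has no state yet
fake.states["processor"] = {"status": "completed"}
after = condition(None, fake)
print(before, after)
```

With a real flow, the generated condition would be passed as JobState.wait_until_condition(flow, job_state, all_completed(["validator", "processor"]), timeout=30.0, progress_callback=log_progress).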