JobState API

JobState and ExecutionRecord classes.

Used for recording flow execution state.

class routilux.job_state.ExecutionRecord(routine_id='', event_name='', data=None, timestamp=None)[source]

Bases: Serializable

Execution record for a single routine execution.

Captures information about when and how a routine was executed, including parameters, timestamp, and event information.

__init__(routine_id='', event_name='', data=None, timestamp=None)[source]

Initialize ExecutionRecord.

Parameters:
  • routine_id (str) – Routine identifier.

  • event_name (str) – Event name.

  • data (Dict[str, Any] | None) – Transmitted data.

  • timestamp (datetime | None) – Timestamp (uses current time if None).

__repr__()[source]

Return string representation of the ExecutionRecord.

serialize()[source]

Serialize, handling datetime conversion.

deserialize(data)[source]

Deserialize, handling datetime conversion.

class routilux.job_state.JobState(flow_id='')[source]

Bases: Serializable

Job state for tracking flow execution.

JobState maintains comprehensive state information about flow execution, including routine states, execution history, pause points, and overall execution status. It provides a complete snapshot of workflow execution that can be serialized, persisted, and used for resumption.

Key Responsibilities:
  • Status Tracking: Monitor overall flow execution status

  • Routine States: Track individual routine execution states

  • Execution History: Record all routine executions with timestamps

  • Pause Points: Support execution pausing and resumption

  • State Queries: Provide methods to query execution state

Status Values:
  • “pending”: Flow created but not yet started

  • “running”: Flow execution in progress

  • “paused”: Flow execution paused (can be resumed)

  • “completed”: Flow execution completed successfully

  • “failed”: Flow execution failed due to error

  • “cancelled”: Flow execution cancelled by user

Routine State Values:
  • “pending”: Routine not yet executed

  • “running”: Routine execution in progress

  • “completed”: Routine executed successfully

  • “failed”: Routine execution failed

  • “error_continued”: Routine failed but execution continued (CONTINUE strategy)

  • “skipped”: Routine was skipped (SKIP strategy)

Examples

Basic usage:
>>> job_state = flow.execute(entry_routine_id)
>>> print(job_state.status)  # "completed" or "failed"
>>> routine_state = job_state.get_routine_state(routine_id)
>>> print(routine_state["status"])  # "completed"
Query execution history:
>>> history = job_state.get_execution_history()
>>> for record in history:
...     print(f"{record.routine_id} emitted {record.event_name}")
Check specific routine:
>>> routine_state = job_state.get_routine_state("my_routine")
>>> if routine_state and routine_state["status"] == "completed":
...     print("Routine completed successfully")
__init__(flow_id='')[source]

Initialize JobState.

Parameters:

flow_id (str) – Flow identifier.

__repr__()[source]

Return string representation of the JobState.

update_routine_state(routine_id, state)[source]

Update state for a specific routine.

This method updates or sets the execution state for a routine. The state dictionary typically contains information like: - “status”: Execution status (“completed”, “failed”, etc.) - “error”: Error message if execution failed - “result”: Execution result (if any) - Custom state information added by routines

Parameters:
  • routine_id (str) – Unique identifier of the routine in the flow. Must match the ID used when adding the routine to the flow.

  • state (Dict[str, Any]) – Dictionary containing routine state information. Common keys: - “status”: str - Execution status - “error”: str - Error message (if failed) - “result”: Any - Execution result - Custom keys as needed The dictionary is copied, so modifications to the original won’t affect the stored state.

Side Effects:
  • Updates routine_states[routine_id] with the new state

  • Updates updated_at timestamp

Examples

Update routine status:
>>> job_state.update_routine_state("my_routine", {
...     "status": "completed",
...     "result": "success"
... })
Mark routine as failed:
>>> job_state.update_routine_state("my_routine", {
...     "status": "failed",
...     "error": "Connection timeout"
... })
get_routine_state(routine_id)[source]

Get execution state for a specific routine.

This method retrieves the current execution state of a routine. Returns None if the routine hasn’t been executed or doesn’t exist.

Parameters:

routine_id (str) – Unique identifier of the routine in the flow. Must match the ID used when adding the routine to the flow.

Returns:

  • Routine hasn’t been executed yet

  • Routine ID doesn’t exist in the flow

  • State hasn’t been set yet

Common keys in returned dictionary: - “status”: str - Execution status (“completed”, “failed”, etc.) - “error”: str - Error message (if execution failed) - “result”: Any - Execution result (if any) - Custom keys added by routines

Return type:

Dictionary containing routine state information, or None if

Examples

Check if routine completed:
>>> state = job_state.get_routine_state("my_routine")
>>> if state and state.get("status") == "completed":
...     print("Routine completed successfully")
Get execution result:
>>> state = job_state.get_routine_state("processor")
>>> if state:
...     result = state.get("result")
...     print(f"Result: {result}")
Check for errors:
>>> state = job_state.get_routine_state("validator")
>>> if state and "error" in state:
...     print(f"Error: {state['error']}")
record_execution(routine_id, event_name, data)[source]

Record an execution event in the execution history.

This method creates an ExecutionRecord and adds it to the execution history. The history provides a chronological log of all routine executions, event emissions, and data transmissions.

Execution history is useful for: - Debugging: Trace data flow through the workflow - Monitoring: Track which routines executed and when - Analysis: Understand execution patterns and performance - Auditing: Maintain a record of all operations

Parameters:
  • routine_id (str) – Unique identifier of the routine that executed. This is the routine that emitted the event or completed execution.

  • event_name (str) – Name of the event that was emitted, or action name. Common values: event names like “output”, “result”, or special actions like “error_continued”, “skipped”.

  • data (Dict[str, Any]) – Dictionary of data transmitted or associated with the execution. This typically contains the parameters passed to the event, or error information for error records. Example: {“result”: “success”, “count”: 42}

Side Effects:
  • Creates a new ExecutionRecord with current timestamp

  • Appends record to execution_history list

  • Updates updated_at timestamp

Examples

Record event emission:
>>> job_state.record_execution(
...     "processor",
...     "output",
...     {"result": "processed", "count": 10}
... )
Record error continuation:
>>> job_state.record_execution(
...     "optional_routine",
...     "error_continued",
...     {"error": "Service unavailable", "error_type": "ConnectionError"}
... )
get_execution_history(routine_id=None)[source]

Get execution history, optionally filtered by routine.

This method returns the execution history, which is a chronological list of all routine executions and event emissions. You can filter to get history for a specific routine.

Parameters:

routine_id (str | None) – Optional routine identifier to filter history. If provided, returns only ExecutionRecords for this routine. If None, returns all execution records.

Returns:

List of ExecutionRecord objects, sorted by timestamp (chronological order). Each ExecutionRecord contains: - routine_id: str - Routine that executed - event_name: str - Event emitted or action name - data: Dict[str, Any] - Data transmitted - timestamp: datetime - When the execution occurred

Return type:

List[ExecutionRecord]

Examples

Get all execution history:
>>> history = job_state.get_execution_history()
>>> for record in history:
...     print(f"{record.timestamp}: {record.routine_id} -> {record.event_name}")
Get history for specific routine:
>>> processor_history = job_state.get_execution_history("processor")
>>> for record in processor_history:
...     print(f"Event: {record.event_name}, Data: {record.data}")
Find error records:
>>> history = job_state.get_execution_history()
>>> errors = [r for r in history if "error" in r.event_name]
>>> print(f"Found {len(errors)} error records")
save(filepath)[source]

Persist state to file.

Parameters:

filepath (str) – File path.

classmethod load(filepath)[source]

Load state from file.

Parameters:

filepath (str) – File path.

Returns:

JobState object.

Raises:
Return type:

JobState

serialize()[source]

Serialize, handling datetime and ExecutionRecord.

deserialize(data)[source]

Deserialize, handling datetime and ExecutionRecord.

add_deferred_event(routine_id, event_name, data)[source]

Add a deferred event to be emitted on resume.

Parameters:
  • routine_id (str) – ID of the routine that will emit the event.

  • event_name (str) – Name of the event to emit.

  • data (Dict[str, Any]) – Data to pass to the event.

update_shared_data(key, value)[source]

Update shared data.

Thread-safe in CPython (GIL protected).

Parameters:
  • key (str) – Key to update.

  • value (Any) – Value to set.

get_shared_data(key, default=None)[source]

Get shared data.

Parameters:
  • key (str) – Key to get.

  • default (Any) – Default value if key doesn’t exist.

Returns:

Value for key, or default if key doesn’t exist.

Return type:

Any

append_to_shared_log(entry)[source]

Append entry to shared log.

Thread-safe in CPython (GIL protected).

Parameters:

entry (Dict[str, Any]) – Dictionary to append to log.

get_shared_log(filter_func=None)[source]

Get shared log, optionally filtered.

Parameters:

filter_func (Callable[[Dict[str, Any]], bool] | None) – Optional function to filter entries. Should return True to include entry.

Returns:

List of log entries.

Return type:

List[Dict[str, Any]]

set_output_handler(handler)[source]

Set output handler for this execution.

Parameters:

handler (Any) – OutputHandler instance. Use None to disable output.

send_output(routine_id, output_type, data)[source]

Send output from routine.

This method is called by routines to send execution-specific data. The output includes execution context (job_id, routine_id, timestamp).

Parameters:
  • routine_id (str) – ID of the routine generating the output.

  • output_type (str) – Type of output (e.g., ‘user_data’, ‘status’, ‘result’).

  • data (Dict[str, Any]) – Output data dictionary (user-defined structure).

Note

This method was renamed from emit_output() to avoid confusion with Routine.emit() which emits events to connected slots. Use Routine.send_output() for convenient access.

static wait_for_completion(flow, job_state, timeout=None, stability_checks=5, check_interval=0.1, stability_delay=0.05, progress_callback=None)[source]

Wait for flow execution to complete.

Parameters:
  • flow (Flow) – Flow object to wait for.

  • job_state (JobState) – JobState object to monitor.

  • timeout (float | None) – Maximum time to wait in seconds. None for no timeout.

  • stability_checks (int) – Number of consecutive checks required for stability.

  • check_interval (float) – Interval between checks in seconds.

  • stability_delay (float) – Delay between stability checks in seconds.

  • progress_callback (Callable[[int, int, str], None] | None) – Optional callback function called periodically with (queue_size, active_count, status) tuple.

Returns:

True if execution completed before timeout, False if timeout occurred.

Return type:

bool

Examples

Basic usage:
>>> job_state = flow.execute(entry_routine_id, entry_params)
>>> completed = JobState.wait_for_completion(flow, job_state, timeout=300.0)
>>> if completed:
...     print("Execution completed successfully")
static wait_until_condition(flow, job_state, condition, timeout=None, check_interval=0.1, progress_callback=None)[source]

Wait until a condition is met.

This method continuously checks a condition function until it returns True, or until a timeout occurs. Useful for waiting for specific states like retry count reaching a threshold.

Parameters:
  • flow (Flow) – Flow object to monitor.

  • job_state (JobState) – JobState object to monitor.

  • condition (Callable[[Flow, JobState], bool]) – Callable that takes (flow, job_state) and returns bool. Should return True when the condition is met.

  • timeout (float | None) – Maximum time to wait in seconds. None for no timeout.

  • check_interval (float) – Interval between checks in seconds.

  • progress_callback (Callable[[int, int, str], None] | None) – Optional callback function called periodically with (queue_size, active_count, status) tuple.

Returns:

True if condition was met before timeout, False if timeout occurred.

Return type:

bool

Examples

Wait until retry count reaches 2:
>>> def condition(flow, job_state):
...     retry_count = flow.get_routine_retry_count("processor")
...     return retry_count is not None and retry_count >= 2
>>> JobState.wait_until_condition(flow, job_state, condition, timeout=30.0)
Wait until a specific routine completes:
>>> def condition(flow, job_state):
...     state = job_state.get_routine_state("validator")
...     return state is not None and state.get("status") == "completed"
>>> JobState.wait_until_condition(flow, job_state, condition)