Execution Completion API¶
The execution completion functionality has been moved to JobState.wait_for_completion(),
a static method of the JobState class. This provides better encapsulation and aligns
with the design where JobState manages execution state.
Note
The completion API is now part of JobState. See JobState: Execution State Management for details.
JobState.wait_for_completion()¶
The JobState.wait_for_completion() static method provides a robust, systematic way
to wait for flow execution to complete.
Features:
Completion Detection: Checks if execution is complete by verifying: - Task queue is empty (no pending tasks) - No active tasks (all running tasks finished) - This works even if
job_state.statusis still"running"Stability Verification: Performs multiple consecutive checks to avoid race conditions where tasks might be enqueued between checks
Timeout Safety: The timeout parameter serves as a safety limit. If execution doesn’t complete within the timeout, the function returns
Falseand the caller should handle the timeout (e.g., force stop the event loop)Long-running Tasks: Supports long-running tasks (e.g., LLM calls) with configurable timeout
Event Loop Management: Handles edge cases where event loop might exit prematurely
Progress Monitoring: Optional progress callback for monitoring execution status
Example:
from routilux.job_state import JobState
job_state = flow.execute(entry_routine_id="routine1")
def progress_callback(queue_size, active_count, status):
print(f"Queue: {queue_size}, Active: {active_count}, Status: {status}")
completed = JobState.wait_for_completion(
flow=flow,
job_state=job_state,
timeout=300.0,
progress_callback=progress_callback
)
if completed:
print("Execution completed successfully")
else:
print("Execution timed out")
Internal Implementation:
The JobState.wait_for_completion() method uses an internal _ExecutionCompletionChecker
class to perform stability checks. This class is not part of the public API but is used
internally by the static method.
ensure_event_loop_running¶
The ensure_event_loop_running() function ensures the event loop is running,
restarting it if needed when there are tasks in the queue.
Example:
from routilux.flow.completion import ensure_event_loop_running
# Check and restart event loop if needed
if ensure_event_loop_running(flow):
print("Event loop is running")