Flow API
The Flow module has been refactored into a modular structure for better maintainability.
The main Flow class and related components are organized as follows:
Main Flow Class
Flow module.
This module contains the Flow class and related components for workflow orchestration.
- class routilux.flow.Flow(flow_id=None, execution_strategy='sequential', max_workers=5, execution_timeout=None)
Bases: Serializable
Flow manager for orchestrating workflow execution.
A Flow is a container that manages multiple Routine nodes and their connections, providing workflow orchestration capabilities including execution, error handling, state management, and persistence.
- Key Responsibilities:
Routine Management: Add, organize, and track routines in the workflow
Connection Management: Link routines via events and slots
Execution Control: Execute workflows sequentially or concurrently
Error Handling: Apply error handling strategies at flow or routine level
State Management: Track execution state via JobState
Persistence: Serialize and restore flow state for resumption
- Execution Modes:
Sequential: Routines execute one at a time in dependency order. Suitable for workflows with dependencies or when order matters.
Concurrent: Independent routines execute in parallel using threads. Suitable for independent operations that can run simultaneously. Use max_workers to control parallelism.
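The difference between the two modes can be illustrated with a toy scheduler. This is a minimal sketch, not routilux's implementation: it assumes a hypothetical dependency graph of routine IDs and uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) for dependency order, plus `ThreadPoolExecutor` with `max_workers` for the concurrent case. The `run` function is a hypothetical stand-in for routine work.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each routine maps to the routines it depends on.
graph = {
    "validator": {"processor"},
    "archiver": {"processor"},
    "reporter": {"validator"},
}


def run(routine_id: str) -> str:
    # Hypothetical stand-in for the work a routine would do.
    return f"ran {routine_id}"


def run_sequentially(graph: dict) -> list:
    # Sequential mode: one routine at a time, dependencies before dependents.
    order = []
    for routine_id in TopologicalSorter(graph).static_order():
        run(routine_id)
        order.append(routine_id)
    return order


def run_concurrently(graph: dict, max_workers: int = 5) -> list:
    # Concurrent mode: routines whose dependencies are done run in parallel.
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    order = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorter.get_ready()  # mutually independent routines
            list(pool.map(run, ready))  # run this batch on worker threads
            for routine_id in ready:
                order.append(routine_id)
                sorter.done(routine_id)  # unlocks dependents for the next batch
    return order
```

The sketch waits for a whole batch before releasing dependents, which keeps it short; a real event-driven scheduler can release each dependent as soon as its own inputs arrive.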
- Error Handling:
Error handlers can be set at two levels:
1. Flow-level: Default handler for all routines (set_error_handler())
2. Routine-level: Override for specific routines (routine.set_error_handler())
Priority: Routine-level > Flow-level > Default (STOP)
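The priority rule amounts to a simple fallback chain. The sketch below assumes a hypothetical `Strategy` enum standing in for routilux's `ErrorStrategy` (only `STOP` and `CONTINUE` are named on this page) and a hypothetical `resolve_strategy` helper; it illustrates the documented order, not the library's resolution code.

```python
from enum import Enum
from typing import Optional


class Strategy(Enum):
    # Hypothetical stand-in for routilux's ErrorStrategy.
    STOP = "stop"
    CONTINUE = "continue"


def resolve_strategy(
    routine_strategy: Optional[Strategy], flow_strategy: Optional[Strategy]
) -> Strategy:
    # Routine-level handler wins when present.
    if routine_strategy is not None:
        return routine_strategy
    # Otherwise fall back to the flow-level default.
    if flow_strategy is not None:
        return flow_strategy
    # Neither level configured: the documented default is STOP.
    return Strategy.STOP
```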
Examples
- Basic workflow:
>>> flow = Flow()
>>> routine1 = DataProcessor()
>>> routine2 = DataValidator()
>>> id1 = flow.add_routine(routine1, "processor")
>>> id2 = flow.add_routine(routine2, "validator")
>>> flow.connect(id1, "output", id2, "input")
>>> job_state = flow.execute(id1, entry_params={"data": "test"})
- Concurrent execution:
>>> flow = Flow(execution_strategy="concurrent", max_workers=5)
>>> # Add routines and connections...
>>> job_state = flow.execute(entry_id)
>>> from routilux.job_state import JobState
>>> completed = JobState.wait_for_completion(flow, job_state, timeout=300.0)  # Wait for all threads
>>> flow.shutdown()  # Clean up thread pool
- Error handling:
>>> from routilux import ErrorHandler, ErrorStrategy
>>> flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))
>>> # Or set per-routine:
>>> routine.set_as_critical(max_retries=3)
- __init__(flow_id=None, execution_strategy='sequential', max_workers=5, execution_timeout=None)
Initialize Flow.
- Parameters:
flow_id (str | None) – Flow identifier (auto-generated if None).
execution_strategy (str) – Execution strategy, “sequential” or “concurrent”.
max_workers (int) – Maximum number of worker threads for concurrent execution.
execution_timeout (float | None) – Default timeout for execution completion in seconds. If None, the default of 300.0 seconds is used.
- add_routine(routine, routine_id=None)
Add a routine to the flow.
- Parameters:
- Returns:
The routine ID used.
- Raises:
ValueError – If routine_id already exists in the flow.
- Return type:
- cancel(job_state, reason='')
Cancel execution.
- Parameters:
- Raises:
ValueError – If the job_state's flow_id does not match this flow's flow_id.
- connect(source_routine_id, source_event, target_routine_id, target_slot, param_mapping=None)
Connect two routines by linking a source event to a target slot.
- Parameters:
source_routine_id (str) – Identifier of the routine that emits the event.
source_event (str) – Name of the event to connect from.
target_routine_id (str) – Identifier of the routine that receives the data.
target_slot (str) – Name of the slot to connect to.
param_mapping (dict[str, str] | None) – Optional dictionary mapping event parameter names to slot parameter names.
- Returns:
Connection object representing this connection.
- Raises:
ValueError – If either routine, the source event, or the target slot does not exist.
- Return type:
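To make param_mapping concrete, the sketch below shows one plausible way a mapping from event parameter names to slot parameter names could be applied to emitted data. The `apply_param_mapping` helper is hypothetical, and it assumes that parameters absent from the mapping keep their original names; check the library's behavior before relying on that.

```python
from typing import Any, Dict, Optional


def apply_param_mapping(
    event_data: Dict[str, Any], param_mapping: Optional[Dict[str, str]]
) -> Dict[str, Any]:
    # No mapping: pass a copy of the event data through unchanged.
    if not param_mapping:
        return dict(event_data)
    mapped = {}
    for name, value in event_data.items():
        # Assumption: unmapped event parameters keep their original name.
        mapped[param_mapping.get(name, name)] = value
    return mapped


# An event emitting "result" feeds a slot that expects "data".
slot_input = apply_param_mapping({"result": 42, "meta": "x"}, {"result": "data"})
```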
- execute(entry_routine_id, entry_params=None, execution_strategy=None, timeout=None)
Execute the flow starting from the specified entry routine.
- Parameters:
entry_routine_id (str) – Identifier of the routine to start execution from.
entry_params (dict[str, Any] | None) – Optional dictionary of parameters to pass to the entry routine’s trigger slot.
execution_strategy (str | None) – Optional execution strategy override.
timeout (float | None) – Optional timeout for execution completion in seconds. If None, uses flow.execution_timeout (default: 300.0 seconds).
- Returns:
JobState object containing execution status and state.
- Raises:
ValueError – If entry_routine_id does not exist in the flow.
- Return type:
- find_routines_by_type(routine_type)
Find routines by type.
- Parameters:
routine_type (type) – Type of routine to find (e.g., DataProcessRoutine).
- Returns:
List of (routine_id, routine) tuples matching the type.
- Return type:
Examples
- Find all DataProcessRoutine instances:
>>> routines = flow.find_routines_by_type(DataProcessRoutine)
>>> for routine_id, routine in routines:
...     print(f"Found {routine_id}: {routine}")
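A type lookup over a routine registry can be sketched as a filter over (routine_id, routine) pairs. The classes and the standalone `find_routines_by_type` function below are hypothetical, and the sketch assumes `isinstance` semantics, so subclasses also match; the library may instead compare exact types.

```python
from typing import Dict, List, Tuple


class Routine:  # hypothetical base class
    pass


class DataProcessRoutine(Routine):  # hypothetical routine type
    pass


class DataValidator(Routine):  # hypothetical routine type
    pass


def find_routines_by_type(
    routines: Dict[str, Routine], routine_type: type
) -> List[Tuple[str, Routine]]:
    # Assumption: isinstance() matching, so subclasses are included.
    return [(rid, r) for rid, r in routines.items() if isinstance(r, routine_type)]


registry = {"processor": DataProcessRoutine(), "validator": DataValidator()}
matches = find_routines_by_type(registry, DataProcessRoutine)
```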
- get_routine_retry_count(routine_id)
Get retry count for a routine.
- Parameters:
routine_id (str) – Routine identifier.
- Returns:
Retry count if the routine has an error handler with a retry strategy; None otherwise.
- Return type:
int | None
Examples
- Get retry count for a routine:
>>> retry_count = flow.get_routine_retry_count("processor")
>>> if retry_count is not None:
...     print(f"Current retry count: {retry_count}")
- pause(job_state, reason='', checkpoint=None)
Pause execution.
- Parameters:
- Raises:
ValueError – If the job_state's flow_id does not match this flow's flow_id.
- resume(job_state)
Resume execution from a paused or saved state.
- Parameters:
job_state (JobState) – JobState to resume.
- Returns:
Updated JobState.
- Raises:
ValueError – If the job_state's flow_id does not match this flow's flow_id, or if a routine referenced by the job_state does not exist.
- Return type:
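pause(), resume() and cancel() share the same guard: the JobState must belong to the flow it is passed to. The toy classes below are hypothetical (`ToyJobState`, `ToyFlow`, and the status strings are assumptions, not routilux names) and only sketch that ownership check plus a simple status transition.

```python
from dataclasses import dataclass


@dataclass
class ToyJobState:
    # Hypothetical stand-in for JobState.
    flow_id: str
    status: str = "running"
    reason: str = ""


class ToyFlow:
    def __init__(self, flow_id: str):
        self.flow_id = flow_id

    def _check_owner(self, job_state: ToyJobState) -> None:
        # Mirrors the documented ValueError on a flow_id mismatch.
        if job_state.flow_id != self.flow_id:
            raise ValueError(
                f"JobState flow_id {job_state.flow_id!r} does not match {self.flow_id!r}"
            )

    def pause(self, job_state: ToyJobState, reason: str = "") -> None:
        self._check_owner(job_state)
        job_state.status, job_state.reason = "paused", reason

    def resume(self, job_state: ToyJobState) -> ToyJobState:
        self._check_owner(job_state)
        job_state.status = "running"
        return job_state

    def cancel(self, job_state: ToyJobState, reason: str = "") -> None:
        self._check_owner(job_state)
        job_state.status, job_state.reason = "cancelled", reason
```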
- serialize()
Serialize Flow, including all routines and connections.
- Returns:
Serialized dictionary containing flow data (structure only, no execution state).
- Raises:
TypeError – If any Serializable object in the Flow cannot be constructed without arguments.
- Return type:
Note
Flow serialization only includes structure (routines, connections, config). Execution state (JobState) must be serialized separately:
1. Serialize Flow: flow_data = flow.serialize()
2. Serialize JobState: job_state_data = job_state.serialize()
3. Deserialize both on the target host
4. Use flow.resume(job_state) to continue execution
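The split between structure and execution state can be sketched with plain dataclasses and JSON. `FlowStructure` and `ExecutionState` below are hypothetical stand-ins for what Flow.serialize() and JobState.serialize() respectively cover; the field names and the JSON transport are assumptions chosen only to show the four-step handoff.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class FlowStructure:
    # Hypothetical: structure only (routines, connections), no execution state.
    flow_id: str
    routines: list = field(default_factory=list)
    connections: list = field(default_factory=list)


@dataclass
class ExecutionState:
    # Hypothetical: execution progress, serialized separately from the flow.
    flow_id: str
    status: str
    completed_routines: list = field(default_factory=list)


# Steps 1 and 2: serialize structure and state independently on the source host.
flow_payload = json.dumps(asdict(FlowStructure(
    "f1", ["processor", "validator"], [["processor", "output", "validator", "input"]]
)))
state_payload = json.dumps(asdict(ExecutionState("f1", "paused", ["processor"])))

# Step 3: rebuild both on the target host.
restored_flow = FlowStructure(**json.loads(flow_payload))
restored_state = ExecutionState(**json.loads(state_payload))

# Step 4 precondition: the state must belong to this flow before resuming.
can_resume = restored_state.flow_id == restored_flow.flow_id
```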
- set_error_handler(error_handler)
Set error handler for the flow.
- Parameters:
error_handler (ErrorHandler) – ErrorHandler object.
- wait_for_completion(timeout=None, job_state=None)
Wait for all tasks to complete.
Deprecated: This method is deprecated. Use JobState.wait_for_completion() instead, which provides proper error detection and state management:
from routilux.job_state import JobState
completed = JobState.wait_for_completion(flow, job_state, timeout=timeout)
- Parameters:
- Returns:
True if all tasks completed before timeout, False otherwise.
- Return type:
- class routilux.flow.TaskPriority(value)
Bases: Enum
Task priority for queue scheduling.
- HIGH = 1
- NORMAL = 2
- LOW = 3
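One common way to schedule by such an enum is a binary heap keyed on the enum value. The sketch below mirrors the documented values in a local enum and uses a hypothetical `PriorityTaskQueue` built on `heapq`; it assumes that a lower value is served first and that tasks of equal priority are served in submission order. It is not necessarily how routilux's event loop orders its queue.

```python
import heapq
import itertools
from enum import Enum


class TaskPriority(Enum):
    # Local mirror of the documented values.
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityTaskQueue:
    """Hypothetical queue: lowest enum value first, FIFO within a priority."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserves submission order

    def put(self, task, priority: TaskPriority = TaskPriority.NORMAL) -> None:
        heapq.heappush(self._heap, (priority.value, next(self._counter), task))

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.put("a")
queue.put("b", TaskPriority.LOW)
queue.put("c", TaskPriority.HIGH)
queue.put("d")
served = [queue.get() for _ in range(len(queue))]
```

The counter in each heap entry also keeps `heapq` from ever comparing two task objects directly.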
- class routilux.flow.SlotActivationTask(slot, data, connection=None, priority=TaskPriority.NORMAL, retry_count=0, max_retries=0, created_at=None, job_state=None)
Bases: object
Slot activation task for queue-based execution.
Each task is associated with a JobState to track execution state. This allows tasks executed in worker threads to access and update the correct JobState, even when running concurrently.
- __init__(slot, data, connection=None, priority=TaskPriority.NORMAL, retry_count=0, max_retries=0, created_at=None, job_state=None)
- connection: Connection | None = None
- priority: TaskPriority = TaskPriority.NORMAL
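The retry_count and max_retries fields suggest a bounded-retry pattern. The dataclass below is a hypothetical mirror of the documented fields (`ToySlotActivationTask`, `can_retry`, and `retried` are not routilux names); it assumes another attempt is allowed while retry_count is below max_retries, and shows that the job_state reference travels with each retried copy.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass
class ToySlotActivationTask:
    """Hypothetical mirror of the documented fields, not the routilux class."""

    slot: Any
    data: dict
    connection: Optional[Any] = None
    priority: int = 2  # stands in for TaskPriority.NORMAL
    retry_count: int = 0
    max_retries: int = 0
    created_at: Optional[float] = None
    job_state: Optional[Any] = None

    def can_retry(self) -> bool:
        # Assumption: another attempt is allowed while retry_count < max_retries.
        return self.retry_count < self.max_retries

    def retried(self) -> "ToySlotActivationTask":
        # Copy for the next attempt; job_state is carried over unchanged.
        return replace(self, retry_count=self.retry_count + 1)
```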
Flow Submodules
The Flow functionality is organized into the following submodules:
- flow.flow
Main Flow class that orchestrates workflow execution.
- flow.task
Task-related classes including the TaskPriority enum and the SlotActivationTask dataclass.
- flow.execution
Execution logic for sequential and concurrent workflow execution.
- flow.event_loop
Event loop and task queue management.
- flow.error_handling
Error handling logic for task errors and error handler resolution.
- flow.state_management
State management including pause, resume, cancel, and task serialization.
- flow.dependency
Dependency graph building and querying.
- flow.serialization
Flow serialization and deserialization logic.
- flow.completion
Execution completion detection and waiting mechanism. Provides systematic completion checking to avoid race conditions and handle long-running tasks.
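A standard way to detect completion without races is to count outstanding tasks and register each new task before the task that spawned it finishes, so the count cannot reach zero while work remains. The `CompletionTracker` and `submit` helpers below are a hypothetical sketch of that technique using `threading.Condition`; they are not the flow.completion API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CompletionTracker:
    """Hypothetical: 'done' means nothing queued and nothing running."""

    def __init__(self):
        self._pending = 0
        self._cond = threading.Condition()

    def task_submitted(self) -> None:
        with self._cond:
            self._pending += 1

    def task_finished(self) -> None:
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()

    def wait(self, timeout=None) -> bool:
        # True if everything finished, False if the timeout expired first.
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)


tracker = CompletionTracker()
pool = ThreadPoolExecutor(max_workers=2)
results = []


def submit(fn, *args) -> None:
    tracker.task_submitted()  # count the task before it can possibly run

    def wrapper():
        try:
            fn(*args)
        finally:
            tracker.task_finished()

    pool.submit(wrapper)


def stage(n: int) -> None:
    results.append(n)
    if n < 3:
        submit(stage, n + 1)  # downstream task registered before this one ends


submit(stage, 1)
completed = tracker.wait(timeout=5)
pool.shutdown()
```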
For most use cases, you only need to import from the top-level routilux package or the main routilux.flow module:
from routilux import Flow
from routilux.flow import TaskPriority, SlotActivationTask # If needed
The submodules are internal implementation details and typically don’t need to be imported directly.