Flow API

The Flow module has been refactored into a modular structure for better maintainability. The main Flow class and related components are organized as follows:

Main Flow Class

Flow module.

This module contains the Flow class and related components for workflow orchestration.

class routilux.flow.Flow(flow_id=None, execution_strategy='sequential', max_workers=5, execution_timeout=None)[source]

Bases: Serializable

Flow manager for orchestrating workflow execution.

A Flow is a container that manages multiple Routine nodes and their connections, providing workflow orchestration capabilities including execution, error handling, state management, and persistence.

Key Responsibilities:
  • Routine Management: Add, organize, and track routines in the workflow

  • Connection Management: Link routines via events and slots

  • Execution Control: Execute workflows sequentially or concurrently

  • Error Handling: Apply error handling strategies at flow or routine level

  • State Management: Track execution state via JobState

  • Persistence: Serialize and restore flow state for resumption

Execution Modes:
  • Sequential: Routines execute one at a time in dependency order. Suitable for workflows with dependencies or when order matters.

  • Concurrent: Independent routines execute in parallel using threads. Suitable for independent operations that can run simultaneously. Use max_workers to control parallelism.
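The difference between the two modes can be sketched with the standard library alone. This is a minimal illustration, not routilux's implementation: `run_sequential` and `run_concurrent` are hypothetical helpers, and the only name borrowed from the text above is `max_workers`, which here caps the size of a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor


def run_sequential(tasks):
    """Run callables one at a time, in the order given."""
    return [task() for task in tasks]


def run_concurrent(tasks, max_workers=5):
    """Run independent callables in a thread pool.

    Results are collected in submission order, even though the
    callables themselves may finish in any order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        return [future.result() for future in futures]


# Four independent tasks; each returns the square of its index.
tasks = [lambda n=n: n * n for n in range(4)]

sequential_results = run_sequential(tasks)
concurrent_results = run_concurrent(tasks, max_workers=2)
```

Both calls produce the same results here because the tasks do not depend on each other. When one task needs another's output, the sequential form is the safer choice.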

Error Handling:

Error handlers can be set at two levels:
  1. Flow-level: Default handler for all routines (set_error_handler())

  2. Routine-level: Override for specific routines (routine.set_error_handler())

Priority: Routine-level > Flow-level > Default (STOP)
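The priority order above amounts to a simple fallback chain. The sketch below assumes nothing about routilux internals: `Strategy` is a stand-in enum (not `routilux.ErrorStrategy`) and `resolve_strategy` is a hypothetical helper written only to show the lookup order.

```python
from enum import Enum


class Strategy(Enum):
    """Stand-in for an error strategy enum; not routilux.ErrorStrategy."""
    STOP = "stop"
    CONTINUE = "continue"
    RETRY = "retry"


def resolve_strategy(routine_level=None, flow_level=None):
    """Pick the effective strategy: routine-level, then flow-level, then STOP."""
    if routine_level is not None:
        return routine_level
    if flow_level is not None:
        return flow_level
    return Strategy.STOP


# No handler anywhere: fall back to the default.
default_choice = resolve_strategy()
# Only a flow-level handler is set.
flow_choice = resolve_strategy(flow_level=Strategy.CONTINUE)
# A routine-level handler overrides the flow-level one.
routine_choice = resolve_strategy(
    routine_level=Strategy.RETRY, flow_level=Strategy.CONTINUE
)
```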

Examples

Basic workflow:
>>> flow = Flow()
>>> routine1 = DataProcessor()
>>> routine2 = DataValidator()
>>> id1 = flow.add_routine(routine1, "processor")
>>> id2 = flow.add_routine(routine2, "validator")
>>> flow.connect(id1, "output", id2, "input")
>>> job_state = flow.execute(id1, entry_params={"data": "test"})
Concurrent execution:
>>> flow = Flow(execution_strategy="concurrent", max_workers=5)
>>> # Add routines and connections...
>>> job_state = flow.execute(entry_id)
>>> from routilux.job_state import JobState
>>> JobState.wait_for_completion(flow, job_state, timeout=60.0)  # Wait for all tasks
>>> flow.shutdown()  # Clean up thread pool
Error handling:
>>> from routilux import ErrorHandler, ErrorStrategy
>>> flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))
>>> # Or set per-routine:
>>> routine.set_as_critical(max_retries=3)
__init__(flow_id=None, execution_strategy='sequential', max_workers=5, execution_timeout=None)[source]

Initialize Flow.

Parameters:
  • flow_id (str | None) – Flow identifier (auto-generated if None).

  • execution_strategy (str) – Execution strategy, “sequential” or “concurrent”.

  • max_workers (int) – Maximum number of worker threads for concurrent execution.

  • execution_timeout (float | None) – Default timeout for execution completion, in seconds. If None, the default of 300.0 seconds is used.

__repr__()[source]

Return string representation of the Flow.

add_routine(routine, routine_id=None)[source]

Add a routine to the flow.

Parameters:
  • routine (Routine) – Routine instance to add.

  • routine_id (str | None) – Optional unique identifier for this routine in the flow.

Returns:

The routine ID used.

Raises:

ValueError – If routine_id already exists in the flow.

Return type:

str

cancel(job_state, reason='')[source]

Cancel execution.

Parameters:
  • job_state (JobState) – JobState to cancel.

  • reason (str) – Reason for cancellation.

Raises:

ValueError – If job_state flow_id doesn’t match.

connect(source_routine_id, source_event, target_routine_id, target_slot, param_mapping=None)[source]

Connect two routines by linking a source event to a target slot.

Parameters:
  • source_routine_id (str) – Identifier of the routine that emits the event.

  • source_event (str) – Name of the event to connect from.

  • target_routine_id (str) – Identifier of the routine that receives the data.

  • target_slot (str) – Name of the slot to connect to.

  • param_mapping (dict[str, str] | None) – Optional dictionary mapping event parameter names to slot parameter names.

Returns:

Connection object representing this connection.

Raises:

ValueError – If any of the required components don’t exist.

Return type:

Connection
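To make the role of param_mapping concrete, here is a small stand-alone sketch of renaming event parameters to slot parameters. `apply_param_mapping` is a hypothetical helper, not part of routilux, and the choice to pass unmapped names through unchanged is an assumption made for illustration; the library may treat them differently.

```python
def apply_param_mapping(event_data, param_mapping=None):
    """Rename event parameter names to slot parameter names.

    Assumption: names missing from the mapping are passed through as-is.
    """
    if not param_mapping:
        return dict(event_data)
    return {
        param_mapping.get(name, name): value
        for name, value in event_data.items()
    }


# The event emits "result"; the target slot expects "payload".
event_data = {"result": 42, "source": "processor"}
slot_data = apply_param_mapping(event_data, {"result": "payload"})
unchanged = apply_param_mapping(event_data)
```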

deserialize(data)[source]

Deserialize Flow, restoring all routines and connections.

Parameters:

data (dict[str, Any]) – Serialized data dictionary.

execute(entry_routine_id, entry_params=None, execution_strategy=None, timeout=None)[source]

Execute the flow starting from the specified entry routine.

Parameters:
  • entry_routine_id (str) – Identifier of the routine to start execution from.

  • entry_params (dict[str, Any] | None) – Optional dictionary of parameters to pass to the entry routine’s trigger slot.

  • execution_strategy (str | None) – Optional execution strategy override.

  • timeout (float | None) – Optional timeout for execution completion in seconds. If None, uses flow.execution_timeout (default: 300.0 seconds).

Returns:

JobState object containing execution status and state.

Raises:

ValueError – If entry_routine_id does not exist in the flow.

Return type:

JobState

find_routines_by_type(routine_type)[source]

Find routines by type.

Parameters:

routine_type (type) – Type of routine to find (e.g., DataProcessRoutine).

Returns:

List of (routine_id, routine) tuples matching the type.

Return type:

list[tuple[str, Routine]]

Examples

Find all DataProcessRoutine instances:
>>> routines = flow.find_routines_by_type(DataProcessRoutine)
>>> for routine_id, routine in routines:
...     print(f"Found {routine_id}: {routine}")
get_routine_retry_count(routine_id)[source]

Get retry count for a routine.

Parameters:

routine_id (str) – Routine identifier.

Returns:

Retry count if routine has error handler with retry strategy, None otherwise.

Return type:

int | None

Examples

Get retry count for a routine:
>>> retry_count = flow.get_routine_retry_count("processor")
>>> if retry_count is not None:
...     print(f"Current retry count: {retry_count}")
pause(job_state, reason='', checkpoint=None)[source]

Pause execution.

Parameters:
  • job_state (JobState) – JobState to pause.

  • reason (str) – Reason for pausing.

  • checkpoint (dict[str, Any] | None) – Optional checkpoint data.

Raises:

ValueError – If job_state flow_id doesn’t match.

resume(job_state)[source]

Resume execution from paused or saved state.

Parameters:

job_state (JobState) – JobState to resume.

Returns:

Updated JobState.

Raises:

ValueError – If job_state flow_id doesn’t match or routine doesn’t exist.

Return type:

JobState

serialize()[source]

Serialize Flow, including all routines and connections.

Returns:

Serialized dictionary containing flow data (structure only, no execution state).

Raises:

TypeError – If any Serializable object in the Flow cannot be constructed without arguments.

Return type:

dict[str, Any]

Note

Flow serialization only includes structure (routines, connections, config). Execution state (JobState) must be serialized separately:
  1. Serialize Flow: flow_data = flow.serialize()

  2. Serialize JobState: job_state_data = job_state.serialize()

  3. Deserialize both on target host

  4. Use flow.resume(job_state) to continue execution
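The four steps in the note can be sketched as a pair of helpers. This is an illustration under several assumptions: that both serialized dictionaries are JSON-compatible, that JobState offers a `deserialize()` method like Flow does, and that both objects can be built without arguments before being populated. `pack_for_transfer` and `unpack_and_resume` are hypothetical names, and the `_Stub` classes (including their "status" field) exist only so the sketch runs without routilux installed.

```python
import json


def pack_for_transfer(flow, job_state):
    """Steps 1-2: serialize structure and execution state separately."""
    return json.dumps({
        "flow": flow.serialize(),
        "job_state": job_state.serialize(),
    })


def unpack_and_resume(payload, new_flow, new_job_state):
    """Steps 3-4: restore both objects, then resume execution."""
    data = json.loads(payload)
    flow = new_flow()  # built without arguments, then populated
    flow.deserialize(data["flow"])
    job_state = new_job_state()
    job_state.deserialize(data["job_state"])  # assumed to exist on JobState
    return flow.resume(job_state)


# Stand-ins so the sketch runs on its own; real code would pass
# Flow and JobState objects instead.
class _StubState:
    def __init__(self):
        self.fields = {}

    def serialize(self):
        return dict(self.fields)

    def deserialize(self, data):
        self.fields = dict(data)


class _StubFlow(_StubState):
    def resume(self, job_state):
        if job_state.fields.get("flow_id") != self.fields.get("flow_id"):
            raise ValueError("job_state flow_id doesn't match")
        job_state.fields["status"] = "running"
        return job_state


source_flow = _StubFlow()
source_flow.fields = {"flow_id": "demo", "routines": ["processor", "validator"]}
paused_state = _StubState()
paused_state.fields = {"flow_id": "demo", "status": "paused"}

payload = pack_for_transfer(source_flow, paused_state)
resumed = unpack_and_resume(payload, _StubFlow, _StubState)
```

Keeping the two payloads under separate keys mirrors the separation the note describes: the flow structure can be shipped once, while the job state changes with every pause.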

set_error_handler(error_handler)[source]

Set error handler for the flow.

Parameters:

error_handler (ErrorHandler) – ErrorHandler object.

set_execution_strategy(strategy, max_workers=None)[source]

Set execution strategy.

Parameters:
  • strategy (str) – Execution strategy, “sequential” or “concurrent”.

  • max_workers (int | None) – Maximum number of worker threads (only effective in concurrent mode).

shutdown(wait=True, timeout=None)[source]

Shutdown Flow’s executor and event loop.

Parameters:
  • wait (bool) – Whether to wait for all tasks to complete.

  • timeout (float | None) – Wait timeout in seconds (only effective when wait=True).

wait_for_completion(timeout=None, job_state=None)[source]

Wait for all tasks to complete.

Deprecated: This method is deprecated. Use JobState.wait_for_completion() instead for proper error detection and state management.

For proper completion detection with error checking, use:

from routilux.job_state import JobState
completed = JobState.wait_for_completion(flow, job_state, timeout=timeout)
Parameters:
  • timeout (float | None) – Timeout in seconds (infinite wait if None).

  • job_state (JobState | None) – Optional JobState object. If provided, will use JobState.wait_for_completion() for proper error detection.

Returns:

True if all tasks completed before timeout, False otherwise.

Return type:

bool
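The True/False contract described above (True when everything finished in time, False on timeout) can be illustrated with the standard library. The sketch uses `concurrent.futures.wait` rather than any routilux call, and `wait_all` is a hypothetical helper; it says nothing about how routilux detects completion internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def wait_all(futures, timeout=None):
    """Return True if every future finished before the timeout, else False.

    A timeout of None waits indefinitely.
    """
    done, not_done = wait(futures, timeout=timeout)
    return not not_done


with ThreadPoolExecutor(max_workers=2) as pool:
    # Short tasks with a generous timeout: completes in time.
    quick = [pool.submit(time.sleep, 0.01) for _ in range(2)]
    finished_in_time = wait_all(quick, timeout=2.0)

    # A longer task with a very short timeout: the wait gives up first.
    slow = [pool.submit(time.sleep, 0.5)]
    gave_up = not wait_all(slow, timeout=0.05)
```

A False result only means the deadline passed; the underlying work keeps running until it finishes or is cancelled.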

class routilux.flow.TaskPriority(value)[source]

Bases: Enum

Task priority for queue scheduling.

HIGH = 1
NORMAL = 2
LOW = 3
class routilux.flow.SlotActivationTask(slot, data, connection=None, priority=TaskPriority.NORMAL, retry_count=0, max_retries=0, created_at=None, job_state=None)[source]

Bases: object

Slot activation task for queue-based execution.

Each task is associated with a JobState to track execution state. This allows tasks executed in worker threads to access and update the correct JobState, even when running concurrently.

__init__(slot, data, connection=None, priority=TaskPriority.NORMAL, retry_count=0, max_retries=0, created_at=None, job_state=None)
__lt__(other)[source]

For priority queue sorting.

connection: Connection | None = None
created_at: datetime | None = None
job_state: Any | None = None
max_retries: int = 0
priority: TaskPriority = 2
retry_count: int = 0
slot: Slot
data: dict[str, Any]
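A short sketch of how a `__lt__` method lets tasks sit directly in a priority queue, with lower enum values served first. `Priority` and `SketchTask` are simplified stand-ins for TaskPriority and SlotActivationTask, and comparing on the enum value alone is an assumption; the real class may compare on additional fields.

```python
import heapq
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Priority(Enum):
    """Stand-in mirroring the HIGH/NORMAL/LOW values listed above."""
    HIGH = 1
    NORMAL = 2
    LOW = 3


@dataclass
class SketchTask:
    """Simplified stand-in for a queued task."""
    name: str
    data: dict[str, Any] = field(default_factory=dict)
    priority: Priority = Priority.NORMAL

    def __lt__(self, other):
        # Lower numeric value means higher priority.
        return self.priority.value < other.priority.value


heap = []
for task in (
    SketchTask("cleanup", priority=Priority.LOW),
    SketchTask("alert", priority=Priority.HIGH),
    SketchTask("process"),  # defaults to NORMAL
):
    heapq.heappush(heap, task)

# Pop everything to see the order in which tasks would be served.
order = [heapq.heappop(heap).name for _ in range(3)]
```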

Flow Submodules

The Flow functionality is organized into the following submodules:

flow.flow

Main Flow class that orchestrates workflow execution.

flow.task

Task-related classes including TaskPriority enum and SlotActivationTask dataclass.

flow.execution

Execution logic for sequential and concurrent workflow execution.

flow.event_loop

Event loop and task queue management.

flow.error_handling

Error handling logic for task errors and error handler resolution.

flow.state_management

State management including pause, resume, cancel, and task serialization.

flow.dependency

Dependency graph building and querying.

flow.serialization

Flow serialization and deserialization logic.

flow.completion

Execution completion detection and waiting mechanism. Provides systematic completion checking to avoid race conditions and handle long-running tasks.

For most use cases, you only need to import from the top-level routilux package or the main routilux.flow module:

from routilux import Flow
from routilux.flow import TaskPriority, SlotActivationTask  # If needed

The submodules are internal implementation details and typically don’t need to be imported directly.