Serialization¶
routilux provides full serialization support for persistence and state recovery.
Serializing Objects¶
All core classes support serialization:
# Serialize a flow
data = flow.serialize()
# Serialize a routine
data = routine.serialize()
# Serialize a job state
data = job_state.serialize()
# Serialize an error handler
data = error_handler.serialize()
Deserializing Objects¶
Deserialize objects:
# Deserialize a flow
flow = Flow()
flow.deserialize(data)
# Deserialize a routine
routine = Routine()
routine.deserialize(data)
# Deserialize a job state
job_state = JobState()
job_state.deserialize(data)
# Deserialize an error handler
error_handler = ErrorHandler()
error_handler.deserialize(data)
Saving to JSON¶
Save serialized data to JSON:
import json
data = flow.serialize()
with open("flow.json", "w") as f:
json.dump(data, f, indent=2)
Loading from JSON¶
Load from JSON:
import json
with open("flow.json", "r") as f:
data = json.load(f)
flow = Flow()
flow.deserialize(data)
Serializable Fields¶
Classes register fields for serialization:
self.add_serializable_fields(["field1", "field2", "field3"])
Only registered fields are serialized. Complex objects (lists, dicts, other Serializable objects) are automatically handled.
Serialization Validation¶
Before serializing a Flow, the system automatically validates that all Serializable objects (routines, connections, slots, events, etc.) can be constructed without arguments. This ensures that deserialization will succeed.
Why This Matters:
When deserializing, the system needs to create new instances of all Serializable
objects. It does this by calling Class() with no arguments. If a class
requires constructor parameters, deserialization will fail.
Automatic Validation:
When you call flow.serialize(), the system:
Recursively traverses all Serializable objects in the Flow
Checks that each object’s class can be instantiated without arguments
Raises a clear error if any object fails validation
Example Error:
# ❌ This will fail during serialization
class BadRoutine(Routine):
def __init__(self, required_param):
super().__init__()
self.param = required_param
flow = Flow()
routine = BadRoutine("value") # This works
flow.add_routine(routine, "bad_routine")
# This will raise TypeError with clear error message
data = flow.serialize()
# TypeError: Routine 'bad_routine' (BadRoutine) cannot be serialized:
# BadRoutine cannot be deserialized because its __init__ method requires
# parameters: required_param
# Serializable classes must support initialization with no arguments.
# For Routine subclasses, use _config dictionary instead of constructor parameters.
Correct Pattern:
# ✅ Correct: Use _config dictionary
class GoodRoutine(Routine):
def __init__(self):
super().__init__()
# Configuration is set after creation
def configure(self, param1, param2):
self.set_config(param1=param1, param2=param2)
flow = Flow()
routine = GoodRoutine()
routine.configure(param1="value1", param2="value2")
flow.add_routine(routine, "good_routine")
# This will succeed
data = flow.serialize()
What Gets Validated:
All routines in the Flow
All connections
All slots and events within routines
All nested Serializable objects
Error handlers, job states, and other Serializable fields
Error Messages:
The validation provides detailed error messages that include:
Which object failed (routine ID, connection index, field name, etc.)
Which class has the problem
What parameters are required
How to fix the issue
This allows you to catch serialization issues early, before attempting to save or transfer the Flow.
Constructor Requirements¶
Critical Rule: All Serializable classes (including Routine subclasses) must support initialization with no arguments.
For Routine Subclasses:
❌ Don’t: Accept constructor parameters
✅ Do: Use
_configdictionary for configuration
Example:
# ❌ Wrong: Constructor with parameters
class MyRoutine(Routine):
def __init__(self, max_items: int, timeout: float):
super().__init__()
self.max_items = max_items
self.timeout = timeout
# ✅ Correct: No constructor parameters, use _config
class MyRoutine(Routine):
def __init__(self):
super().__init__()
# Configuration is set after creation
def setup(self, max_items: int, timeout: float):
self.set_config(max_items=max_items, timeout=timeout)
# Usage
routine = MyRoutine()
routine.setup(max_items=10, timeout=5.0)
flow.add_routine(routine, "my_routine")
Why This Constraint Exists:
During deserialization, the system needs to:
Load the class from the registry
Create an instance:
routine = RoutineClass()Restore state:
routine.deserialize(data)
If the class requires constructor parameters, step 2 will fail because the system doesn’t know what values to pass.
Validation Timing:
At Class Definition: The
@register_serializabledecorator checks the class definition when it’s first loadedAt Serialization:
flow.serialize()validates all objects in the Flow before serialization, providing early error detection
This two-stage validation ensures that:
Classes are correctly defined from the start
Runtime issues are caught before serialization
Special Handling¶
Some classes have special serialization behavior:
ErrorHandler: The
ErrorStrategyenum is automatically converted to/from strings during serialization/deserialization.
Handler Method Validation¶
When serializing slot handlers and merge strategies, the system validates that methods belong to the routine being serialized. This ensures cross-process serialization safety.
Why This Matters:
When serialized data is transferred to another process (e.g., for distributed execution), only methods of the serialized routine itself can be properly restored. Methods from other routines cannot be deserialized because their object instances don’t exist in the new process.
Validation Rules:
✅ Allowed: Methods of the routine being serialized
✅ Allowed: Module-level functions (can be imported in any process)
✅ Allowed: Builtin functions
❌ Not Allowed: Methods from other routine instances
Example - Correct Usage:
class MyRoutine(Routine):
def __init__(self):
super().__init__()
# ✅ Correct: Use method from this routine
self.input_slot = self.define_slot("input", handler=self.process_data)
def process_data(self, data):
return {"processed": data}
Example - Incorrect Usage:
class MyRoutine(Routine):
def __init__(self):
super().__init__()
other_routine = OtherRoutine()
# ❌ Wrong: Using method from another routine
# This will raise ValueError during serialization
self.input_slot = self.define_slot("input", handler=other_routine.process)
Error Message:
If you try to serialize a method from another routine, you’ll get a clear error:
ValueError: Cannot serialize method 'process' from OtherRoutine[other_id].
Only methods of the serialized object itself (MyRoutine[my_id])
can be serialized for cross-process execution.
What Gets Validated:
Slot handlers (in
Routine.define_slot())Merge strategies (if they are callable methods)
Conditional router conditions (if they are callable methods)
Note: Functions (not methods) are always allowed because they can be imported by module name in any process.
Flow and JobState Separation¶
Critical Design Principle: Flow and JobState are completely decoupled.
Flow: Contains only the workflow definition (routines, connections, configuration)
JobState: Contains execution state (execution history, routine states, status)
Why This Matters:
Flow serialization does NOT include execution state
JobState must be serialized separately for recovery
This allows multiple independent executions of the same flow
Enables proper cross-host execution and recovery
Serialization Pattern:
# Serialize Flow (workflow definition only)
flow_data = flow.serialize()
# flow_data does NOT contain job_state
# Serialize JobState separately (execution state)
job_state_data = job_state.serialize()
# Save both for recovery
save_flow(flow_data)
save_job_state(job_state_data)
Deserialization Pattern:
# Deserialize Flow
new_flow = Flow()
new_flow.deserialize(flow_data)
# Deserialize JobState separately
new_job_state = JobState()
new_job_state.deserialize(job_state_data)
# Resume execution
resumed = new_flow.resume(new_job_state)
Cross-Host Serialization and Execution¶
routilux supports transferring workflows and execution state across different hosts for distributed execution and recovery.
Use Cases:
Distributed Execution: Execute workflow on one host, transfer to another
Recovery: Resume execution on a different host after failure
Load Balancing: Move execution to a less loaded host
Persistence: Save execution state for later recovery
Complete Example: Cross-Host Execution:
1# ============================================
2# Host A: Execute and Prepare for Transfer
3# ============================================
4
5from routilux import Flow, Routine, JobState
6from serilux import register_serializable
7import json
8
9@register_serializable
10class DataSource(Routine):
11 def __init__(self):
12 super().__init__()
13 self.trigger_slot = self.define_slot("trigger", handler=self.send)
14 self.output_event = self.define_event("output", ["data"])
15
16 def send(self, **kwargs):
17 self.emit("output", data="initial_data")
18
19@register_serializable
20class DataProcessor(Routine):
21 def __init__(self):
22 super().__init__()
23 self.input_slot = self.define_slot("input", handler=self.process)
24 self.output_event = self.define_event("output", ["result"])
25
26 def process(self, data=None, **kwargs):
27 result = f"Processed: {data}"
28 self.emit("output", result=result)
29
30# Create flow on Host A
31flow = Flow(flow_id="cross_host_flow")
32source = DataSource()
33processor = DataProcessor()
34source_id = flow.add_routine(source, "source")
35processor_id = flow.add_routine(processor, "processor")
36flow.connect(source_id, "output", processor_id, "input")
37
38# Execute and pause (simulating transfer point)
39job_state = flow.execute(source_id)
40flow.pause(job_state, reason="Transfer to Host B")
41
42# Serialize both Flow and JobState
43flow_data = flow.serialize()
44job_state_data = job_state.serialize()
45
46# Prepare for transfer (JSON format)
47transfer_data = {
48 "flow": flow_data,
49 "job_state": job_state_data
50}
51
52# Save to file (or send over network)
53with open("transfer.json", "w") as f:
54 json.dump(transfer_data, f, indent=2)
55
56print("✅ Host A: Flow and JobState serialized and ready for transfer")
57
58# ============================================
59# Host B: Receive and Resume Execution
60# ============================================
61
62# Load from file (or receive from network)
63with open("transfer.json", "r") as f:
64 transfer_data = json.load(f)
65
66# Deserialize Flow
67new_flow = Flow()
68new_flow.deserialize(transfer_data["flow"])
69
70# Deserialize JobState
71new_job_state = JobState()
72new_job_state.deserialize(transfer_data["job_state"])
73
74# Verify deserialization
75assert new_flow.flow_id == flow.flow_id
76assert new_job_state.job_id == job_state.job_id
77assert new_job_state.status == "paused"
78
79# Resume execution on Host B
80resumed_job_state = new_flow.resume(new_job_state)
81
82# Wait for completion
83from routilux.job_state import JobState
84JobState.wait_for_completion(new_flow, resumed_job_state, timeout=10.0)
85
86print(f"✅ Host B: Execution resumed and completed")
87print(f" Final status: {resumed_job_state.status}")
88print(f" Execution history: {len(resumed_job_state.execution_history)} records")
Expected Output:
✅ Host A: Flow and JobState serialized and ready for transfer
✅ Host B: Execution resumed and completed
Final status: completed
Execution history: 4 records
Key Points:
Flow and JobState are serialized separately - This is required
Both must be transferred - Flow (definition) + JobState (execution state)
Routines must be registered - Use
@register_serializabledecoratorDeserialization order matters - Deserialize Flow first, then JobState
Resume uses deserialized JobState - Pass the deserialized JobState to
resume()
Network Transfer Example:
1import socket
2import json
3import pickle
4
5# Host A: Send
6def send_to_host_b(flow_data, job_state_data, host_b_address):
7 transfer_data = {
8 "flow": flow_data,
9 "job_state": job_state_data
10 }
11
12 # Serialize to JSON
13 json_data = json.dumps(transfer_data)
14
15 # Send over network
16 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
17 sock.connect(host_b_address)
18 sock.sendall(json_data.encode())
19 sock.close()
20
21# Host B: Receive
22def receive_from_host_a(port):
23 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24 sock.bind(("", port))
25 sock.listen(1)
26 conn, addr = sock.accept()
27
28 # Receive data
29 data = b""
30 while True:
31 chunk = conn.recv(4096)
32 if not chunk:
33 break
34 data += chunk
35
36 conn.close()
37 sock.close()
38
39 # Deserialize
40 transfer_data = json.loads(data.decode())
41 return transfer_data["flow"], transfer_data["job_state"]
Database Storage Example:
1import sqlite3
2import json
3
4# Host A: Save to database
5def save_execution_to_db(flow_data, job_state_data, execution_id):
6 conn = sqlite3.connect("executions.db")
7 cursor = conn.cursor()
8
9 cursor.execute("""
10 CREATE TABLE IF NOT EXISTS executions (
11 execution_id TEXT PRIMARY KEY,
12 flow_data TEXT,
13 job_state_data TEXT,
14 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
15 )
16 """)
17
18 cursor.execute("""
19 INSERT INTO executions (execution_id, flow_data, job_state_data)
20 VALUES (?, ?, ?)
21 """, (execution_id, json.dumps(flow_data), json.dumps(job_state_data)))
22
23 conn.commit()
24 conn.close()
25
26# Host B: Load from database
27def load_execution_from_db(execution_id):
28 conn = sqlite3.connect("executions.db")
29 cursor = conn.cursor()
30
31 cursor.execute("""
32 SELECT flow_data, job_state_data
33 FROM executions
34 WHERE execution_id = ?
35 """, (execution_id,))
36
37 row = cursor.fetchone()
38 conn.close()
39
40 if row:
41 flow_data = json.loads(row[0])
42 job_state_data = json.loads(row[1])
43 return flow_data, job_state_data
44 else:
45 return None, None
Best Practices for Cross-Host Execution:
Always serialize Flow and JobState separately:
# ✅ Correct flow_data = flow.serialize() job_state_data = job_state.serialize() # ❌ Wrong: Flow doesn't include JobState # flow_data = flow.serialize(include_execution_state=True) # This doesn't exist!
Register all custom Routine classes:
from serilux import register_serializable @register_serializable class MyRoutine(Routine): def __init__(self): super().__init__() # ...
Use no-argument constructors:
# ✅ Correct class MyRoutine(Routine): def __init__(self): super().__init__() self.set_config(param1="value1") # ❌ Wrong: Constructor with parameters class MyRoutine(Routine): def __init__(self, param1): # Will fail during deserialization! super().__init__() self.param1 = param1
Validate before transfer:
# Serialize and validate flow_data = flow.serialize() # Validates automatically job_state_data = job_state.serialize() # Test deserialization locally before transfer test_flow = Flow() test_flow.deserialize(flow_data) test_job_state = JobState() test_job_state.deserialize(job_state_data) # If this works, safe to transfer
Handle errors gracefully:
try: new_flow = Flow() new_flow.deserialize(flow_data) new_job_state = JobState() new_job_state.deserialize(job_state_data) resumed = new_flow.resume(new_job_state) except Exception as e: print(f"Failed to resume execution: {e}") # Handle error (retry, log, notify, etc.)
Common Pitfalls:
Forgetting to serialize JobState:
# ❌ Wrong: Only serializing Flow flow_data = flow.serialize() # JobState is lost! # ✅ Correct: Serialize both flow_data = flow.serialize() job_state_data = job_state.serialize()
Assuming Flow includes execution state:
# ❌ Wrong: Flow doesn't have job_state flow_data = flow.serialize() # flow_data["job_state"] # KeyError! # ✅ Correct: JobState is separate job_state_data = job_state.serialize()
Not registering custom routines:
# ❌ Wrong: Not registered class MyRoutine(Routine): def __init__(self): super().__init__() # Will fail during deserialization on Host B # ✅ Correct: Registered @register_serializable class MyRoutine(Routine): def __init__(self): super().__init__()
Using constructor parameters:
# ❌ Wrong: Constructor with parameters class MyRoutine(Routine): def __init__(self, max_items): super().__init__() self.max_items = max_items # Will fail during deserialization # ✅ Correct: Use _config class MyRoutine(Routine): def __init__(self): super().__init__() def setup(self, max_items): self.set_config(max_items=max_items)
Serialization Best Practices¶
This section covers best practices for serializing and deserializing flows, including when to use pause vs direct serialization, and what to expect after recovery.
When to Use Pause vs Direct Serialization¶
Option 1: Pause Before Serialization (Recommended for Active Executions)
Use flow.pause() when you want to capture the exact execution state at a specific point:
# Execute flow
job_state = flow.execute(entry_id)
# Pause to capture state
flow.pause(job_state, reason="Saving checkpoint")
# Serialize (pending tasks are already captured)
flow_data = flow.serialize()
job_state_data = job_state.serialize()
# Save to file
save_data = {
"flow": flow_data,
"job_state": job_state_data
}
Benefits of Pausing:
✅ Captures all pending tasks: Tasks in the queue are moved to
pending_tasks✅ Clean state: Active tasks complete before serialization
✅ Predictable recovery: Exact state is preserved
✅ Better for debugging: Clear pause points in execution history
When to Use:
Saving checkpoints during long-running executions
Transferring execution to another host at a known point
Debugging and inspection of execution state
When you need guaranteed consistency
Option 2: Direct Serialization (For Quick Saves)
You can serialize directly without pausing, especially useful for:
# Execute flow
job_state = flow.execute(entry_id)
# Serialize immediately (without pausing)
flow_data = flow.serialize()
job_state_data = job_state.serialize()
# Save to file
save_data = {
"flow": flow_data,
"job_state": job_state_data
}
Benefits of Direct Serialization:
✅ Faster: No need to wait for active tasks
✅ Non-blocking: Can save state while execution continues
✅ Simpler: Fewer steps in the code
When to Use:
Quick state snapshots
Background persistence
When execution can continue after save
When you don’t need to capture queue tasks
Important Note: When using direct serialization, tasks that are in the execution queue
may not be captured in pending_tasks. However, routilux automatically recovers tasks
from slot data during resume(), so execution will continue correctly even if some
tasks weren’t in pending_tasks.
Automatic Task Recovery¶
New Feature: Starting from version 0.1.0, routilux automatically recovers tasks from
slot data during resume(). This means:
✅ Slot data is preserved: Data in routine slots is serialized with the flow
✅ Automatic recovery: Tasks are automatically rebuilt from slot data if needed
✅ Retry state preserved: Error handler retry counts are correctly restored
✅ Connection recovery: Connection information is automatically restored
How It Works:
When you call flow.resume(job_state), the system:
Deserializes pending tasks from
job_state.pending_tasksAutomatically scans all routine slots for data
Rebuilds tasks for slots that have data but no pending tasks
Preserves retry state from error handlers
Restores connections automatically
Example:
# Host A: Save state (without pausing)
job_state = flow.execute(entry_id)
flow_data = flow.serialize()
job_state_data = job_state.serialize()
# Some tasks may be in queue, not in pending_tasks
# Host B: Restore and resume
new_flow = Flow()
new_flow.deserialize(flow_data)
new_job_state = JobState()
new_job_state.deserialize(job_state_data)
# Resume automatically recovers tasks from slot data
resumed = new_flow.resume(new_job_state)
# ✅ Execution continues correctly, even if some tasks weren't in pending_tasks
What Gets Recovered:
Tasks for slots with data
Retry state (retry_count, max_retries)
Connection information
Task priority and metadata
What Doesn’t Get Recovered:
Tasks that were already completed
Tasks for routines that are already failed/cancelled
Tasks that exceed max_retries
Expected Behavior After Recovery¶
Status Transitions:
When you resume a flow, the status transitions as follows:
# Before resume
assert job_state.status == "paused" # or "running"
# After resume
resumed = flow.resume(job_state)
assert resumed.status == "running" # Always becomes "running"
# After completion
# Wait for completion...
assert resumed.status in ["completed", "failed", "cancelled"]
Execution Continuation:
✅ Pending tasks are restored: Tasks from
pending_tasksare enqueued✅ Slot data tasks are recovered: Tasks are automatically rebuilt from slot data
✅ Retry state is preserved: Error handler retry counts continue from saved state
✅ Execution history continues: New events are appended to existing history
Retry Behavior:
When resuming a flow with retry-enabled routines:
# Host A: Save after 2 retries
# error_handler.retry_count = 2
# error_handler.max_retries = 4
# Host B: Resume
resumed = flow.resume(job_state)
# Retry state is preserved
assert error_handler.retry_count == 2 # ✅ Preserved
assert error_handler.max_retries == 4 # ✅ Preserved
# Remaining retries will execute
# (2 more retries available: 3 and 4)
Routine State:
✅ Completed routines: Remain completed, don’t re-execute
✅ Failed routines: Remain failed (unless retries are available)
✅ Pending/running routines: Will continue execution
✅ Skipped routines: Remain skipped
Data Integrity:
✅ Slot data: Preserved and used for task recovery
✅ Shared data: Preserved in
job_state.shared_data✅ Execution history: Preserved and continues
✅ Connection information: Automatically restored
Error Handling:
If recovery encounters issues:
try:
resumed = flow.resume(job_state)
except ValueError as e:
# Flow ID mismatch, routine not found, etc.
print(f"Recovery failed: {e}")
except Exception as e:
# Other errors during recovery
print(f"Unexpected error: {e}")
Common Scenarios:
Normal Recovery:
# Pause and save flow.pause(job_state, reason="Checkpoint") flow_data = flow.serialize() job_state_data = job_state.serialize() # Restore and resume new_flow = Flow() new_flow.deserialize(flow_data) new_job_state = JobState() new_job_state.deserialize(job_state_data) resumed = new_flow.resume(new_job_state) # ✅ All pending tasks restored # ✅ Execution continues from pause point
Recovery with Slot Data:
# Save without pausing (some tasks in queue) flow_data = flow.serialize() job_state_data = job_state.serialize() # Restore and resume new_flow = Flow() new_flow.deserialize(flow_data) new_job_state = JobState() new_job_state.deserialize(job_state_data) resumed = new_flow.resume(new_job_state) # ✅ Tasks automatically recovered from slot data # ✅ Execution continues correctly
Recovery with Retries:
# Save after 2 retries (retry_count=2, max_retries=4) flow_data = flow.serialize() job_state_data = job_state.serialize() # Restore and resume new_flow = Flow() new_flow.deserialize(flow_data) new_job_state = JobState() new_job_state.deserialize(job_state_data) resumed = new_flow.resume(new_job_state) # ✅ Retry state preserved (retry_count=2) # ✅ Remaining retries will execute (3 and 4)
Best Practices Summary¶
Always serialize Flow and JobState separately:
flow_data = flow.serialize() job_state_data = job_state.serialize() # Never assume flow.serialize() includes job_state
Use pause() for checkpoints:
flow.pause(job_state, reason="Checkpoint") # Ensures all pending tasks are captured
Direct serialization is fine for quick saves:
# Can serialize without pausing # Automatic recovery handles slot data
Register all custom routines:
@register_serializable class MyRoutine(Routine): def __init__(self): super().__init__()
Test deserialization locally:
# Validate before transfer test_flow = Flow() test_flow.deserialize(flow_data) test_job_state = JobState() test_job_state.deserialize(job_state_data)
Handle errors gracefully:
try: resumed = flow.resume(job_state) except Exception as e: # Log and handle error logger.error(f"Recovery failed: {e}")
Trust automatic recovery:
# Don't manually check slot data or create tasks # Routilux handles this automatically resumed = flow.resume(job_state) # ✅ Tasks are automatically recovered