Serialization
routilux provides full serialization support for persistence and state recovery.
Serializing Objects
All core classes support serialization:
# Serialize a flow
data = flow.serialize()
# Serialize a routine
data = routine.serialize()
# Serialize a job state
data = job_state.serialize()
# Serialize an error handler
data = error_handler.serialize()
Deserializing Objects
To deserialize, create an empty instance and call deserialize() with the serialized data:
# Deserialize a flow
flow = Flow()
flow.deserialize(data)
# Deserialize a routine
routine = Routine()
routine.deserialize(data)
# Deserialize a job state
job_state = JobState()
job_state.deserialize(data)
# Deserialize an error handler
error_handler = ErrorHandler()
error_handler.deserialize(data)
Saving to JSON
Save serialized data to JSON:
import json
data = flow.serialize()
with open("flow.json", "w") as f:
    json.dump(data, f, indent=2)
Loading from JSON
Load from JSON:
import json
with open("flow.json", "r") as f:
    data = json.load(f)
flow = Flow()
flow.deserialize(data)
Serializable Fields
Classes register fields for serialization:
self.add_serializable_fields(["field1", "field2", "field3"])
Only registered fields are serialized. Complex objects (lists, dicts, other Serializable objects) are automatically handled.
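The field-registration idea can be sketched in a few lines. This is an illustration only, not routilux or serilux code: MiniSerializable, Point, Shape, and the _encode helper are hypothetical names, and the real library may handle nested objects differently.

```python
class MiniSerializable:
    """Hypothetical stand-in for a Serializable base class (not library code)."""

    def __init__(self):
        self._serializable_fields = []

    def add_serializable_fields(self, names):
        # Register field names, skipping duplicates and keeping order.
        for name in names:
            if name not in self._serializable_fields:
                self._serializable_fields.append(name)

    def serialize(self):
        # Only registered fields are emitted; nested values are encoded recursively.
        return {name: _encode(getattr(self, name)) for name in self._serializable_fields}


def _encode(value):
    """Recursively encode lists, dicts, and nested MiniSerializable objects."""
    if isinstance(value, MiniSerializable):
        return value.serialize()
    if isinstance(value, list):
        return [_encode(item) for item in value]
    if isinstance(value, dict):
        return {key: _encode(item) for key, item in value.items()}
    return value


class Point(MiniSerializable):
    def __init__(self):
        super().__init__()
        self.x = 0
        self.y = 0
        self.cache = "not registered"  # never appears in the output
        self.add_serializable_fields(["x", "y"])


class Shape(MiniSerializable):
    def __init__(self):
        super().__init__()
        self.name = "triangle"
        self.points = [Point(), Point()]
        self.add_serializable_fields(["name", "points"])


shape_data = Shape().serialize()
```

In this sketch the unregistered cache attribute is dropped, while the list of nested Point objects is serialized element by element.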
Serialization Validation
Before serializing a Flow, the system automatically validates that all Serializable objects (routines, connections, slots, events, etc.) can be constructed without arguments. This ensures that deserialization will succeed.
Why This Matters:
When deserializing, the system needs to create new instances of all Serializable
objects. It does this by calling Class() with no arguments. If a class
requires constructor parameters, deserialization will fail.
Automatic Validation:
When you call flow.serialize(), the system:
Recursively traverses all Serializable objects in the Flow
Checks that each object’s class can be instantiated without arguments
Raises a clear error if any object fails validation
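One way such a check could work is to inspect the constructor signature. The sketch below uses only the standard library; required_init_params and check_no_arg_constructible are hypothetical helpers, the two demo classes do not derive from Routine, and routilux's actual validation may be implemented differently.

```python
import inspect


def required_init_params(cls):
    """Return the names of __init__ parameters that have no default value."""
    signature = inspect.signature(cls.__init__)
    required = []
    for name, param in signature.parameters.items():
        if name == "self":
            continue
        # *args and **kwargs never force the caller to pass anything.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return required


def check_no_arg_constructible(obj, label):
    """Raise TypeError if the object's class cannot be built with no arguments."""
    missing = required_init_params(type(obj))
    if missing:
        raise TypeError(
            f"{label} ({type(obj).__name__}) cannot be serialized: "
            f"__init__ requires parameters: {', '.join(missing)}"
        )


class OkRoutine:
    def __init__(self, retries=3):  # defaulted parameters are fine
        self.retries = retries


class BadRoutine:
    def __init__(self, required_param):
        self.param = required_param


check_no_arg_constructible(OkRoutine(), "Routine 'ok'")  # passes silently
```

Calling check_no_arg_constructible(BadRoutine("value"), "Routine 'bad'") raises a TypeError that names required_param, which mirrors the kind of message described in this section.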
Example Error:
# ❌ This will fail during serialization
class BadRoutine(Routine):
    def __init__(self, required_param):
        super().__init__()
        self.param = required_param
flow = Flow()
routine = BadRoutine("value") # This works
flow.add_routine(routine, "bad_routine")
# This will raise TypeError with clear error message
data = flow.serialize()
# TypeError: Routine 'bad_routine' (BadRoutine) cannot be serialized:
# BadRoutine cannot be deserialized because its __init__ method requires
# parameters: required_param
# Serializable classes must support initialization with no arguments.
# For Routine subclasses, use _config dictionary instead of constructor parameters.
Correct Pattern:
# ✅ Correct: Use _config dictionary
class GoodRoutine(Routine):
    def __init__(self):
        super().__init__()
        # Configuration is set after creation

    def configure(self, param1, param2):
        self.set_config(param1=param1, param2=param2)
flow = Flow()
routine = GoodRoutine()
routine.configure(param1="value1", param2="value2")
flow.add_routine(routine, "good_routine")
# This will succeed
data = flow.serialize()
What Gets Validated:
All routines in the Flow
All connections
All slots and events within routines
All nested Serializable objects
Error handlers, job states, and other Serializable fields
Error Messages:
The validation provides detailed error messages that include:
Which object failed (routine ID, connection index, field name, etc.)
Which class has the problem
What parameters are required
How to fix the issue
This allows you to catch serialization issues early, before attempting to save or transfer the Flow.
Constructor Requirements
Critical Rule: All Serializable classes (including Routine subclasses) must support initialization with no arguments.
For Routine Subclasses:
❌ Don’t: Accept constructor parameters
✅ Do: Use the _config dictionary for configuration
Example:
# ❌ Wrong: Constructor with parameters
class MyRoutine(Routine):
    def __init__(self, max_items: int, timeout: float):
        super().__init__()
        self.max_items = max_items
        self.timeout = timeout

# ✅ Correct: No constructor parameters, use _config
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        # Configuration is set after creation

    def setup(self, max_items: int, timeout: float):
        self.set_config(max_items=max_items, timeout=timeout)
# Usage
routine = MyRoutine()
routine.setup(max_items=10, timeout=5.0)
flow.add_routine(routine, "my_routine")
Why This Constraint Exists:
During deserialization, the system needs to:
1. Load the class from the registry
2. Create an instance: routine = RoutineClass()
3. Restore state: routine.deserialize(data)
If the class requires constructor parameters, step 2 will fail because the system doesn’t know what values to pass.
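The three deserialization steps can be sketched with a plain dictionary as the registry. Everything here is hypothetical: REGISTRY, register, restore, the Counter class, and the "_type" key are illustrative names, not the format or API that routilux or serilux actually uses.

```python
REGISTRY = {}


def register(cls):
    """Hypothetical decorator: make the class findable by its name."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
class Counter:
    def __init__(self):  # no required arguments, so step 2 below can succeed
        self.count = 0

    def serialize(self):
        # "_type" is an assumed key used here to record the class name.
        return {"_type": type(self).__name__, "count": self.count}

    def deserialize(self, data):
        self.count = data["count"]


def restore(data):
    cls = REGISTRY[data["_type"]]  # 1. load the class from the registry
    obj = cls()                    # 2. create an instance with no arguments
    obj.deserialize(data)          # 3. restore state
    return obj


original = Counter()
original.count = 7
restored = restore(original.serialize())
```

If Counter.__init__ required a parameter, the call cls() in restore would raise a TypeError, because the serialized data alone gives the loader no way to know which constructor arguments to supply.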
Validation Timing:
At Class Definition: The @register_serializable decorator checks the class definition when it’s first loaded
At Serialization: flow.serialize() validates all objects in the Flow before serialization, providing early error detection
This two-stage validation ensures that:
Classes are correctly defined from the start
Runtime issues are caught before serialization
Special Handling
Some classes have special serialization behavior:
ErrorHandler: The ErrorStrategy enum is automatically converted to/from strings during serialization/deserialization.
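The enum-to-string conversion is needed because enum members are not JSON-serializable by default. A minimal sketch of the idea follows, using a made-up Strategy enum; the member names and the two helper functions are assumptions for illustration and do not reflect the real ErrorStrategy members or routilux internals.

```python
import json
from enum import Enum


class Strategy(Enum):
    """Hypothetical enum standing in for ErrorStrategy; members are made up."""
    STOP = "stop"
    CONTINUE = "continue"


def strategy_to_str(strategy):
    # Store the plain string value so the result is JSON-friendly.
    return strategy.value


def strategy_from_str(text):
    # Enum lookup by value; raises ValueError for unknown strings.
    return Strategy(text)


payload = json.dumps({"strategy": strategy_to_str(Strategy.CONTINUE)})
recovered = strategy_from_str(json.loads(payload)["strategy"])
```

After the round trip through JSON, recovered is the same Strategy.CONTINUE member that was serialized.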
Handler Method Validation
When serializing slot handlers and merge strategies, the system validates that methods belong to the routine being serialized. This ensures cross-process serialization safety.
Why This Matters:
When serialized data is transferred to another process (e.g., for distributed execution), only methods of the serialized routine itself can be properly restored. Methods from other routines cannot be deserialized because their object instances don’t exist in the new process.
Validation Rules:
✅ Allowed: Methods of the routine being serialized
✅ Allowed: Module-level functions (can be imported in any process)
✅ Allowed: Builtin functions
❌ Not Allowed: Methods from other routine instances
Example - Correct Usage:
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        # ✅ Correct: Use method from this routine
        self.input_slot = self.define_slot("input", handler=self.process_data)

    def process_data(self, data):
        return {"processed": data}
Example - Incorrect Usage:
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        other_routine = OtherRoutine()
        # ❌ Wrong: Using method from another routine
        # This will raise ValueError during serialization
        self.input_slot = self.define_slot("input", handler=other_routine.process)
Error Message:
If you try to serialize a method from another routine, you’ll get a clear error:
ValueError: Cannot serialize method 'process' from OtherRoutine[other_id].
Only methods of the serialized object itself (MyRoutine[my_id])
can be serialized for cross-process execution.
What Gets Validated:
Slot handlers (in Routine.define_slot())
Merge strategies (if they are callable methods)
Conditional router conditions (if they are callable methods)
Note: Functions (not methods) are always allowed because they can be imported by module name in any process.
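The ownership rule can be approximated with the standard library by checking which object a bound method belongs to. This is a sketch under stated assumptions: validate_handler, Worker, and double are hypothetical, and the real validation in routilux may cover more cases (for example class methods) or word its errors differently.

```python
import inspect


def validate_handler(handler, owner):
    """Raise ValueError if handler is a method bound to an object other than owner."""
    if inspect.ismethod(handler) and handler.__self__ is not owner:
        raise ValueError(
            f"Cannot serialize method '{handler.__name__}' from "
            f"{type(handler.__self__).__name__}; only methods of the "
            f"serialized object itself ({type(owner).__name__}) can be serialized."
        )
    return handler


class Worker:
    def process(self, data):
        return {"processed": data}


def double(value):
    """Module-level function: importable by name in any process."""
    return value * 2


this_routine = Worker()
other_routine = Worker()

validate_handler(this_routine.process, this_routine)  # own method: allowed
validate_handler(double, this_routine)                # plain function: allowed
validate_handler(len, this_routine)                   # builtin: allowed
```

Passing other_routine.process with this_routine as the owner raises ValueError, since that method is bound to a different instance.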
Flow and JobState Separation
Critical Design Principle: Flow and JobState are completely decoupled.
Flow: Contains only the workflow definition (routines, connections, configuration)
JobState: Contains execution state (execution history, routine states, status)
Why This Matters:
Flow serialization does NOT include execution state
JobState must be serialized separately for recovery
This allows multiple independent executions of the same flow
Enables proper cross-host execution and recovery
Serialization Pattern:
# Serialize Flow (workflow definition only)
flow_data = flow.serialize()
# flow_data does NOT contain job_state
# Serialize JobState separately (execution state)
job_state_data = job_state.serialize()
# Save both for recovery
save_flow(flow_data)
save_job_state(job_state_data)
Deserialization Pattern:
# Deserialize Flow
new_flow = Flow()
new_flow.deserialize(flow_data)
# Deserialize JobState separately
new_job_state = JobState()
new_job_state.deserialize(job_state_data)
# Resume execution
resumed = new_flow.resume(new_job_state)
Cross-Host Serialization and Execution
routilux supports transferring workflows and execution state across different hosts for distributed execution and recovery.
Use Cases:
Distributed Execution: Execute workflow on one host, transfer to another
Recovery: Resume execution on a different host after failure
Load Balancing: Move execution to a less loaded host
Persistence: Save execution state for later recovery
Complete Example: Cross-Host Execution:
# ============================================
# Host A: Execute and Prepare for Transfer
# ============================================

from routilux import Flow, Routine, JobState
from serilux import register_serializable
import json

@register_serializable
class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="initial_data")

@register_serializable
class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        result = f"Processed: {data}"
        self.emit("output", result=result)

# Create flow on Host A
flow = Flow(flow_id="cross_host_flow")
source = DataSource()
processor = DataProcessor()
source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")
flow.connect(source_id, "output", processor_id, "input")

# Execute and pause (simulating transfer point)
job_state = flow.execute(source_id)
flow.pause(job_state, reason="Transfer to Host B")

# Serialize both Flow and JobState
flow_data = flow.serialize()
job_state_data = job_state.serialize()

# Prepare for transfer (JSON format)
transfer_data = {
    "flow": flow_data,
    "job_state": job_state_data
}

# Save to file (or send over network)
with open("transfer.json", "w") as f:
    json.dump(transfer_data, f, indent=2)

print("✅ Host A: Flow and JobState serialized and ready for transfer")

# ============================================
# Host B: Receive and Resume Execution
# ============================================

# Load from file (or receive from network)
with open("transfer.json", "r") as f:
    transfer_data = json.load(f)

# Deserialize Flow
new_flow = Flow()
new_flow.deserialize(transfer_data["flow"])

# Deserialize JobState
new_job_state = JobState()
new_job_state.deserialize(transfer_data["job_state"])

# Verify deserialization
assert new_flow.flow_id == flow.flow_id
assert new_job_state.job_id == job_state.job_id
assert new_job_state.status == "paused"

# Resume execution on Host B
resumed_job_state = new_flow.resume(new_job_state)

# Wait for completion (JobState is already imported at the top)
JobState.wait_for_completion(new_flow, resumed_job_state, timeout=10.0)

print("✅ Host B: Execution resumed and completed")
print(f" Final status: {resumed_job_state.status}")
print(f" Execution history: {len(resumed_job_state.execution_history)} records")
Expected Output:
✅ Host A: Flow and JobState serialized and ready for transfer
✅ Host B: Execution resumed and completed
Final status: completed
Execution history: 4 records
Key Points:
Flow and JobState are serialized separately - This is required
Both must be transferred - Flow (definition) + JobState (execution state)
Routines must be registered - Use the @register_serializable decorator
Deserialization order matters - Deserialize Flow first, then JobState
Resume uses deserialized JobState - Pass the deserialized JobState to resume()
Network Transfer Example:
import socket
import json

# Host A: Send
def send_to_host_b(flow_data, job_state_data, host_b_address):
    transfer_data = {
        "flow": flow_data,
        "job_state": job_state_data
    }

    # Serialize to JSON
    json_data = json.dumps(transfer_data)

    # Send over network; closing the socket signals end of data to the receiver
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(host_b_address)
    sock.sendall(json_data.encode())
    sock.close()

# Host B: Receive
def receive_from_host_a(port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(1)
    conn, addr = sock.accept()

    # Receive data until the sender closes the connection
    data = b""
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            break
        data += chunk

    conn.close()
    sock.close()

    # Deserialize
    transfer_data = json.loads(data.decode())
    return transfer_data["flow"], transfer_data["job_state"]
Database Storage Example:
import sqlite3
import json

# Host A: Save to database
def save_execution_to_db(flow_data, job_state_data, execution_id):
    conn = sqlite3.connect("executions.db")
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS executions (
            execution_id TEXT PRIMARY KEY,
            flow_data TEXT,
            job_state_data TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    cursor.execute("""
        INSERT INTO executions (execution_id, flow_data, job_state_data)
        VALUES (?, ?, ?)
    """, (execution_id, json.dumps(flow_data), json.dumps(job_state_data)))

    conn.commit()
    conn.close()

# Host B: Load from database
def load_execution_from_db(execution_id):
    conn = sqlite3.connect("executions.db")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT flow_data, job_state_data
        FROM executions
        WHERE execution_id = ?
    """, (execution_id,))

    row = cursor.fetchone()
    conn.close()

    if row:
        flow_data = json.loads(row[0])
        job_state_data = json.loads(row[1])
        return flow_data, job_state_data
    else:
        return None, None
Best Practices for Cross-Host Execution:
Always serialize Flow and JobState separately:
# ✅ Correct
flow_data = flow.serialize()
job_state_data = job_state.serialize()

# ❌ Wrong: Flow doesn't include JobState
# flow_data = flow.serialize(include_execution_state=True)  # This doesn't exist!
Register all custom Routine classes:
from serilux import register_serializable

@register_serializable
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        # ...
Use no-argument constructors:
# ✅ Correct
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.set_config(param1="value1")

# ❌ Wrong: Constructor with parameters
class MyRoutine(Routine):
    def __init__(self, param1):  # Will fail during deserialization!
        super().__init__()
        self.param1 = param1
Validate before transfer:
# Serialize and validate
flow_data = flow.serialize()  # Validates automatically
job_state_data = job_state.serialize()

# Test deserialization locally before transfer
test_flow = Flow()
test_flow.deserialize(flow_data)
test_job_state = JobState()
test_job_state.deserialize(job_state_data)
# If this works, safe to transfer
Handle errors gracefully:
try:
    new_flow = Flow()
    new_flow.deserialize(flow_data)
    new_job_state = JobState()
    new_job_state.deserialize(job_state_data)
    resumed = new_flow.resume(new_job_state)
except Exception as e:
    print(f"Failed to resume execution: {e}")
    # Handle error (retry, log, notify, etc.)
Common Pitfalls:
Forgetting to serialize JobState:
# ❌ Wrong: Only serializing Flow
flow_data = flow.serialize()
# JobState is lost!

# ✅ Correct: Serialize both
flow_data = flow.serialize()
job_state_data = job_state.serialize()
Assuming Flow includes execution state:
# ❌ Wrong: Flow doesn't have job_state
flow_data = flow.serialize()
# flow_data["job_state"]  # KeyError!

# ✅ Correct: JobState is separate
job_state_data = job_state.serialize()
Not registering custom routines:
# ❌ Wrong: Not registered
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
# Will fail during deserialization on Host B

# ✅ Correct: Registered
@register_serializable
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
Using constructor parameters:
# ❌ Wrong: Constructor with parameters
class MyRoutine(Routine):
    def __init__(self, max_items):
        super().__init__()
        self.max_items = max_items
# Will fail during deserialization

# ✅ Correct: Use _config
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()

    def setup(self, max_items):
        self.set_config(max_items=max_items)