Serialization and Persistence
In this tutorial, you’ll learn how to serialize and deserialize flows for persistence, recovery, and distributed execution.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Serialize flows to JSON
Deserialize flows from JSON
Save and load JobState for recovery
Understand serialization requirements
Build persistent workflows
Step 1: Basic Flow Serialization
You can serialize a flow to JSON for persistence. Important: By default,
serialize() does NOT include execution state (job_state). This is the
recommended approach for proper execution recovery.
from routilux import Flow, Routine
import json

class SimpleProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"Processed: {data_value}"
        self.emit("output", result=result)

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test")

# Create and configure flow
flow = Flow(flow_id="serializable_flow")

source = DataSource()
processor = SimpleProcessor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

# Serialize flow (default: include_execution_state=False)
# This only serializes flow structure, not execution state
flow_data = flow.serialize()

# Save to file
with open("flow.json", "w") as f:
    json.dump(flow_data, f, indent=2)

print("Flow serialized to flow.json")
print(f"Flow ID: {flow_data['flow_id']}")
print(f"Routines: {len(flow_data['routines'])}")
print(f"Contains job_state: {'job_state' in flow_data}")
Expected Output:
Flow serialized to flow.json
Flow ID: serializable_flow
Routines: 2
Contains job_state: False
Key Points:
By default, serialize() does NOT include execution state (recommended)
Only the flow structure is serialized (routines, connections, configuration)
Flow structure is preserved
Execution state (JobState) should be serialized separately for recovery
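When the serialized structure is written to disk for later recovery, a crash in the middle of the write can leave a truncated file. The sketch below shows one common guard using only the standard library: write to a temporary file in the same directory, then rename it into place. The helper name `save_json_atomic` and the sample dictionary are illustrative assumptions, not part of Routilux; in practice the dictionary would be whatever `flow.serialize()` returns.

```python
import json
import os
import tempfile


def save_json_atomic(data, path):
    """Write JSON to a temp file in the target directory, then rename it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        # os.replace swaps the finished file in as a single step
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the partial temp file if anything went wrong
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Stand-in for the dictionary returned by flow.serialize() (hypothetical content)
sample_flow_data = {
    "flow_id": "serializable_flow",
    "routines": {"source": {}, "processor": {}},
}

with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "flow.json")
    save_json_atomic(sample_flow_data, target)
    with open(target) as f:
        loaded = json.load(f)

print(loaded == sample_flow_data)  # True
```

Readers of `flow.json` then see either the previous complete file or the new complete file, never a half-written one.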
Step 2: Deserializing Flows
Deserialize a flow from JSON. Since we serialized only the flow structure, we can restore it and execute it:
from routilux import Flow
import json

# Load from file
with open("flow.json", "r") as f:
    flow_data = json.load(f)

# Deserialize flow (create new instance and deserialize)
restored_flow = Flow()
restored_flow.deserialize(flow_data)

print(f"Restored flow ID: {restored_flow.flow_id}")
print(f"Routines: {len(restored_flow.routines)}")

# Execute restored flow
# Find entry routine ID (first routine in this case)
entry_id = list(restored_flow.routines.keys())[0]
job_state = restored_flow.execute(entry_id)

print(f"Execution status: {job_state.status}")
Expected Output:
Restored flow ID: serializable_flow
Routines: 2
Execution status: completed
Key Points:
Flow.deserialize() recreates the flow from serialized data
All routines and connections are restored
Flow can be executed immediately after deserialization
Routine instances are recreated (must have no-argument constructors)
Note: This creates a NEW execution, not a continuation of a previous one
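Because a restored flow is only as good as the file it came from, it can help to sanity-check the loaded JSON before handing it to `deserialize()`. The sketch below is a minimal check, assuming the serialized dictionary carries the `flow_id` and `routines` keys printed in Step 1. The helper name `check_flow_data` is hypothetical, and the real serialized format may contain more fields than are checked here.

```python
import json


def check_flow_data(flow_data):
    """Return a list of problems found in a loaded flow dictionary (empty if none)."""
    if not isinstance(flow_data, dict):
        return ["top-level JSON value is not an object"]
    problems = []
    for key in ("flow_id", "routines"):
        if key not in flow_data:
            problems.append(f"missing key: {key}")
    routines = flow_data.get("routines")
    # An empty container means there is nothing to execute after restoring
    if isinstance(routines, (dict, list)) and not routines:
        problems.append("flow contains no routines")
    return problems


good = json.loads('{"flow_id": "serializable_flow", "routines": {"source": {}}}')
bad = json.loads('{"routines": []}')

print(check_flow_data(good))  # []
print(check_flow_data(bad))   # ['missing key: flow_id', 'flow contains no routines']
```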
Step 3: Serialization Requirements
For serialization to work, routines must meet certain requirements:
from routilux import Flow, Routine

class ConfigurableProcessor(Routine):
    """Correct: Uses _config for configuration"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])
        # Configuration stored in _config (serializable)
        self.set_config(threshold=10, enabled=True)

    def process(self, data=None, **kwargs):
        threshold = self.get_config("threshold", default=5)
        enabled = self.get_config("enabled", default=False)

        if enabled:
            data_value = data or kwargs.get("data", 0)
            result = data_value * threshold
            self.emit("output", result=result)

class BadProcessor(Routine):
    """Incorrect: Uses constructor parameters"""

    def __init__(self, threshold=10):  # ❌ Don't do this!
        super().__init__()
        self.threshold = threshold  # ❌ Not serializable!
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, **kwargs):
        # This will break serialization
        pass

flow = Flow(flow_id="config_flow")

# Correct usage
processor = ConfigurableProcessor()
processor_id = flow.add_routine(processor, "processor")

# Serialize (works correctly)
flow_data = flow.serialize()
print("Serialization successful")

# Deserialize (create a new instance, then deserialize into it, as in Step 2)
restored = Flow()
restored.deserialize(flow_data)
print(f"Restored processor threshold: {restored.routines[processor_id].get_config('threshold')}")
Expected Output:
Serialization successful
Restored processor threshold: 10
Key Requirements:
Routines must have no-argument constructors (except self)
Configuration must be stored in the _config dictionary
Statistics are automatically serialized in _stats
Custom fields must be registered with add_serializable_fields()
Custom Routine classes must be registered with the @register_serializable decorator (Routilux’s built-in routines are already registered)
Key Points:
Use set_config() and get_config() for configuration
Don’t use constructor parameters
All configuration is automatically serialized
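The no-argument constructor rule follows from how a deserializer typically rebuilds objects: it knows only a class name and a dictionary of saved data, so it has to call the class with no arguments and then restore the saved configuration. The toy model below illustrates that idea in plain Python. It is not Routilux's implementation; `REGISTRY`, `register`, `ToyRoutine`, `dump`, and `load` are hypothetical names used only for illustration.

```python
REGISTRY = {}


def register(cls):
    """Record the class under its name so it can be found during loading."""
    REGISTRY[cls.__name__] = cls
    return cls


class ToyRoutine:
    def __init__(self):
        self._config = {}

    def set_config(self, **kwargs):
        self._config.update(kwargs)

    def get_config(self, key, default=None):
        return self._config.get(key, default)


@register
class Scaler(ToyRoutine):
    def __init__(self):
        super().__init__()
        self.set_config(threshold=10)


@register
class BadScaler(ToyRoutine):
    def __init__(self, threshold):  # required argument: cannot be rebuilt blindly
        super().__init__()
        self.threshold = threshold


def dump(routine):
    """Save only the class name and the config dictionary."""
    return {"class": type(routine).__name__, "config": dict(routine._config)}


def load(data):
    """Rebuild by calling the class with no arguments, then restore config."""
    routine = REGISTRY[data["class"]]()
    routine.set_config(**data["config"])
    return routine


scaler = Scaler()
scaler.set_config(threshold=42)
restored = load(dump(scaler))
print(restored.get_config("threshold"))  # 42

try:
    load(dump(BadScaler(threshold=3)))
except TypeError as exc:
    print("Cannot rebuild BadScaler:", exc)
```

The second case fails at load time because the loader has no way to know what value to pass for `threshold`, and the value stored on `self.threshold` was never part of the saved dictionary.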
Step 4: Saving and Loading JobState
You can save JobState for workflow recovery:
from routilux import Flow, Routine, JobState

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test_data")

class Processor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"Processed: {data_value}"
        self.emit("output", result=result)

flow = Flow(flow_id="recovery_flow")

source = DataSource()
processor = Processor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

# Execute and save state
job_state = flow.execute(source_id)

# Save JobState
job_state.save("workflow_state.json")
print(f"JobState saved. Status: {job_state.status}")

# Later, load and resume
saved_state = JobState.load("workflow_state.json")
print(f"Loaded JobState. Status: {saved_state.status}")
print(f"Flow ID: {saved_state.flow_id}")
Expected Output:
JobState saved. Status: completed
Loaded JobState. Status: completed
Flow ID: recovery_flow
Key Points:
save() saves JobState to a file
JobState.load() loads saved state
Can be used for workflow recovery
State includes execution history and routine states
Step 5: Understanding Multiple Executions
Important: Each execute() call creates an independent execution with
its own JobState. Multiple executions are tracked separately:
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, value=None, **kwargs):
        value = value or kwargs.get("value", "default")
        self.emit("output", data=value)

flow = Flow(flow_id="multi_execution_flow")
source = DataSource()
source_id = flow.add_routine(source, "source")

# First execution
job_state1 = flow.execute(source_id, entry_params={"value": "A"})
print(f"Execution 1 - Job ID: {job_state1.job_id}, Status: {job_state1.status}")

# Second execution (independent)
job_state2 = flow.execute(source_id, entry_params={"value": "B"})
print(f"Execution 2 - Job ID: {job_state2.job_id}, Status: {job_state2.status}")

# Access all executions
all_executions = flow.get_all_executions()
print(f"Total executions tracked: {len(all_executions)}")

# Access specific execution
execution1 = flow.get_execution(job_state1.job_id)
print(f"Retrieved execution 1: {execution1.job_id if execution1 else None}")
Expected Output:
Execution 1 - Job ID: <uuid1>, Status: completed
Execution 2 - Job ID: <uuid2>, Status: completed
Total executions tracked: 2
Retrieved execution 1: <uuid1>
Key Points:
Each execute() creates a new JobState with a unique job_id
Multiple executions are tracked in flow._active_executions
Use flow.get_execution(job_id) to retrieve a specific execution
Use flow.get_all_executions() to get all tracked executions
flow.job_state only tracks the most recent execution (for backward compatibility)
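One way to picture this bookkeeping is a dictionary keyed by job ID plus a pointer to the latest entry. The toy model below is an illustration under that assumption, not Routilux's internals: `ToyFlow`, `ToyJobState`, and their attributes are hypothetical names that only mirror the behaviour listed in the key points.

```python
import uuid


class ToyJobState:
    def __init__(self, flow_id):
        self.flow_id = flow_id
        self.job_id = str(uuid.uuid4())  # unique per execution
        self.status = "completed"


class ToyFlow:
    def __init__(self, flow_id):
        self.flow_id = flow_id
        self._executions = {}  # job_id -> ToyJobState
        self.job_state = None  # most recent execution only

    def execute(self):
        state = ToyJobState(self.flow_id)
        self._executions[state.job_id] = state
        self.job_state = state
        return state

    def get_execution(self, job_id):
        return self._executions.get(job_id)

    def get_all_executions(self):
        return list(self._executions.values())


flow = ToyFlow("multi_execution_flow")
first = flow.execute()
second = flow.execute()

print(len(flow.get_all_executions()))             # 2
print(flow.get_execution(first.job_id) is first)  # True
print(flow.job_state is second)                   # True
```

The last line shows why relying on a single "current state" attribute is fragile once a flow has run more than once: it always reflects the latest call, so older executions have to be looked up by ID.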
Step 6: Complete Example - Cross-Host Execution Recovery
Here’s a complete example showing proper serialization for cross-host recovery:
from routilux import Flow, Routine, JobState
import json
import os

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])
        self.set_config(multiplier=2, prefix="Result")

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", 0)
        multiplier = self.get_config("multiplier", default=1)
        prefix = self.get_config("prefix", default="")

        result = data_value * multiplier
        output = f"{prefix}: {result}" if prefix else str(result)

        self.emit("output", result=output)
        print(f"Processed {data_value} -> {output}")

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=5, **kwargs):
        value = value or kwargs.get("value", 5)
        self.emit("output", data=value)

def save_for_distribution():
    """Save workflow structure for distribution"""
    flow = Flow(flow_id="distributed_workflow")

    source = DataSource()
    processor = DataProcessor()

    source_id = flow.add_routine(source, "source")
    processor_id = flow.add_routine(processor, "processor")

    flow.connect(source_id, "output", processor_id, "input")

    # Serialize flow structure only (no execution state)
    flow_data = flow.serialize(include_execution_state=False)

    with open("workflow_structure.json", "w") as f:
        json.dump(flow_data, f, indent=2)

    print("Workflow structure saved to workflow_structure.json")
    return flow

def execute_and_save_state():
    """Execute workflow and save execution state"""
    # Load workflow structure
    with open("workflow_structure.json", "r") as f:
        flow_data = json.load(f)

    flow = Flow()
    flow.deserialize(flow_data)

    # Execute
    entry_id = list(flow.routines.keys())[0]
    job_state = flow.execute(entry_id, entry_params={"value": 10})

    # Save execution state separately
    job_state_data = job_state.serialize()
    with open("execution_state.json", "w") as f:
        json.dump(job_state_data, f, indent=2)

    print(f"Execution state saved. Job ID: {job_state.job_id}")
    return job_state

def restore_on_remote_host():
    """Simulate restoring on a remote host"""
    # Load workflow structure
    with open("workflow_structure.json", "r") as f:
        flow_data = json.load(f)

    # Load execution state
    with open("execution_state.json", "r") as f:
        job_state_data = json.load(f)

    # Restore flow
    flow = Flow()
    flow.deserialize(flow_data)

    # Restore execution state
    job_state = JobState()
    job_state.deserialize(job_state_data)

    print(f"Restored flow: {flow.flow_id}")
    print(f"Restored execution: {job_state.job_id}, Status: {job_state.status}")

    # Verify configuration
    processor_id = list(flow.routines.keys())[1]
    processor = flow.routines[processor_id]
    print(f"Processor config: multiplier={processor.get_config('multiplier')}")

    return flow, job_state

def main():
    # Step 1: Save workflow structure
    save_for_distribution()

    # Step 2: Execute and save state
    job_state = execute_and_save_state()

    # Step 3: Simulate restoring on remote host
    restored_flow, restored_job_state = restore_on_remote_host()

    # Clean up
    for fname in ["workflow_structure.json", "execution_state.json"]:
        if os.path.exists(fname):
            os.remove(fname)

if __name__ == "__main__":
    main()
Expected Output:
Workflow structure saved to workflow_structure.json
Processed 10 -> Result: 20
Execution state saved. Job ID: <job-id>
Restored flow: distributed_workflow
Restored execution: <job-id>, Status: completed
Processor config: multiplier=2
Key Points:
Separate serialization: Flow structure and execution state are separate
Distribution: Send both files to remote host
Recovery: Restore both on remote host
Resume: If execution was paused, use flow.resume(job_state) to continue
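The recovery decision on the remote host usually hinges on the restored status. The sketch below maps a status string to an action using plain Python. Only `completed` and `paused` appear in this tutorial, so any other value is treated here as unknown; the function name `choose_recovery_action` and the action labels are hypothetical.

```python
def choose_recovery_action(status):
    """Map a restored status string to what the remote host should do next."""
    if status == "paused":
        return "resume"   # continue from saved state, e.g. flow.resume(job_state)
    if status == "completed":
        return "skip"     # nothing left to run
    return "inspect"      # unrecognized status: leave the decision to an operator or policy


for status in ("paused", "completed", "something_else"):
    print(status, "->", choose_recovery_action(status))
```

Keeping this decision in one small function makes it easy to extend once the full set of status values used by a deployment is known.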
Common Pitfalls
Pitfall 1: Using constructor parameters
class BadRoutine(Routine):
    def __init__(self, config_value):  # ❌ Breaks serialization!
        super().__init__()
        self.config_value = config_value
Solution: Use _config dictionary: self.set_config(config_value=value)
Pitfall 2: Not registering custom fields
class CustomRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.custom_field = "value"  # Not serialized by default!
        # Need to register it
        self.add_serializable_fields(["custom_field"])
Solution: Register custom fields with add_serializable_fields()
Pitfall 3: Storing non-serializable objects
def __init__(self):
    super().__init__()
    self.file_handle = open("file.txt")  # Not serializable!
Solution: Don’t store non-serializable objects. Recreate them when needed.
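A common way to follow this advice is to store the path, which serializes cleanly, and open the file only when it is needed. The sketch below uses plain `json` to show the difference. `LineReader` and its attribute names are hypothetical and stand in for a routine's serializable fields.

```python
import json
import os
import tempfile


class LineReader:
    """Keeps the file path as state and opens the file on demand."""

    def __init__(self):
        self.path = None  # a string: safe to serialize

    def read_first_line(self):
        # The handle lives only inside this call and is never stored on self
        with open(self.path) as handle:
            return handle.readline().rstrip("\n")


with tempfile.TemporaryDirectory() as workdir:
    file_path = os.path.join(workdir, "file.txt")
    with open(file_path, "w") as f:
        f.write("hello\nworld\n")

    reader = LineReader()
    reader.path = file_path

    # The path round-trips through JSON without trouble
    saved = json.dumps({"path": reader.path})
    first_line = reader.read_first_line()

    # A live file handle does not
    with open(file_path) as handle:
        try:
            json.dumps({"handle": handle})
            handle_serializable = True
        except TypeError:
            handle_serializable = False

print(first_line)           # hello
print(handle_serializable)  # False
```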
Pitfall 4: Serializing Flow with execution state for distribution
flow_data = flow.serialize(include_execution_state=True)  # Includes job_state if set
# Sending this to another host will only include the last execution state!
Solution: Use flow.serialize(include_execution_state=False) for distribution.
Serialize JobState separately if you need to recover a specific execution.
Pitfall 5: Expecting multiple execute() calls to share state
flow.execute(source1_id)  # Creates JobState 1
flow.execute(source2_id)  # Creates JobState 2 (independent; flow.job_state now points to it)
# Aggregator won't see both in the same execution!
Solution: Use a single execute() with multiple emit() calls, or use
the aggregation pattern (see Advanced Patterns).
Best Practices
Separate Flow and JobState serialization:
    - Serialize Flow structure with flow.serialize() (does NOT include execution state)
    - Serialize JobState separately with job_state.serialize()
    - This allows proper cross-host recovery
Use _config for configuration: All configuration should be in _config
No constructor parameters: Routines must have no-argument constructors
Register custom fields: Use add_serializable_fields() for custom data
Track multiple executions: Use flow.get_execution(job_id) to access specific executions
Clean up old executions: Use flow.cleanup_completed_executions() to manage memory
Test serialization: Always test that your flows can be serialized/deserialized
Version your workflows: Include version info in flow_id or _config
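The "test serialization" practice can be automated with a small round-trip check. The sketch below verifies that a dictionary survives a JSON dump and load unchanged, which catches values that JSON silently alters. `assert_json_round_trip` is a hypothetical helper, and the sample dictionaries stand in for the output of `flow.serialize()`.

```python
import json


def assert_json_round_trip(data):
    """Raise if data cannot be dumped to JSON and loaded back unchanged."""
    restored = json.loads(json.dumps(data))
    if restored != data:
        raise AssertionError(f"round trip changed the data: {data!r} -> {restored!r}")
    return restored


# Plain strings, numbers, booleans, lists, and dicts with string keys survive
ok = {"flow_id": "config_flow_v2", "config": {"threshold": 10, "enabled": True}}
assert_json_round_trip(ok)

# Tuples come back as lists, and integer keys come back as strings
for suspicious in ({"pair": (1, 2)}, {1: "one"}):
    try:
        assert_json_round_trip(suspicious)
    except AssertionError as exc:
        print("Detected:", exc)
```

Running a check like this in a unit test for every custom routine's configuration makes type drift visible before a flow is shipped to another host.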
Understanding Multiple Executions
Key Design Principle: Flow = Workflow Definition, JobState = Execution State
Flow: Contains routines, connections, configuration (static structure)
JobState: Contains execution state, history, pending tasks (dynamic state)
Multiple Executions: Each execute() creates a new JobState with a unique job_id
Complete Decoupling: Flow does NOT manage JobState - they are completely separate
Recovery: To recover execution, serialize both Flow and JobState separately
Best Practice for Cross-Host Execution:
Serialize Flow structure (no execution state - this is the default):
    flow_data = flow.serialize()  # flow_data does NOT contain job_state
Serialize JobState separately:
    job_state_data = job_state.serialize()
Send both to target host
Restore both on target host:
    new_flow = Flow()
    new_flow.deserialize(flow_data)
    new_job_state = JobState()
    new_job_state.deserialize(job_state_data)
    # Resume if paused
    if new_job_state.status == "paused":
        new_flow.resume(new_job_state)
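For the "send both to target host" step, the two serialized dictionaries can travel as one payload. The sketch below packs them into a single JSON envelope with a SHA-256 checksum so the receiver can detect corruption before restoring. The envelope layout and the names `pack_bundle` and `unpack_bundle` are assumptions made for illustration; this tutorial does not prescribe a transfer format.

```python
import hashlib
import json


def pack_bundle(flow_data, job_state_data):
    """Combine both dictionaries into one JSON string with a checksum."""
    body = json.dumps({"flow": flow_data, "job_state": job_state_data}, sort_keys=True)
    digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
    return json.dumps({"sha256": digest, "body": body})


def unpack_bundle(payload):
    """Verify the checksum and return (flow_data, job_state_data)."""
    envelope = json.loads(payload)
    body = envelope["body"]
    if hashlib.sha256(body.encode("utf-8")).hexdigest() != envelope["sha256"]:
        raise ValueError("bundle checksum mismatch")
    content = json.loads(body)
    return content["flow"], content["job_state"]


# Stand-ins for flow.serialize() and job_state.serialize() output
flow_data = {"flow_id": "distributed_workflow", "routines": {}}
job_state_data = {"job_id": "example-job", "status": "paused"}

payload = pack_bundle(flow_data, job_state_data)
restored_flow_data, restored_state_data = unpack_bundle(payload)
print(restored_state_data["status"])  # paused
```

On the receiving side, the two returned dictionaries would then be passed to `deserialize()` on a new Flow and a new JobState, as in the restore step above.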
Cross-Host Execution Example:
See Serialization for a complete cross-host execution example with network transfer and database storage patterns.
Next Steps
Congratulations! You’ve completed the Routilux tutorial. You now understand:
Creating routines and flows
Connecting routines in various patterns
Managing state and statistics
Handling errors gracefully
Executing workflows concurrently
Using advanced patterns
Serializing and persisting workflows
For more information, check out:
User Guide - Comprehensive user guide
API Reference - Complete API documentation
Examples - Real-world examples
Happy coding with Routilux! 🚀