Serialization and Persistence

In this tutorial, you’ll learn how to serialize and deserialize flows for persistence, recovery, and distributed execution.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Serialize flows to JSON

  • Deserialize flows from JSON

  • Save and load JobState for recovery

  • Understand serialization requirements

  • Build persistent workflows

Step 1: Basic Flow Serialization

You can serialize a flow to JSON for persistence. Important: By default, serialize() does NOT include execution state (job_state). This is the recommended approach for proper execution recovery.

from routilux import Flow, Routine
import json

class SimpleProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"Processed: {data_value}"
        self.emit("output", result=result)

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test")

# Create and configure flow
flow = Flow(flow_id="serializable_flow")

source = DataSource()
processor = SimpleProcessor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

# Serialize flow (default: include_execution_state=False)
# This only serializes flow structure, not execution state
flow_data = flow.serialize()

# Save to file
with open("flow.json", "w") as f:
    json.dump(flow_data, f, indent=2)

print("Flow serialized to flow.json")
print(f"Flow ID: {flow_data['flow_id']}")
print(f"Routines: {len(flow_data['routines'])}")
print(f"Contains job_state: {'job_state' in flow_data}")

Expected Output:

Flow serialized to flow.json
Flow ID: serializable_flow
Routines: 2
Contains job_state: False

Key Points:

  • serialize() by default does NOT include execution state (recommended)

  • The output contains only the flow structure: routines, connections, and configuration

  • Execution state (JobState) should be serialized separately for recovery
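
When the serialized flow is written to disk for later recovery, a crash halfway through the write can leave a truncated JSON file. The sketch below shows one common way to avoid that: write to a temporary file, then rename it over the target. It uses only the standard library. The save_json_atomic helper is hypothetical, and the flow_data dict is a stand-in for whatever flow.serialize() returns (its real layout may differ).

```python
import json
import os
import tempfile


def save_json_atomic(data, path):
    """Write JSON to a temp file, then rename it over the target path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Remove the partial temp file, then re-raise the original error
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Stand-in for the dict returned by flow.serialize(); the real layout may differ.
flow_data = {"flow_id": "serializable_flow", "routines": {"source": {}, "processor": {}}}
target = os.path.join(tempfile.mkdtemp(), "flow.json")
save_json_atomic(flow_data, target)

with open(target) as f:
    loaded = json.load(f)
```

Readers of flow.json then see either the previous complete file or the new complete file, never a partial one.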

Step 2: Deserializing Flows

Deserialize a flow from JSON. Since we serialized only the flow structure, we can restore it and execute it:

from routilux import Flow
import json

# Load from file
with open("flow.json", "r") as f:
    flow_data = json.load(f)

# Deserialize flow (create new instance and deserialize)
restored_flow = Flow()
restored_flow.deserialize(flow_data)

print(f"Restored flow ID: {restored_flow.flow_id}")
print(f"Routines: {len(restored_flow.routines)}")

# Execute restored flow
# Find entry routine ID (first routine in this case)
entry_id = list(restored_flow.routines.keys())[0]
job_state = restored_flow.execute(entry_id)

print(f"Execution status: {job_state.status}")

Expected Output:

Restored flow ID: serializable_flow
Routines: 2
Execution status: completed

Key Points:

  • Flow.deserialize() recreates flow from serialized data

  • All routines and connections are restored

  • Flow can be executed immediately after deserialization

  • Routine instances are recreated (must have no-argument constructors)

  • Note: This creates a NEW execution, not a continuation of a previous one

Step 3: Serialization Requirements

For serialization to work, routines must meet certain requirements:

from routilux import Flow, Routine

class ConfigurableProcessor(Routine):
    """Correct: Uses _config for configuration"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])
        # Configuration stored in _config (serializable)
        self.set_config(threshold=10, enabled=True)

    def process(self, data=None, **kwargs):
        threshold = self.get_config("threshold", default=5)
        enabled = self.get_config("enabled", default=False)

        if enabled:
            data_value = data or kwargs.get("data", 0)
            result = data_value * threshold
            self.emit("output", result=result)

class BadProcessor(Routine):
    """Incorrect: Uses constructor parameters"""

    def __init__(self, threshold=10):  # ❌ Don't do this!
        super().__init__()
        self.threshold = threshold  # ❌ Not serializable!
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, **kwargs):
        # This will break serialization
        pass

flow = Flow(flow_id="config_flow")

# Correct usage
processor = ConfigurableProcessor()
processor_id = flow.add_routine(processor, "processor")

# Serialize (works correctly)
flow_data = flow.serialize()
print("Serialization successful")

# Deserialize into a new Flow instance (same pattern as Step 2)
restored = Flow()
restored.deserialize(flow_data)
print(f"Restored processor threshold: {restored.routines[processor_id].get_config('threshold')}")

Expected Output:

Serialization successful
Restored processor threshold: 10

Key Requirements:

  • Routines must have no-argument constructors (except self)

  • Configuration must be stored in _config dictionary

  • Statistics are automatically serialized in _stats

  • Custom fields must be registered with add_serializable_fields()

  • Custom Routine classes must be registered with @register_serializable decorator (Routilux’s built-in routines are already registered)

Key Points:

  • Use set_config() and get_config() for configuration

  • Don’t use constructor parameters

  • All configuration is automatically serialized
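
To see why the no-argument constructor rule matters, it can help to look at a toy deserializer. The sketch below is not Routilux code: ToyRoutine, REGISTRY, register, dump, and load are all hypothetical stand-ins that mimic the general pattern of looking up a class by name, calling it with no arguments, and then restoring its configuration.

```python
# Toy stand-ins; these are NOT Routilux classes.
REGISTRY = {}


def register(cls):
    """Record the class under its name so it can be looked up later."""
    REGISTRY[cls.__name__] = cls
    return cls


class ToyRoutine:
    def __init__(self):
        self._config = {}

    def set_config(self, **values):
        self._config.update(values)

    def get_config(self, key, default=None):
        return self._config.get(key, default)


@register
class Scaler(ToyRoutine):
    def __init__(self):
        super().__init__()
        self.set_config(threshold=10)


def dump(routine):
    """Capture the class name and a copy of the configuration."""
    return {"class": type(routine).__name__, "config": dict(routine._config)}


def load(data):
    cls = REGISTRY[data["class"]]
    routine = cls()  # only works because the constructor takes no arguments
    routine.set_config(**data["config"])
    return routine


original = Scaler()
original.set_config(threshold=25)
restored = load(dump(original))
print(restored.get_config("threshold"))  # 25
```

If Scaler required a constructor argument, the cls() call in load would raise a TypeError, because the serialized data carries no record of what that argument was.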

Step 4: Saving and Loading JobState

You can save JobState for workflow recovery:

from routilux import Flow, Routine, JobState

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test_data")

class Processor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", "")
        result = f"Processed: {data_value}"
        self.emit("output", result=result)

flow = Flow(flow_id="recovery_flow")

source = DataSource()
processor = Processor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

# Execute and save state
job_state = flow.execute(source_id)

# Save JobState
job_state.save("workflow_state.json")
print(f"JobState saved. Status: {job_state.status}")

# Later, load the saved state
saved_state = JobState.load("workflow_state.json")
print(f"Loaded JobState. Status: {saved_state.status}")
print(f"Flow ID: {saved_state.flow_id}")

Expected Output:

JobState saved. Status: completed
Loaded JobState. Status: completed
Flow ID: recovery_flow

Key Points:

  • save() saves JobState to a file

  • JobState.load() loads saved state

  • Can be used for workflow recovery

  • State includes execution history and routine states
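
The save()/load() pairing above follows a familiar shape: an instance method that writes the object to a file and a class-level loader that rebuilds it. The following is a minimal standard-library sketch of that shape. ToyJobState and its fields are hypothetical and do not reflect how Routilux's JobState is implemented or what its file format looks like.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field


@dataclass
class ToyJobState:
    """Illustrative stand-in; not Routilux's JobState."""
    flow_id: str
    status: str = "pending"
    history: list = field(default_factory=list)

    def save(self, path):
        # Instance method: write this object's fields as JSON
        with open(path, "w") as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls, path):
        # Class method: build a fresh object from the saved fields
        with open(path) as f:
            return cls(**json.load(f))


state = ToyJobState(
    flow_id="recovery_flow",
    status="completed",
    history=[{"routine": "source", "event": "output"}],
)
path = os.path.join(tempfile.mkdtemp(), "workflow_state.json")
state.save(path)

loaded_state = ToyJobState.load(path)
print(loaded_state.status)  # completed
```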

Step 5: Understanding Multiple Executions

Important: Each execute() call creates an independent execution with its own JobState. Multiple executions are tracked separately:

from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, value=None, **kwargs):
        value = value or kwargs.get("value", "default")
        self.emit("output", data=value)

flow = Flow(flow_id="multi_execution_flow")
source = DataSource()
source_id = flow.add_routine(source, "source")

# First execution
job_state1 = flow.execute(source_id, entry_params={"value": "A"})
print(f"Execution 1 - Job ID: {job_state1.job_id}, Status: {job_state1.status}")

# Second execution (independent)
job_state2 = flow.execute(source_id, entry_params={"value": "B"})
print(f"Execution 2 - Job ID: {job_state2.job_id}, Status: {job_state2.status}")

# Access all executions
all_executions = flow.get_all_executions()
print(f"Total executions tracked: {len(all_executions)}")

# Access specific execution
execution1 = flow.get_execution(job_state1.job_id)
print(f"Retrieved execution 1: {execution1.job_id if execution1 else None}")

Expected Output:

Execution 1 - Job ID: <uuid1>, Status: completed
Execution 2 - Job ID: <uuid2>, Status: completed
Total executions tracked: 2
Retrieved execution 1: <uuid1>

Key Points:

  • Each execute() creates a new JobState with unique job_id

  • Multiple executions are tracked in flow._active_executions

  • Use flow.get_execution(job_id) to retrieve specific execution

  • Use flow.get_all_executions() to get all tracked executions

  • flow.job_state only tracks the most recent execution (for backward compatibility)
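
The tracking behavior described in these points can be pictured as a dictionary keyed by job ID plus a pointer to the latest entry. The sketch below is a toy model of that idea, assuming nothing about Routilux internals. ToyExecutionTracker is hypothetical; only the method names get_execution and get_all_executions are borrowed from the tutorial.

```python
import uuid


class ToyExecutionTracker:
    """Illustrative stand-in for per-flow execution tracking; not Routilux code."""

    def __init__(self):
        self._executions = {}  # job_id -> state dict
        self.latest = None     # plays the role of "most recent execution"

    def execute(self, **entry_params):
        # Every call gets its own ID and its own state record
        job_id = str(uuid.uuid4())
        state = {"job_id": job_id, "params": entry_params, "status": "completed"}
        self._executions[job_id] = state
        self.latest = state
        return state

    def get_execution(self, job_id):
        return self._executions.get(job_id)

    def get_all_executions(self):
        return list(self._executions.values())


tracker = ToyExecutionTracker()
first = tracker.execute(value="A")
second = tracker.execute(value="B")

print(len(tracker.get_all_executions()))  # 2
print(tracker.latest is second)           # True
```

The first execution stays retrievable by its ID even though the "latest" pointer has moved on, which is the distinction between get_execution(job_id) and the most-recent-only attribute.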

Step 6: Complete Example - Cross-Host Execution Recovery

Here’s a complete example showing proper serialization for cross-host recovery:

from routilux import Flow, Routine, JobState
import json
import os

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])
        self.set_config(multiplier=2, prefix="Result")

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", 0)
        multiplier = self.get_config("multiplier", default=1)
        prefix = self.get_config("prefix", default="")

        result = data_value * multiplier
        output = f"{prefix}: {result}" if prefix else str(result)

        self.emit("output", result=output)
        print(f"Processed {data_value} -> {output}")

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
        self.output_event = self.define_event("output", ["data"])

    def generate(self, value=5, **kwargs):
        value = value or kwargs.get("value", 5)
        self.emit("output", data=value)

def save_for_distribution():
    """Save workflow structure for distribution"""
    flow = Flow(flow_id="distributed_workflow")

    source = DataSource()
    processor = DataProcessor()

    source_id = flow.add_routine(source, "source")
    processor_id = flow.add_routine(processor, "processor")

    flow.connect(source_id, "output", processor_id, "input")

    # Serialize flow structure only (no execution state)
    flow_data = flow.serialize(include_execution_state=False)

    with open("workflow_structure.json", "w") as f:
        json.dump(flow_data, f, indent=2)

    print("Workflow structure saved to workflow_structure.json")
    return flow

def execute_and_save_state():
    """Execute workflow and save execution state"""
    # Load workflow structure
    with open("workflow_structure.json", "r") as f:
        flow_data = json.load(f)

    flow = Flow()
    flow.deserialize(flow_data)

    # Execute
    entry_id = list(flow.routines.keys())[0]
    job_state = flow.execute(entry_id, entry_params={"value": 10})

    # Save execution state separately
    job_state_data = job_state.serialize()
    with open("execution_state.json", "w") as f:
        json.dump(job_state_data, f, indent=2)

    print(f"Execution state saved. Job ID: {job_state.job_id}")
    return job_state

def restore_on_remote_host():
    """Simulate restoring on a remote host"""
    # Load workflow structure
    with open("workflow_structure.json", "r") as f:
        flow_data = json.load(f)

    # Load execution state
    with open("execution_state.json", "r") as f:
        job_state_data = json.load(f)

    # Restore flow
    flow = Flow()
    flow.deserialize(flow_data)

    # Restore execution state
    job_state = JobState()
    job_state.deserialize(job_state_data)

    print(f"Restored flow: {flow.flow_id}")
    print(f"Restored execution: {job_state.job_id}, Status: {job_state.status}")

    # Verify configuration
    processor_id = list(flow.routines.keys())[1]
    processor = flow.routines[processor_id]
    print(f"Processor config: multiplier={processor.get_config('multiplier')}")

    return flow, job_state

def main():
    # Step 1: Save workflow structure
    save_for_distribution()

    # Step 2: Execute and save state
    job_state = execute_and_save_state()

    # Step 3: Simulate restoring on remote host
    restored_flow, restored_job_state = restore_on_remote_host()

    # Clean up
    for fname in ["workflow_structure.json", "execution_state.json"]:
        if os.path.exists(fname):
            os.remove(fname)

if __name__ == "__main__":
    main()

Expected Output:

Workflow structure saved to workflow_structure.json
Processed 10 -> Result: 20
Execution state saved. Job ID: <job-id>
Restored flow: distributed_workflow
Restored execution: <job-id>, Status: completed
Processor config: multiplier=2

Key Points:

  • Separate serialization: Flow structure and execution state are separate

  • Distribution: Send both files to remote host

  • Recovery: Restore both on remote host

  • Resume: If execution was paused, use flow.resume(job_state) to continue
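
Because the flow structure and the execution state travel as two separate payloads, it can be convenient to ship them in one envelope and confirm on arrival that nothing was altered in transit. The sketch below is one possible approach using only the standard library. The make_bundle and open_bundle helpers are hypothetical, and the two dicts are stand-ins for the outputs of flow.serialize() and job_state.serialize().

```python
import hashlib
import json


def _digest(payload):
    """SHA-256 over a canonical (sorted, compact) JSON encoding."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def make_bundle(flow_data, job_state_data):
    """Pack both payloads plus a checksum into one JSON string."""
    payload = {"flow": flow_data, "job_state": job_state_data}
    return json.dumps({"payload": payload, "sha256": _digest(payload)})


def open_bundle(text):
    """Verify the checksum and return (flow_data, job_state_data)."""
    bundle = json.loads(text)
    payload = bundle["payload"]
    if _digest(payload) != bundle["sha256"]:
        raise ValueError("bundle checksum mismatch")
    return payload["flow"], payload["job_state"]


# Stand-ins for flow.serialize() and job_state.serialize() output.
flow_data = {"flow_id": "distributed_workflow", "routines": {}}
job_state_data = {"job_id": "example-job", "status": "paused"}

wire_text = make_bundle(flow_data, job_state_data)
restored_flow_data, restored_state_data = open_bundle(wire_text)
```

On the receiving host, the two restored dicts would then be passed to deserialize() on a new Flow and a new JobState, as in restore_on_remote_host() above.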

Common Pitfalls

Pitfall 1: Using constructor parameters

class BadRoutine(Routine):
    def __init__(self, config_value):  # ❌ Breaks serialization!
        super().__init__()
        self.config_value = config_value

Solution: Use _config dictionary: self.set_config(config_value=value)

Pitfall 2: Not registering custom fields

class CustomRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.custom_field = "value"  # Not serialized by default!
        # Need to register it
        self.add_serializable_fields(["custom_field"])

Solution: Register custom fields with add_serializable_fields()
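
A toy model may make the effect of registration easier to see. In the sketch below, only attributes whose names were registered end up in the serialized output. ToySerializable is a hypothetical stand-in that borrows the add_serializable_fields name from the tutorial; Routilux's own mechanism may differ in detail.

```python
class ToySerializable:
    """Illustrative only; Routilux's own mechanism may differ in detail."""

    def __init__(self):
        self._serializable_fields = []

    def add_serializable_fields(self, names):
        # Remember each name once, preserving registration order
        for name in names:
            if name not in self._serializable_fields:
                self._serializable_fields.append(name)

    def serialize(self):
        # Only registered attributes are included in the output
        return {name: getattr(self, name) for name in self._serializable_fields}


class Counter(ToySerializable):
    def __init__(self):
        super().__init__()
        self.count = 3         # registered below, so it is saved
        self.scratch = "temp"  # never registered, so it is dropped
        self.add_serializable_fields(["count"])


snapshot = Counter().serialize()
print(snapshot)  # {'count': 3}
```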

Pitfall 3: Storing non-serializable objects

def __init__(self):
    super().__init__()
    self.file_handle = open("file.txt")  # Not serializable!

Solution: Don’t store non-serializable objects. Recreate them when needed.
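
One way to apply this solution is to keep only the file path as persistent data and open the handle lazily when it is first needed. The sketch below uses a plain Python class to show the pattern. LineReader is hypothetical, and in a real routine the path would more naturally live in _config.

```python
import os
import tempfile


class LineReader:
    """Plain-Python illustration; in a routine the path would live in _config."""

    def __init__(self):
        self.path = None     # a string: safe to serialize
        self._handle = None  # runtime-only: never serialized

    def _open(self):
        # Recreate the handle on demand instead of storing it permanently
        if self._handle is None:
            self._handle = open(self.path)
        return self._handle

    def read_line(self):
        return self._open().readline().rstrip("\n")

    def close(self):
        if self._handle is not None:
            self._handle.close()
            self._handle = None

    def to_dict(self):
        return {"path": self.path}  # the handle is deliberately left out


tmp_path = os.path.join(tempfile.mkdtemp(), "file.txt")
with open(tmp_path, "w") as f:
    f.write("first\nsecond\n")

reader = LineReader()
reader.path = tmp_path
first_line = reader.read_line()
state = reader.to_dict()
reader.close()
```

After a restore, the object holds only the path, and the next read_line() call reopens the file from the start.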

Pitfall 4: Serializing Flow with execution state for distribution

flow_data = flow.serialize(include_execution_state=True)  # Embeds job_state in the flow data
# Sending this to another host will only include the last execution state!

Solution: Use flow.serialize() for distribution (the default is include_execution_state=False). Serialize JobState separately if you need to recover a specific execution.

Pitfall 5: Expecting multiple execute() calls to share state

flow.execute(source1_id)  # Creates JobState 1
flow.execute(source2_id)  # Creates JobState 2 (independent of JobState 1)
# Aggregator won't see both in the same execution!

Solution: Use a single execute() with multiple emit() calls, or use the aggregation pattern (see Advanced Patterns).

Best Practices

  1. Separate Flow and JobState serialization:

       • Serialize Flow structure with flow.serialize() (does NOT include execution state)

       • Serialize JobState separately with job_state.serialize()

       • This allows proper cross-host recovery

  2. Use _config for configuration: All configuration should be in _config

  3. No constructor parameters: Routines must have no-argument constructors

  4. Register custom fields: Use add_serializable_fields() for custom data

  5. Track multiple executions: Use flow.get_execution(job_id) to access specific executions

  6. Clean up old executions: Use flow.cleanup_completed_executions() to manage memory

  7. Test serialization: Always test that your flows can be serialized/deserialized

  8. Version your workflows: Include version info in flow_id or _config
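
For the "test serialization" practice, a small round-trip check can catch values that JSON cannot represent, or that it silently changes, before they reach production. The sketch below is a generic standard-library helper, not part of Routilux. The name assert_json_round_trip and the sample dicts are hypothetical; in practice the input would be the output of flow.serialize() or job_state.serialize().

```python
import json


def assert_json_round_trip(data):
    """Raise if data fails to encode, or changes, when passed through JSON."""
    restored = json.loads(json.dumps(data))
    if restored != data:
        raise AssertionError("data changed during JSON round trip")
    return restored


# A JSON-friendly payload passes unchanged.
good = {"flow_id": "orders_v2", "config": {"threshold": 10, "enabled": True}}
assert_json_round_trip(good)

# A set cannot be encoded as JSON, so json.dumps raises TypeError.
bad = {"flow_id": "orders_v2", "config": {"tags": {"a", "b"}}}
try:
    assert_json_round_trip(bad)
    encode_failed = False
except TypeError:
    encode_failed = True

# A tuple encodes, but comes back as a list, so the comparison fails.
lossy = {"window": (1, 2)}
try:
    assert_json_round_trip(lossy)
    changed = False
except AssertionError:
    changed = True
```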

Understanding Multiple Executions

Key Design Principle: Flow = Workflow Definition, JobState = Execution State

  • Flow: Contains routines, connections, configuration (static structure)

  • JobState: Contains execution state, history, pending tasks (dynamic state)

  • Multiple Executions: Each execute() creates a new JobState with unique job_id

  • Complete Decoupling: Flow does NOT manage JobState - they are completely separate

  • Recovery: To recover execution, serialize both Flow and JobState separately

Best Practice for Cross-Host Execution:

  1. Serialize Flow structure (no execution state - this is the default):

    flow_data = flow.serialize() # flow_data does NOT contain job_state

  2. Serialize JobState separately:

    job_state_data = job_state.serialize()

  3. Send both to target host

  4. Restore both on target host:

    new_flow = Flow()
    new_flow.deserialize(flow_data)
    
    new_job_state = JobState()
    new_job_state.deserialize(job_state_data)
    
    # Resume if paused
    if new_job_state.status == "paused":
        new_flow.resume(new_job_state)
    

Cross-Host Execution Example:

See Serialization for a complete cross-host execution example with network transfer and database storage patterns.

Next Steps

Congratulations! You’ve completed the Routilux tutorial. You now understand:

  • Creating routines and flows

  • Connecting routines in various patterns

  • Managing state and statistics

  • Handling errors gracefully

  • Executing workflows concurrently

  • Using advanced patterns

  • Serializing and persisting workflows


Happy coding with Routilux! 🚀