Serialization

routilux provides full serialization support for persistence and state recovery.

Serializing Objects

All core classes support serialization:

# Serialize a flow
data = flow.serialize()

# Serialize a routine
data = routine.serialize()

# Serialize a job state
data = job_state.serialize()

# Serialize an error handler
data = error_handler.serialize()

Deserializing Objects

To restore an object, create an empty instance and call deserialize() with the data returned by serialize():

# Deserialize a flow
flow = Flow()
flow.deserialize(data)

# Deserialize a routine
routine = Routine()
routine.deserialize(data)

# Deserialize a job state
job_state = JobState()
job_state.deserialize(data)

# Deserialize an error handler
error_handler = ErrorHandler()
error_handler.deserialize(data)

Saving to JSON

Save serialized data to JSON:

import json

data = flow.serialize()
with open("flow.json", "w") as f:
    json.dump(data, f, indent=2)

Loading from JSON

Read the file back, then deserialize the data into a new instance:

import json

with open("flow.json", "r") as f:
    data = json.load(f)

flow = Flow()
flow.deserialize(data)
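
When the JSON file is used for recovery, a crash in the middle of a write can leave a truncated file behind. The sketch below is one way to avoid that using only the standard library: write to a temporary file in the same directory, then rename it over the target. The helper names save_json_atomic and load_json are hypothetical and not part of routilux, and the dictionary stands in for the result of flow.serialize().

```python
import json
import os
import tempfile


def save_json_atomic(data, path):
    """Write data as JSON to a temp file, then rename it over path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        # os.replace is atomic when source and target share a filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if writing or renaming failed.
        os.unlink(tmp_path)
        raise


def load_json(path):
    """Read a JSON file back into plain Python data."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# Stand-in for flow.serialize(); any JSON-compatible dict works here.
with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "flow.json")
    save_json_atomic({"flow_id": "demo", "routines": {}}, target)
    loaded = load_json(target)
    leftovers = os.listdir(workdir)
```

After a successful save only flow.json remains in the directory; the temporary file never survives under its own name.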

Serializable Fields

Classes register fields for serialization:

self.add_serializable_fields(["field1", "field2", "field3"])

Only registered fields are serialized. Complex objects (lists, dicts, other Serializable objects) are automatically handled.
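
To illustrate the idea of field registration, here is a minimal stand-alone sketch. MiniSerializable and Counter are hypothetical classes written for this example; they do not reproduce routilux's actual implementation, which also handles nested Serializable objects.

```python
import json


class MiniSerializable:
    """Hypothetical stand-in for a Serializable base class (not routilux code)."""

    def __init__(self):
        self._fields = []

    def add_serializable_fields(self, names):
        # Remember which attribute names should be written out.
        for name in names:
            if name not in self._fields:
                self._fields.append(name)

    def serialize(self):
        # Only registered attributes are emitted; everything else is skipped.
        return {name: getattr(self, name, None) for name in self._fields}

    def deserialize(self, data):
        # Restore only the registered attributes that are present in data.
        for name in self._fields:
            if name in data:
                setattr(self, name, data[name])


class Counter(MiniSerializable):
    def __init__(self):
        super().__init__()
        self.count = 0
        self.tags = []
        self.scratch = object()  # not registered, so never serialized
        self.add_serializable_fields(["count", "tags"])


counter = Counter()
counter.count = 3
counter.tags.append("demo")

payload = json.dumps(counter.serialize())
restored = Counter()
restored.deserialize(json.loads(payload))
print(payload)  # {"count": 3, "tags": ["demo"]}
```

The unregistered scratch attribute never reaches the payload, which is why transient objects such as locks or open files can live on an instance without breaking serialization.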

Serialization Validation

Before serializing a Flow, the system automatically validates that all Serializable objects (routines, connections, slots, events, etc.) can be constructed without arguments. This ensures that deserialization will succeed.

Why This Matters:

When deserializing, the system needs to create new instances of all Serializable objects. It does this by calling Class() with no arguments. If a class requires constructor parameters, deserialization will fail.

Automatic Validation:

When you call flow.serialize(), the system:

  1. Recursively traverses all Serializable objects in the Flow

  2. Checks that each object’s class can be instantiated without arguments

  3. Raises a clear error if any object fails validation
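
A check of this kind can be approximated with the standard inspect module. The sketch below is an assumption about how such a validation could work, not routilux's code; required_init_params and check_no_arg_constructible are hypothetical helper names.

```python
import inspect


def required_init_params(cls):
    """Return the names of __init__ parameters that have no default value."""
    signature = inspect.signature(cls.__init__)
    required = []
    for name, param in signature.parameters.items():
        if name == "self":
            continue
        # *args and **kwargs never force the caller to pass anything.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is param.empty:
            required.append(name)
    return required


def check_no_arg_constructible(obj, label):
    """Raise TypeError if type(obj) cannot be instantiated without arguments."""
    missing = required_init_params(type(obj))
    if missing:
        raise TypeError(
            f"{label} ({type(obj).__name__}) cannot be serialized: "
            f"__init__ requires parameters: {', '.join(missing)}"
        )


class Good:
    def __init__(self, retries=3):  # a default keeps Good() valid
        self.retries = retries


class Bad:
    def __init__(self, required_param):
        self.param = required_param


check_no_arg_constructible(Good(), "good")  # passes silently
try:
    check_no_arg_constructible(Bad("value"), "bad")
except TypeError as exc:
    message = str(exc)
    print(message)
```

Inspecting the signature rather than calling the constructor avoids running user code during validation.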

Example Error:

# ❌ This will fail during serialization
class BadRoutine(Routine):
    def __init__(self, required_param):
        super().__init__()
        self.param = required_param

flow = Flow()
routine = BadRoutine("value")  # This works
flow.add_routine(routine, "bad_routine")

# This will raise TypeError with clear error message
data = flow.serialize()
# TypeError: Routine 'bad_routine' (BadRoutine) cannot be serialized:
# BadRoutine cannot be deserialized because its __init__ method requires
# parameters: required_param
# Serializable classes must support initialization with no arguments.
# For Routine subclasses, use _config dictionary instead of constructor parameters.

Correct Pattern:

# ✅ Correct: Use _config dictionary
class GoodRoutine(Routine):
    def __init__(self):
        super().__init__()
        # Configuration is set after creation

    def configure(self, param1, param2):
        self.set_config(param1=param1, param2=param2)

flow = Flow()
routine = GoodRoutine()
routine.configure(param1="value1", param2="value2")
flow.add_routine(routine, "good_routine")

# This will succeed
data = flow.serialize()

What Gets Validated:

  • All routines in the Flow

  • All connections

  • All slots and events within routines

  • All nested Serializable objects

  • Error handlers, job states, and other Serializable fields

Error Messages:

The validation provides detailed error messages that include:

  • Which object failed (routine ID, connection index, field name, etc.)

  • Which class has the problem

  • What parameters are required

  • How to fix the issue

This allows you to catch serialization issues early, before attempting to save or transfer the Flow.

Constructor Requirements

Critical Rule: All Serializable classes (including Routine subclasses) must support initialization with no arguments.

For Routine Subclasses:

  • Don’t: Accept constructor parameters

  • Do: Use _config dictionary for configuration

Example:

# ❌ Wrong: Constructor with parameters
class MyRoutine(Routine):
    def __init__(self, max_items: int, timeout: float):
        super().__init__()
        self.max_items = max_items
        self.timeout = timeout

# ✅ Correct: No constructor parameters, use _config
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        # Configuration is set after creation

    def setup(self, max_items: int, timeout: float):
        self.set_config(max_items=max_items, timeout=timeout)

# Usage
routine = MyRoutine()
routine.setup(max_items=10, timeout=5.0)
flow.add_routine(routine, "my_routine")

Why This Constraint Exists:

During deserialization, the system needs to:

  1. Load the class from the registry

  2. Create an instance: routine = RoutineClass()

  3. Restore state: routine.deserialize(data)

If the class requires constructor parameters, step 2 will fail because the system doesn’t know what values to pass.
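
The three steps can be sketched with a plain dictionary as the registry. Everything here is hypothetical and only illustrates the mechanism: the REGISTRY dict, the register decorator, and the "_type" key are assumptions, not the format routilux or serilux actually uses.

```python
REGISTRY = {}


def register(cls):
    """Hypothetical decorator: record the class under its name."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
class Greeter:
    def __init__(self):
        self.name = "world"

    def serialize(self):
        return {"_type": type(self).__name__, "name": self.name}

    def deserialize(self, data):
        self.name = data["name"]


@register
class NeedsArg:
    def __init__(self, value):  # required parameter breaks step 2
        self.value = value

    def deserialize(self, data):
        self.value = data["value"]


def restore(data):
    cls = REGISTRY[data["_type"]]  # 1. load the class from the registry
    instance = cls()               # 2. create an instance with no arguments
    instance.deserialize(data)     # 3. restore state
    return instance


original = Greeter()
original.name = "Host B"
copy = restore(original.serialize())

try:
    restore({"_type": "NeedsArg", "value": 1})
except TypeError as exc:
    failure = str(exc)  # Python reports the missing 'value' argument
```

The second call fails at step 2, before deserialize() ever runs, because restore() has no way to know which value to pass.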

Validation Timing:

  • At Class Definition: The @register_serializable decorator checks the class definition when it’s first loaded

  • At Serialization: flow.serialize() validates all objects in the Flow before serialization, providing early error detection

This two-stage validation ensures that:

  1. Classes are correctly defined from the start

  2. Runtime issues are caught before serialization

Special Handling

Some classes have special serialization behavior:

  • ErrorHandler: The ErrorStrategy enum is automatically converted to/from strings during serialization/deserialization.
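
The conversion is needed because enum members are not JSON-serializable on their own. A minimal sketch of the round trip follows; the Strategy enum and its members are hypothetical stand-ins, since the actual members of ErrorStrategy are not listed here.

```python
import json
from enum import Enum


class Strategy(Enum):
    # Hypothetical members, for illustration only.
    STOP = "stop"
    RETRY = "retry"


def to_wire(strategy):
    """Convert an enum member to its string value for serialization."""
    return strategy.value


def from_wire(text):
    """Convert a string back to the enum member (ValueError if unknown)."""
    return Strategy(text)


# A plain Enum member cannot be dumped directly.
try:
    json.dumps({"strategy": Strategy.RETRY})
    direct_dump_failed = False
except TypeError:
    direct_dump_failed = True

payload = json.dumps({"strategy": to_wire(Strategy.RETRY)})
restored = from_wire(json.loads(payload)["strategy"])
```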

Handler Method Validation

When serializing slot handlers and merge strategies, the system validates that methods belong to the routine being serialized. This ensures cross-process serialization safety.

Why This Matters:

When serialized data is transferred to another process (e.g., for distributed execution), only methods of the serialized routine itself can be properly restored. Methods from other routines cannot be deserialized because their object instances don’t exist in the new process.

Validation Rules:

  • Allowed: Methods of the routine being serialized

  • Allowed: Module-level functions (can be imported in any process)

  • Allowed: Builtin functions

  • Not Allowed: Methods from other routine instances

Example - Correct Usage:

class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        # ✅ Correct: Use method from this routine
        self.input_slot = self.define_slot("input", handler=self.process_data)

    def process_data(self, data):
        return {"processed": data}

Example - Incorrect Usage:

class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        other_routine = OtherRoutine()
        # ❌ Wrong: Using method from another routine
        # This will raise ValueError during serialization
        self.input_slot = self.define_slot("input", handler=other_routine.process)

Error Message:

If you try to serialize a method from another routine, you’ll get a clear error:

ValueError: Cannot serialize method 'process' from OtherRoutine[other_id].
Only methods of the serialized object itself (MyRoutine[my_id])
can be serialized for cross-process execution.

What Gets Validated:

  • Slot handlers (in Routine.define_slot())

  • Merge strategies (if they are callable methods)

  • Conditional router conditions (if they are callable methods)

Note: Functions (not methods) are always allowed because they can be imported by module name in any process.
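
The ownership rule can be expressed with the __self__ attribute that Python attaches to bound methods. The following is a sketch of such a check under that assumption, not routilux's implementation; check_handler and Worker are hypothetical names.

```python
import inspect


def check_handler(owner, handler):
    """Raise ValueError if handler is a bound method of a different object."""
    if inspect.ismethod(handler) and handler.__self__ is not owner:
        raise ValueError(
            f"Cannot serialize method '{handler.__name__}' from "
            f"{type(handler.__self__).__name__}; only methods of "
            f"{type(owner).__name__} itself can be serialized."
        )
    return handler


class Worker:
    def process(self, data):
        return {"processed": data}


def module_level(data):
    return data


a, b = Worker(), Worker()
check_handler(a, a.process)     # allowed: method of the owner itself
check_handler(a, module_level)  # allowed: module-level function
check_handler(a, len)           # allowed: builtin function
try:
    check_handler(a, b.process)  # rejected: method of another instance
except ValueError as exc:
    rejected = str(exc)
```

Note that the check compares instances, not classes: b.process is rejected even though a and b share the same class.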

Flow and JobState Separation

Critical Design Principle: Flow and JobState are completely decoupled.

  • Flow: Contains only the workflow definition (routines, connections, configuration)

  • JobState: Contains execution state (execution history, routine states, status)

Why This Matters:

  • Flow serialization does NOT include execution state

  • JobState must be serialized separately for recovery

  • This allows multiple independent executions of the same flow

  • Enables proper cross-host execution and recovery

Serialization Pattern:

# Serialize Flow (workflow definition only)
flow_data = flow.serialize()
# flow_data does NOT contain job_state

# Serialize JobState separately (execution state)
job_state_data = job_state.serialize()

# Save both for recovery
save_flow(flow_data)
save_job_state(job_state_data)

Deserialization Pattern:

# Deserialize Flow
new_flow = Flow()
new_flow.deserialize(flow_data)

# Deserialize JobState separately
new_job_state = JobState()
new_job_state.deserialize(job_state_data)

# Resume execution
resumed = new_flow.resume(new_job_state)

Cross-Host Serialization and Execution

routilux supports transferring workflows and execution state across different hosts for distributed execution and recovery.

Use Cases:

  • Distributed Execution: Execute workflow on one host, transfer to another

  • Recovery: Resume execution on a different host after failure

  • Load Balancing: Move execution to a less loaded host

  • Persistence: Save execution state for later recovery

Complete Example: Cross-Host Execution:

# ============================================
# Host A: Execute and Prepare for Transfer
# ============================================

from routilux import Flow, Routine, JobState
from serilux import register_serializable
import json

@register_serializable
class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="initial_data")

@register_serializable
class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        result = f"Processed: {data}"
        self.emit("output", result=result)

# Create flow on Host A
flow = Flow(flow_id="cross_host_flow")
source = DataSource()
processor = DataProcessor()
source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")
flow.connect(source_id, "output", processor_id, "input")

# Execute and pause (simulating transfer point)
job_state = flow.execute(source_id)
flow.pause(job_state, reason="Transfer to Host B")

# Serialize both Flow and JobState
flow_data = flow.serialize()
job_state_data = job_state.serialize()

# Prepare for transfer (JSON format)
transfer_data = {
    "flow": flow_data,
    "job_state": job_state_data
}

# Save to file (or send over network)
with open("transfer.json", "w") as f:
    json.dump(transfer_data, f, indent=2)

print("✅ Host A: Flow and JobState serialized and ready for transfer")

# ============================================
# Host B: Receive and Resume Execution
# ============================================
# On a real second host, repeat the imports and the registered routine
# class definitions above before deserializing.

# Load from file (or receive from network)
with open("transfer.json", "r") as f:
    transfer_data = json.load(f)

# Deserialize Flow
new_flow = Flow()
new_flow.deserialize(transfer_data["flow"])

# Deserialize JobState
new_job_state = JobState()
new_job_state.deserialize(transfer_data["job_state"])

# Verify deserialization (comparing against the Host A objects is only
# possible here because both halves run in one script)
assert new_flow.flow_id == flow.flow_id
assert new_job_state.job_id == job_state.job_id
assert new_job_state.status == "paused"

# Resume execution on Host B
resumed_job_state = new_flow.resume(new_job_state)

# Wait for completion
JobState.wait_for_completion(new_flow, resumed_job_state, timeout=10.0)

print("✅ Host B: Execution resumed and completed")
print(f"   Final status: {resumed_job_state.status}")
print(f"   Execution history: {len(resumed_job_state.execution_history)} records")

Expected Output:

✅ Host A: Flow and JobState serialized and ready for transfer
✅ Host B: Execution resumed and completed
   Final status: completed
   Execution history: 4 records

Key Points:

  1. Flow and JobState are serialized separately - This is required

  2. Both must be transferred - Flow (definition) + JobState (execution state)

  3. Routines must be registered - Use @register_serializable decorator

  4. Deserialization order matters - Deserialize Flow first, then JobState

  5. Resume uses deserialized JobState - Pass the deserialized JobState to resume()

Network Transfer Example:

import socket
import json

# Host A: Send
def send_to_host_b(flow_data, job_state_data, host_b_address):
    transfer_data = {
        "flow": flow_data,
        "job_state": job_state_data
    }

    # Serialize to JSON
    json_data = json.dumps(transfer_data)

    # Send over network; closing the socket marks the end of the message
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(host_b_address)
    sock.sendall(json_data.encode())
    sock.close()

# Host B: Receive
def receive_from_host_a(port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(1)
    conn, addr = sock.accept()

    # Receive data until the sender closes the connection
    data = b""
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            break
        data += chunk

    conn.close()
    sock.close()

    # Deserialize
    transfer_data = json.loads(data.decode())
    return transfer_data["flow"], transfer_data["job_state"]
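
The example above relies on the sender closing the connection to mark the end of the message. If the connection has to stay open, for instance to send an acknowledgement back, a common alternative is to prefix each message with its length. The sketch below shows that technique with the standard library only; send_message, recv_exactly, and recv_message are hypothetical helper names, and the payload is a stand-in for real serialized data.

```python
import json
import socket
import struct

HEADER = struct.Struct("!I")  # 4-byte big-endian payload length


def send_message(sock, payload):
    """Send a JSON-compatible object preceded by its byte length."""
    body = json.dumps(payload).encode("utf-8")
    sock.sendall(HEADER.pack(len(body)) + body)


def recv_exactly(sock, size):
    """Read exactly size bytes, or raise if the peer closes early."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("connection closed mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_message(sock):
    """Read one length-prefixed JSON message."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return json.loads(recv_exactly(sock, length).decode("utf-8"))


# Demonstrate with a connected socket pair inside one process.
left, right = socket.socketpair()
with left, right:
    sent = {"flow": {"flow_id": "demo"}, "job_state": {"status": "paused"}}
    send_message(left, sent)
    received = recv_message(right)
```

Because the receiver knows how many bytes to expect, it can return as soon as the message is complete and reuse the same connection for a reply.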

Database Storage Example:

import sqlite3
import json

# Host A: Save to database
def save_execution_to_db(flow_data, job_state_data, execution_id):
    conn = sqlite3.connect("executions.db")
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS executions (
            execution_id TEXT PRIMARY KEY,
            flow_data TEXT,
            job_state_data TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    cursor.execute("""
        INSERT INTO executions (execution_id, flow_data, job_state_data)
        VALUES (?, ?, ?)
    """, (execution_id, json.dumps(flow_data), json.dumps(job_state_data)))

    conn.commit()
    conn.close()

# Host B: Load from database
def load_execution_from_db(execution_id):
    conn = sqlite3.connect("executions.db")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT flow_data, job_state_data
        FROM executions
        WHERE execution_id = ?
    """, (execution_id,))

    row = cursor.fetchone()
    conn.close()

    if row:
        flow_data = json.loads(row[0])
        job_state_data = json.loads(row[1])
        return flow_data, job_state_data
    else:
        return None, None
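
With a plain INSERT, saving the same execution_id a second time raises sqlite3.IntegrityError because of the primary key. If an execution is checkpointed repeatedly, one option is SQLite's INSERT OR REPLACE, sketched below against an in-memory database. The function names and sample dictionaries are hypothetical; the dictionaries stand in for serialized Flow and JobState data.

```python
import json
import sqlite3


def save_execution(conn, execution_id, flow_data, job_state_data):
    """Store or overwrite the serialized data for one execution."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS executions (
            execution_id TEXT PRIMARY KEY,
            flow_data TEXT,
            job_state_data TEXT
        )
        """
    )
    # INSERT OR REPLACE overwrites an existing row with the same key.
    conn.execute(
        "INSERT OR REPLACE INTO executions VALUES (?, ?, ?)",
        (execution_id, json.dumps(flow_data), json.dumps(job_state_data)),
    )
    conn.commit()


def load_execution(conn, execution_id):
    """Return (flow_data, job_state_data), or (None, None) if not found."""
    row = conn.execute(
        "SELECT flow_data, job_state_data FROM executions WHERE execution_id = ?",
        (execution_id,),
    ).fetchone()
    if row is None:
        return None, None
    return json.loads(row[0]), json.loads(row[1])


conn = sqlite3.connect(":memory:")
save_execution(conn, "run-1", {"flow_id": "demo"}, {"status": "running"})
save_execution(conn, "run-1", {"flow_id": "demo"}, {"status": "paused"})
flow_data, job_state_data = load_execution(conn, "run-1")
missing = load_execution(conn, "run-2")
conn.close()
```

The second save replaces the first, so loading "run-1" returns the most recent checkpoint.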

Best Practices for Cross-Host Execution:

  1. Always serialize Flow and JobState separately:

    # ✅ Correct
    flow_data = flow.serialize()
    job_state_data = job_state.serialize()
    
    # ❌ Wrong: Flow doesn't include JobState
    # flow_data = flow.serialize(include_execution_state=True)  # This doesn't exist!
    
  2. Register all custom Routine classes:

    from serilux import register_serializable
    
    @register_serializable
    class MyRoutine(Routine):
        def __init__(self):
            super().__init__()
            # ...
    
  3. Use no-argument constructors:

    # ✅ Correct
    class MyRoutine(Routine):
        def __init__(self):
            super().__init__()
            self.set_config(param1="value1")
    
    # ❌ Wrong: Constructor with parameters
    class MyRoutine(Routine):
        def __init__(self, param1):  # Will fail during deserialization!
            super().__init__()
            self.param1 = param1
    
  4. Validate before transfer:

    # Serialize and validate
    flow_data = flow.serialize()  # Validates automatically
    job_state_data = job_state.serialize()
    
    # Test deserialization locally before transfer
    test_flow = Flow()
    test_flow.deserialize(flow_data)
    
    test_job_state = JobState()
    test_job_state.deserialize(job_state_data)
    
    # If this works, safe to transfer
    
  5. Handle errors gracefully:

    try:
        new_flow = Flow()
        new_flow.deserialize(flow_data)
    
        new_job_state = JobState()
        new_job_state.deserialize(job_state_data)
    
        resumed = new_flow.resume(new_job_state)
    except Exception as e:
        print(f"Failed to resume execution: {e}")
        # Handle error (retry, log, notify, etc.)
    

Common Pitfalls:

  1. Forgetting to serialize JobState:

    # ❌ Wrong: Only serializing Flow
    flow_data = flow.serialize()
    # JobState is lost!
    
    # ✅ Correct: Serialize both
    flow_data = flow.serialize()
    job_state_data = job_state.serialize()
    
  2. Assuming Flow includes execution state:

    # ❌ Wrong: Flow doesn't have job_state
    flow_data = flow.serialize()
    # flow_data["job_state"]  # KeyError!
    
    # ✅ Correct: JobState is separate
    job_state_data = job_state.serialize()
    
  3. Not registering custom routines:

    # ❌ Wrong: Not registered
    class MyRoutine(Routine):
        def __init__(self):
            super().__init__()
    
    # Will fail during deserialization on Host B
    
    # ✅ Correct: Registered
    @register_serializable
    class MyRoutine(Routine):
        def __init__(self):
            super().__init__()
    
  4. Using constructor parameters:

    # ❌ Wrong: Constructor with parameters
    class MyRoutine(Routine):
        def __init__(self, max_items):
            super().__init__()
            self.max_items = max_items
    
    # Will fail during deserialization
    
    # ✅ Correct: Use _config
    class MyRoutine(Routine):
        def __init__(self):
            super().__init__()
    
        def setup(self, max_items):
            self.set_config(max_items=max_items)