State Management

In this tutorial, you’ll learn how to track execution state and monitor your workflows using Routilux’s built-in state management features.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Track execution state using JobState

  • Store routine-specific state in JobState

  • Access execution history

  • Monitor workflow performance

  • Use shared data areas for execution-wide state

Step 1: Understanding JobState

JobState is the central mechanism for tracking execution state. Each execution creates a unique JobState that tracks:

  • Execution status (pending, running, completed, failed, etc.)

  • Execution history (all routine executions)

  • Routine states (per-routine execution information)

  • Shared data (execution-wide data storage)

 1from routilux import Flow, Routine
 2
 3class SimpleProcessor(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.input_slot = self.define_slot("input", handler=self.process)
 7        self.output_event = self.define_event("output", ["result"])
 8
 9    def process(self, data=None, **kwargs):
10        data_value = data or kwargs.get("data", "")
11        result = f"Processed: {data_value}"
12
13        # Store execution state in JobState using get_execution_context()
14        ctx = self.get_execution_context()
15        if ctx:
16            ctx.job_state.update_routine_state(
17                ctx.routine_id, {"last_processed": data_value, "processed": True}
18            )
19
20        self.emit("output", result=result)
21
22class DataSource(Routine):
23    def __init__(self):
24        super().__init__()
25        self.trigger_slot = self.define_slot("trigger", handler=self.send)
26        self.output_event = self.define_event("output", ["data"])
27
28    def send(self, **kwargs):
29        self.emit("output", data="test")
30
31flow = Flow(flow_id="jobstate_flow")
32
33source = DataSource()
34processor = SimpleProcessor()
35
36source_id = flow.add_routine(source, "source")
37processor_id = flow.add_routine(processor, "processor")
38
39flow.connect(source_id, "output", processor_id, "input")
40
41job_state = flow.execute(source_id)
42
43# Access JobState information
44print(f"Status: {job_state.status}")
45print(f"Flow ID: {job_state.flow_id}")
46print(f"Created at: {job_state.created_at}")
47
48# Get execution history
49history = job_state.get_execution_history()
50print(f"Execution records: {len(history)}")
51
52# Check routine states
53processor_state = job_state.get_routine_state(processor_id)
54print(f"Processor state: {processor_state}")

Expected Output:

Status: completed
Flow ID: jobstate_flow
Created at: 2024-01-01 12:00:00
Execution records: 2
Processor state: {'last_processed': 'test', 'processed': True}

Key Points:

  • JobState is created for each execution

  • Execution state should be stored in JobState, not routine instance variables

  • update_routine_state() stores routine-specific state

  • get_routine_state() retrieves routine state

  • Execution history records every event emitted by the routines during the execution

Step 2: Storing Routine State

Routines should store execution-specific state in JobState:

 1from routilux import Flow, Routine
 2
 3class StatefulProcessor(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.input_slot = self.define_slot("input", handler=self.process)
 7        self.output_event = self.define_event("output", ["result"])
 8
 9    def process(self, data=None, **kwargs):
10        data_value = data or kwargs.get("data", "")
11
12        # Get execution context
13        ctx = self.get_execution_context()
14        if not ctx:
15            return
16
17        # Get current state
18        current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}
19        processed_count = current_state.get("processed_count", 0) + 1
20
21        try:
22            # Process data
23            result = data_value.upper()
24
25            # Update state in JobState
26            ctx.job_state.update_routine_state(
27                ctx.routine_id,
28                {
29                    "processed_count": processed_count,
30                    "last_processed": data_value,
31                    "last_result": result,
32                    "status": "success",
33                },
34            )
35
36            self.emit("output", result=result)
37
38        except Exception as e:
39            # Update error state in JobState
40            error_count = current_state.get("error_count", 0) + 1
41            ctx.job_state.update_routine_state(
42                ctx.routine_id,
43                {
44                    "processed_count": processed_count,
45                    "error_count": error_count,
46                    "last_error": str(e),
47                    "status": "error",
48                },
49            )
50            raise
51
52class DataSource(Routine):
53    def __init__(self):
54        super().__init__()
55        self.trigger_slot = self.define_slot("trigger", handler=self.send)
56        self.output_event = self.define_event("output", ["data"])
57
58    def send(self, **kwargs):
59        for item in ["hello", "world", "routilux"]:
60            self.emit("output", data=item)
61
62flow = Flow(flow_id="stateful_flow")
63
64source = DataSource()
65processor = StatefulProcessor()
66
67source_id = flow.add_routine(source, "source")
68processor_id = flow.add_routine(processor, "processor")
69
70flow.connect(source_id, "output", processor_id, "input")
71
72from routilux.job_state import JobState
73job_state = flow.execute(source_id)
74JobState.wait_for_completion(flow, job_state, timeout=2.0)
75
76# Check routine state
77processor_state = job_state.get_routine_state(processor_id)
78print(f"Processed: {processor_state.get('processed_count', 0)}")
79print(f"Errors: {processor_state.get('error_count', 0)}")
80print(f"Last processed: {processor_state.get('last_processed', 'none')}")

Expected Output:

Processed: 3
Errors: 0
Last processed: routilux

Key Points:

  • Call get_execution_context() at the start of each handler to obtain job_state and routine_id

  • Use get_routine_state() to get current state

  • Use update_routine_state() to update state

  • State is execution-specific and isolated per execution

Step 3: Using Shared Data

JobState provides shared data areas for execution-wide state:

 1from routilux import Flow, Routine
 2
 3class DataCollector(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.input_slot = self.define_slot("input", handler=self.collect)
 7
 8    def collect(self, data=None, **kwargs):
 9        data_value = data or kwargs.get("data", "")
10
11        # Use get_execution_context() for convenient access
12        ctx = self.get_execution_context()
13        if ctx:
14            # Append to shared log
15            ctx.job_state.append_to_shared_log(
16                {"action": "collected", "data": data_value, "routine": "collector"}
17            )
18
19            # Update shared data
20            collected_items = ctx.job_state.get_shared_data("collected_items", [])
21            collected_items.append(data_value)
22            ctx.job_state.update_shared_data("collected_items", collected_items)
23
24class DataProcessor(Routine):
25    def __init__(self):
26        super().__init__()
27        self.input_slot = self.define_slot("input", handler=self.process)
28        self.output_event = self.define_event("output", ["result"])
29
30    def process(self, data=None, **kwargs):
31        data_value = data or kwargs.get("data", "")
32
33        # Use get_execution_context() for convenient access
34        ctx = self.get_execution_context()
35        if ctx:
36            # Read shared data
37            collected_items = ctx.job_state.get_shared_data("collected_items", [])
38
39            # Process with context
40            result = f"Processed {len(collected_items)} items, current: {data_value}"
41
42            # Append to shared log
43            ctx.job_state.append_to_shared_log(
44                {"action": "processed", "data": data_value, "result": result}
45            )
46
47            self.emit("output", result=result)
48
49class DataSource(Routine):
50    def __init__(self):
51        super().__init__()
52        self.trigger_slot = self.define_slot("trigger", handler=self.send)
53        self.output_event = self.define_event("output", ["data"])
54
55    def send(self, **kwargs):
56        for item in ["a", "b", "c"]:
57            self.emit("output", data=item)
58
59flow = Flow(flow_id="shared_data_flow")
60
61source = DataSource()
62collector = DataCollector()
63processor = DataProcessor()
64
65source_id = flow.add_routine(source, "source")
66collector_id = flow.add_routine(collector, "collector")
67processor_id = flow.add_routine(processor, "processor")
68
69flow.connect(source_id, "output", collector_id, "input")
70flow.connect(source_id, "output", processor_id, "input")
71
72from routilux.job_state import JobState
73job_state = flow.execute(source_id)
74JobState.wait_for_completion(flow, job_state, timeout=2.0)
75
76# Check shared data
77collected_items = job_state.get_shared_data("collected_items", [])
78print(f"Collected items: {collected_items}")
79
80# Check shared log
81log = job_state.get_shared_log()
82print(f"Log entries: {len(log)}")
83for entry in log:
84    print(f"  {entry}")

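Expected Output (the collector and processor both receive every item, so the order of log entries, and the item count each "processed" result reports, may vary with execution scheduling):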

Collected items: ['a', 'b', 'c']
Log entries: 6
  {'action': 'collected', 'data': 'a', 'routine': 'collector', 'timestamp': '...'}
  {'action': 'collected', 'data': 'b', 'routine': 'collector', 'timestamp': '...'}
  {'action': 'collected', 'data': 'c', 'routine': 'collector', 'timestamp': '...'}
  {'action': 'processed', 'data': 'a', 'result': '...', 'timestamp': '...'}
  {'action': 'processed', 'data': 'b', 'result': '...', 'timestamp': '...'}
  {'action': 'processed', 'data': 'c', 'result': '...', 'timestamp': '...'}

Key Points:

  • update_shared_data() stores execution-wide data

  • get_shared_data() retrieves shared data

  • append_to_shared_log() appends to execution log

  • get_shared_log() retrieves log entries

  • Shared data is execution-specific and isolated

Step 4: Accessing Execution History

Execution history records every event emitted by the routines during an execution:

 1from routilux import Flow, Routine
 2
 3class SimpleProcessor(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.input_slot = self.define_slot("input", handler=self.process)
 7        self.output_event = self.define_event("output", ["result"])
 8
 9    def process(self, data=None, **kwargs):
10        data_value = data or kwargs.get("data", "")
11        result = f"Processed: {data_value}"
12        self.emit("output", result=result)
13
14class DataSource(Routine):
15    def __init__(self):
16        super().__init__()
17        self.trigger_slot = self.define_slot("trigger", handler=self.send)
18        self.output_event = self.define_event("output", ["data"])
19
20    def send(self, **kwargs):
21        for item in ["a", "b", "c"]:
22            self.emit("output", data=item)
23
24flow = Flow(flow_id="history_flow")
25
26source = DataSource()
27processor = SimpleProcessor()
28
29source_id = flow.add_routine(source, "source")
30processor_id = flow.add_routine(processor, "processor")
31
32flow.connect(source_id, "output", processor_id, "input")
33
34from routilux.job_state import JobState
35job_state = flow.execute(source_id)
36JobState.wait_for_completion(flow, job_state, timeout=2.0)
37
38# Get all execution history
39history = job_state.get_execution_history()
40print(f"Total records: {len(history)}")
41
42# Get history for specific routine
43processor_history = job_state.get_execution_history(processor_id)
44print(f"Processor executions: {len(processor_history)}")
45
46# Print history
47for record in history:
48    print(f"  {record.routine_id}: {record.event_name} at {record.timestamp}")

Expected Output:

Total records: 6
Processor executions: 3
  source: output at 2024-01-01 12:00:00
  processor: output at 2024-01-01 12:00:00.001
  source: output at 2024-01-01 12:00:00.002
  processor: output at 2024-01-01 12:00:00.003
  source: output at 2024-01-01 12:00:00.004
  processor: output at 2024-01-01 12:00:00.005

Key Points:

  • get_execution_history() returns all execution records

  • get_execution_history(routine_id) filters by routine

  • Each record contains routine_id, event_name, data, and timestamp

  • History is automatically recorded for all event emissions

Step 5: Complete Example - Stateful Workflow

Here’s a complete example combining state management features:

  1from routilux import Flow, Routine
  2
  3class DataSource(Routine):
  4    def __init__(self):
  5        super().__init__()
  6        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
  7        self.output_event = self.define_event("output", ["data"])
  8
  9    def generate(self, count=3, **kwargs):
 10        count = count or kwargs.get("count", 3)
 11
 12        # Use get_execution_context() for convenient access
 13        ctx = self.get_execution_context()
 14        if ctx:
 15            ctx.job_state.update_routine_state(ctx.routine_id, {"generated_count": count})
 16
 17        for i in range(count):
 18            self.emit("output", data=f"item_{i}")
 19
 20class Validator(Routine):
 21    def __init__(self):
 22        super().__init__()
 23        self.input_slot = self.define_slot("input", handler=self.validate)
 24        self.output_event = self.define_event("output", ["data", "valid"])
 25
 26    def validate(self, data=None, **kwargs):
 27        data_value = data or kwargs.get("data", "")
 28
 29        # Use get_execution_context() for convenient access
 30        ctx = self.get_execution_context()
 31        if not ctx:
 32            return
 33
 34        current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}
 35
 36        # Simple validation: must contain "item"
 37        is_valid = "item" in str(data_value)
 38
 39        if is_valid:
 40            valid_count = current_state.get("valid_count", 0) + 1
 41            ctx.job_state.update_routine_state(
 42                ctx.routine_id, {"valid_count": valid_count, "last_valid": data_value}
 43            )
 44        else:
 45            invalid_count = current_state.get("invalid_count", 0) + 1
 46            ctx.job_state.update_routine_state(
 47                ctx.routine_id, {"invalid_count": invalid_count, "last_invalid": data_value}
 48            )
 49
 50        self.emit("output", data=data_value, valid=is_valid)
 51
 52class Aggregator(Routine):
 53    def __init__(self):
 54        super().__init__()
 55        self.input_slot = self.define_slot("input", handler=self.aggregate)
 56
 57    def aggregate(self, data=None, valid=None, **kwargs):
 58        data_value = data or kwargs.get("data", "")
 59        is_valid = valid if valid is not None else kwargs.get("valid", False)
 60
 61        # Use get_execution_context() for convenient access
 62        ctx = self.get_execution_context()
 63        if ctx:
 64            current_state = ctx.job_state.get_routine_state(ctx.routine_id) or {}
 65
 66            total_received = current_state.get("total_received", 0) + 1
 67            total_valid = current_state.get("total_valid", 0)
 68            if is_valid:
 69                total_valid += 1
 70
 71            ctx.job_state.update_routine_state(
 72                ctx.routine_id, {"total_received": total_received, "total_valid": total_valid}
 73            )
 74
 75        print(f"Aggregated: {data_value} (valid={is_valid})")
 76
 77def main():
 78    flow = Flow(flow_id="stateful_workflow")
 79
 80    source = DataSource()
 81    validator = Validator()
 82    aggregator = Aggregator()
 83
 84    source_id = flow.add_routine(source, "source")
 85    validator_id = flow.add_routine(validator, "validator")
 86    agg_id = flow.add_routine(aggregator, "aggregator")
 87
 88    flow.connect(source_id, "output", validator_id, "input")
 89    flow.connect(validator_id, "output", agg_id, "input")
 90
 91    from routilux.job_state import JobState
 92    job_state = flow.execute(source_id, entry_params={"count": 5})
 93    JobState.wait_for_completion(flow, job_state, timeout=2.0)
 94
 95    print("\n=== Execution State ===")
 96    source_state = job_state.get_routine_state(source_id)
 97    validator_state = job_state.get_routine_state(validator_id)
 98    aggregator_state = job_state.get_routine_state(agg_id)
 99
100    print(f"Source generated: {source_state.get('generated_count', 0)}")
101    print(
102        f"Validator - Valid: {validator_state.get('valid_count', 0)}, "
103        f"Invalid: {validator_state.get('invalid_count', 0)}"
104    )
105    print(
106        f"Aggregator - Total: {aggregator_state.get('total_received', 0)}, "
107        f"Valid: {aggregator_state.get('total_valid', 0)}"
108    )
109    print(f"\nExecution status: {job_state.status}")
110
111if __name__ == "__main__":
112    main()

Expected Output:

Aggregated: item_0 (valid=True)
Aggregated: item_1 (valid=True)
Aggregated: item_2 (valid=True)
Aggregated: item_3 (valid=True)
Aggregated: item_4 (valid=True)

=== Execution State ===
Source generated: 5
Validator - Valid: 5, Invalid: 0
Aggregator - Total: 5, Valid: 5

Execution status: completed

Key Points:

  • Store all execution state in JobState

  • Use update_routine_state() to update state

  • Use get_routine_state() to retrieve state

  • State is execution-specific and isolated per execution

Common Pitfalls

Pitfall 1: Modifying routine instance variables during execution

def process(self, **kwargs):
    # ❌ Don't do this - breaks execution isolation
    # (attributes on the routine instance are shared by every execution that
    # uses this routine object, instead of living in that execution's JobState)
    self.counter += 1
    self.data.append(kwargs)

Solution: Store execution state in JobState:

def process(self, **kwargs):
    # Guard clause: outside an active execution there is nothing to record.
    ctx = self.get_execution_context()
    if not ctx:
        return
    job_state, routine_id = ctx.job_state, ctx.routine_id
    # Read the previous counter from JobState and write back the incremented value.
    previous = job_state.get_routine_state(routine_id) or {}
    job_state.update_routine_state(routine_id, {"counter": previous.get("counter", 0) + 1})

Pitfall 2: Not checking whether job_state is available

def process(self, **kwargs):
    job_state = getattr(self, "_current_job_state", None)  # Set by slot.receive()
    # ❌ job_state is None whenever the attribute has not been set, and nothing
    # checks for that, so this line raises AttributeError in that case.
    job_state.update_shared_data("processed", True)

Solution: Use get_execution_context() and check its result once; when it is truthy, ctx.job_state and ctx.routine_id are safe to use:

def process(self, **kwargs):
    ctx = self.get_execution_context()
    if not ctx:
        # No active execution context: skip recording.
        return
    # Past the guard, ctx.job_state is safe to use.
    ctx.job_state.update_routine_state(ctx.routine_id, {"processed": True})

Pitfall 3: Using wrong routine_id

def process(self, **kwargs):
    ctx = self.get_execution_context()
    if ctx:
        # ❌ Wrong - self._id is memory address, not routine_id in flow
        ctx.job_state.update_routine_state(self._id, {"processed": True})

Solution: Use get_execution_context() which provides the correct routine_id:

def process(self, **kwargs):
    ctx = self.get_execution_context()
    if not ctx:
        return
    # ctx.routine_id is the id this routine was registered under in the flow
    ctx.job_state.update_routine_state(ctx.routine_id, {"processed": True})

Best Practices

  1. Always store execution state in JobState: Never modify routine instance variables during execution

  2. Use get_execution_context(): It provides convenient access to flow, job_state, and routine_id

  3. Use send_output() for output: Send execution-specific output with send_output() rather than with events

  4. Use emit_deferred_event() for pause/resume: Events emitted this way are processed when the execution resumes

  5. Call get_routine_state() before updating: Read the current state first so that accumulated values (such as counters) are not lost

  6. Use shared data for execution-wide state: Use update_shared_data() and append_to_shared_log()

  7. Access execution history: Use get_execution_history() to inspect every event emission recorded during the execution

Next Steps

Now that you understand state management, let’s move on to Error Handling to learn how to build resilient workflows that handle errors gracefully.