JobState: Execution State Management
The JobState object is central to routilux’s execution model. It represents the execution state of a single workflow execution, completely decoupled from the Flow (workflow definition).
Understanding JobState’s Role is Critical
JobState is responsible for all runtime state and business data:
Execution State: status, routine states, execution history
Business Data: intermediate processing data (shared_data, shared_log)
Output Handling: execution-specific output (output_handler, output_log)
Deferred Events: events to be emitted on resume
Pause Points: checkpoints for resumption
What JobState Does NOT Do:
❌ Define workflow structure (that’s Flow’s job)
❌ Implement node functionality (that’s Routine’s job)
❌ Store static configuration (that’s Routine._config)
Key Principle: Everything that changes during execution belongs in JobState. Everything that’s static belongs in Flow or Routine._config.
Key Concepts
Flow vs JobState vs Routine:
Flow: static workflow definition (routines, connections, static configuration)
Routine: function implementation (what each node does, with static configuration in _config)
JobState: dynamic execution state (runtime state, business data, output)
This clear separation allows:
Multiple independent executions of the same flow
Proper serialization and recovery
Distributed execution across hosts
Independent pause/resume/cancel operations
Reusable routine objects across executions
Identifiers in JobState
Each JobState has two important identifiers:
job_id: Automatically generated UUID that uniquely identifies this execution instance
flow_id: The identifier of the Flow that created this JobState
For details on how to use these identifiers, see Identifiers: job_id, flow_id, and routine_id.
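The following is a minimal sketch of how the two identifiers relate, using only the standard library. `ToyJobState` is a hypothetical stand-in and not routilux’s class. It assumes only what is stated above: `job_id` is a freshly generated UUID for each execution, while `flow_id` names the Flow and is shared by every execution of it.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ToyJobState:
    """Hypothetical stand-in for JobState that models only the two identifiers."""

    flow_id: str
    # A fresh random UUID is generated for every execution instance
    job_id: str = field(default_factory=lambda: str(uuid.uuid4()))


# Two executions of the same flow share flow_id but get distinct job_ids
run_a = ToyJobState(flow_id="my_flow")
run_b = ToyJobState(flow_id="my_flow")
print(run_a.flow_id == run_b.flow_id)  # True
print(run_a.job_id == run_b.job_id)    # False
```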
Creating JobState
A JobState is automatically created when you call flow.execute():
```python
from routilux import Flow, Routine

class Source(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        self.emit("output", data="test")

flow = Flow(flow_id="my_flow")
source = Source()
source_id = flow.add_routine(source, "source")

# Execute returns a JobState
job_state = flow.execute(source_id)

print(f"Job ID: {job_state.job_id}")
print(f"Status: {job_state.status}")
print(f"Flow ID: {job_state.flow_id}")
```
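Expected Output (the job ID is a randomly generated UUID, so it differs on every run; if execution has not finished yet, the status may still be "running", see JobState Status):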
Job ID: 550e8400-e29b-41d4-a716-446655440000
Status: completed
Flow ID: my_flow
Key Points:
Each execute() call creates a new, independent JobState
The JobState is returned to you; Flow does not manage it
Multiple executions = multiple independent JobState objects
Multiple Independent Executions
You can execute the same flow multiple times, each with its own JobState:
```python
# Execute the same flow multiple times
job_state1 = flow.execute(source_id, entry_params={"value": "A"})
job_state2 = flow.execute(source_id, entry_params={"value": "B"})
job_state3 = flow.execute(source_id, entry_params={"value": "C"})

# Each execution has its own JobState
assert job_state1.job_id != job_state2.job_id
assert job_state2.job_id != job_state3.job_id
assert job_state1 is not job_state2  # Different objects

# Each has its own execution history
print(f"Execution 1 history: {len(job_state1.execution_history)} records")
print(f"Execution 2 history: {len(job_state2.execution_history)} records")
print(f"Execution 3 history: {len(job_state3.execution_history)} records")
```
Key Points:
Each execution is completely independent
Execution history, routine states, and status are separate
You can pause/resume/cancel each execution independently
JobState Properties
The JobState object contains:
Core Properties:
job_id: unique identifier for this execution
flow_id: identifier of the flow that created this execution
status: current execution status ("pending", "running", "completed", "failed", "paused", "cancelled")
created_at: timestamp when the JobState was created
updated_at: timestamp of the last update
Execution Tracking:
execution_history: list of ExecutionRecord objects (chronological log)
routine_states: dictionary mapping routine_id to routine state
current_routine_id: currently executing routine (if any)
Metadata:
metadata: custom metadata dictionary
checkpoint: checkpoint data for pause/resume
Accessing Execution History
The execution history provides a chronological log of all routine executions and event emissions:
```python
job_state = flow.execute(source_id)

# Get all execution records
all_records = job_state.execution_history
print(f"Total records: {len(all_records)}")

# Get records for a specific routine
source_records = job_state.get_execution_history("source")
print(f"Source records: {len(source_records)}")

# Iterate through records
for record in job_state.execution_history:
    print(f"{record.timestamp}: {record.routine_id} -> {record.event_name}")
    print(f"  Data: {record.data}")
```
Expected Output:
Total records: 3
Source records: 3
2025-01-15T10:30:00: source -> start
Data: {}
2025-01-15T10:30:00.100: source -> output
Data: {'data': 'test'}
2025-01-15T10:30:00.200: source -> completed
Data: {}
Key Points:
Execution history is automatically recorded during execution
Records include routine_id, event_name, data, and timestamp
History is recorded in both main thread and worker threads
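The sketch below shows, with the standard library only, what "a chronological log plus a per-routine filter" means. `ToyRecord` and `ToyHistory` are hypothetical stand-ins for ExecutionRecord and JobState, not routilux code. The method names mirror `record_execution()` and `get_execution_history()` from this page, but their internals here are an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class ToyRecord:
    """Hypothetical stand-in for ExecutionRecord."""

    routine_id: str
    event_name: str
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)


class ToyHistory:
    """Hypothetical stand-in for the history part of JobState."""

    def __init__(self) -> None:
        self.execution_history: List[ToyRecord] = []

    def record_execution(self, routine_id: str, event_name: str, data: Dict[str, Any]) -> None:
        # Appending keeps records in the order they happened
        self.execution_history.append(ToyRecord(routine_id, event_name, data))

    def get_execution_history(self, routine_id: str) -> List[ToyRecord]:
        # Filtering a chronological list preserves chronological order
        return [r for r in self.execution_history if r.routine_id == routine_id]


history = ToyHistory()
history.record_execution("source", "start", {})
history.record_execution("source", "output", {"data": "test"})
history.record_execution("sink", "input", {"data": "test"})
print(len(history.execution_history))  # 3
print([r.event_name for r in history.get_execution_history("source")])  # ['start', 'output']
```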
Accessing Routine States
Routine states track the status and metadata of each routine during execution:
```python
job_state = flow.execute(source_id)

# Get state for a specific routine
source_state = job_state.get_routine_state("source")
if source_state:
    print(f"Source status: {source_state.get('status')}")
    print(f"Source metadata: {source_state}")

# Update routine state (typically done by error handlers)
job_state.update_routine_state("source", {
    "status": "completed",
    "execution_time": 0.5,
    "custom_field": "value",
})

# Get updated state
updated_state = job_state.get_routine_state("source")
print(f"Updated state: {updated_state}")
```
Key Points:
Routine states are updated automatically during execution
Error handlers can update routine states
You can add custom metadata to routine states
Using get_execution_context() for Convenient Access:
Instead of manually accessing flow and job_state, use get_execution_context():
```python
from routilux import Routine

class Processor(Routine):
    def process(self, data=None, **kwargs):
        # Get execution context (flow, job_state, routine_id)
        ctx = self.get_execution_context()
        if ctx:
            # Update routine state
            ctx.job_state.update_routine_state(ctx.routine_id, {"processed": True})

            # Store business data
            ctx.job_state.update_shared_data("last_item", data)

        # Send output
        self.send_output("user_data", message="Processed", value=data)
```
Key Points:
get_execution_context() returns ExecutionContext(flow, job_state, routine_id)
JobState is accessed via a context variable (thread-safe, set automatically during slot handler execution)
Returns None if not in an execution context (no flow or job_state available)
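The mechanism behind get_execution_context() can be modeled with the standard `contextvars` module. This is an illustrative sketch, not routilux’s implementation: `ToyExecutionContext`, `_current_ctx`, and `run_handler()` are hypothetical names. It demonstrates the two properties listed above: the context is set only while a handler runs, and lookups outside a handler return None.

```python
import contextvars
from typing import Any, Callable, NamedTuple, Optional


class ToyExecutionContext(NamedTuple):
    """Hypothetical stand-in for routilux's ExecutionContext."""

    flow: Any
    job_state: Any
    routine_id: str


# Holds the context of the handler currently running (None when idle)
_current_ctx = contextvars.ContextVar("current_ctx", default=None)


def get_execution_context() -> Optional[ToyExecutionContext]:
    # Returns None when called outside a handler invocation
    return _current_ctx.get()


def run_handler(handler: Callable[[], Any], ctx: ToyExecutionContext) -> Any:
    # Install the context only for the duration of the handler call
    token = _current_ctx.set(ctx)
    try:
        return handler()
    finally:
        _current_ctx.reset(token)


outside_before = get_execution_context()
seen = run_handler(
    lambda: get_execution_context().routine_id,
    ToyExecutionContext(flow="flow", job_state={"status": "running"}, routine_id="processor"),
)
outside_after = get_execution_context()
print(outside_before, seen, outside_after)  # None processor None
```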
Storing Business Data
JobState provides two mechanisms for storing intermediate business data:
1. shared_data (Read/Write Dictionary):
Use shared_data for data that needs to be read and updated by multiple routines:
```python
from routilux import Routine

class DataCollector(Routine):
    def process(self, data=None, **kwargs):
        ctx = self.get_execution_context()
        if ctx:
            # Read the current list (empty on first use)
            items = ctx.job_state.get_shared_data("items", default=[])

            # Store an updated copy that includes the new item
            items = items + [data]
            ctx.job_state.update_shared_data("items", items)

            # Store derived data alongside it
            ctx.job_state.update_shared_data("count", len(items))
```
2. shared_log (Append-Only List):
Use shared_log for append-only execution logs:
```python
from datetime import datetime

from routilux import Routine

class Logger(Routine):
    def process(self, data=None, **kwargs):
        ctx = self.get_execution_context()
        if ctx:
            # Append to log
            ctx.job_state.append_to_shared_log({
                "timestamp": datetime.now().isoformat(),
                "action": "process",
                "data": data,
            })

            # Read log
            log = ctx.job_state.get_shared_log()
            print(f"Total log entries: {len(log)}")
```
Key Points:
shared_data: for read/write intermediate data
shared_log: for append-only execution logs
Both are serialized with JobState
Both are execution-specific (each execution has its own)
Sending Output
Use send_output() to send execution-specific output data (not events):
```python
from routilux import QueueOutputHandler, Routine
from routilux.job_state import JobState

class DataProcessor(Routine):
    def process(self, data=None, **kwargs):
        # Send output via the current execution's JobState
        self.send_output("user_data", message="Processing", value=data)
        self.send_output("status", progress=50, status="in_progress")

# Set an output handler on a JobState
job_state = JobState(flow_id="my_flow")
job_state.set_output_handler(QueueOutputHandler())

# send_output() calls go to the handler of the JobState that drives the
# execution. Note that flow.execute() creates a new JobState on each call
# (see Creating JobState), so the handler must be set on that JobState.
flow.execute(entry_id)
```
Key Points:
send_output() is different from emit(), which sends events to connected slots
Output is sent to the output_handler set on JobState
Output is also logged to output_log for persistence
Use Routine.send_output() for convenient access (it gets the execution context automatically)
Output Handler Types:
QueueOutputHandler: send output to a queue
CallbackOutputHandler: call a custom function with the output
NullOutputHandler: discard output (for testing)
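The three handler types differ only in where the output goes. The stdlib sketch below models them with a shared `handle(output_type, data)` method. That method name, the `Toy*` classes, and the `send_output()` helper are assumptions made for illustration and are not routilux’s API. The sketch also models the behavior described above: log to an output log, then dispatch to the handler.

```python
import queue
from typing import Any, Callable, Dict, List


class ToyQueueHandler:
    """Puts each output onto a queue.Queue for a consumer thread to drain."""

    def __init__(self) -> None:
        self.queue = queue.Queue()

    def handle(self, output_type: str, data: Dict[str, Any]) -> None:
        self.queue.put((output_type, data))


class ToyCallbackHandler:
    """Forwards each output to a user-supplied function."""

    def __init__(self, callback: Callable[[str, Dict[str, Any]], None]) -> None:
        self.callback = callback

    def handle(self, output_type: str, data: Dict[str, Any]) -> None:
        self.callback(output_type, data)


class ToyNullHandler:
    """Discards every output (handy in tests)."""

    def handle(self, output_type: str, data: Dict[str, Any]) -> None:
        pass


def send_output(handler: Any, output_log: List[dict], output_type: str, **data: Any) -> None:
    # Log first (for persistence), then dispatch to whichever handler is set
    output_log.append({"output_type": output_type, "data": data})
    handler.handle(output_type, data)


log: List[dict] = []
q_handler = ToyQueueHandler()
send_output(q_handler, log, "user_data", message="Processing", value=42)
first = q_handler.queue.get_nowait()
print(first)  # ('user_data', {'message': 'Processing', 'value': 42})

received: List[str] = []
send_output(ToyCallbackHandler(lambda t, d: received.append(t)), log, "status", progress=50)
send_output(ToyNullHandler(), log, "status", progress=100)
print(received, len(log))  # ['status'] 3
```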
Deferred Events
Use emit_deferred_event() to emit events that will be processed when the flow is resumed:
```python
from routilux import Routine

class UserInteraction(Routine):
    def process(self, data=None, **kwargs):
        # Emit a deferred event
        self.emit_deferred_event("user_input_required", question="What is your name?")

        # Pause the execution
        ctx = self.get_execution_context()
        if ctx:
            ctx.flow.pause(ctx.job_state, reason="Waiting for user input")

# Later: resume execution
# Deferred events are automatically emitted when flow.resume() is called
flow.resume(job_state)
```
Key Points:
emit_deferred_event() stores event info in JobState.deferred_events
Events are automatically emitted when flow.resume() is called
Useful for LLM agent workflows where you need to wait for user input
Use Routine.emit_deferred_event() for convenient access (it gets the execution context automatically)
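A deferred event is essentially a recorded emission that gets replayed on resume. The sketch below models this in plain Python. `ToyDeferredJob` and its methods are hypothetical. Clearing the list after replay is a choice made in this sketch and is not documented routilux behavior.

```python
from typing import Any, Callable, Dict, List


class ToyDeferredJob:
    """Hypothetical stand-in: stores deferred events, replays them on resume."""

    def __init__(self) -> None:
        self.status = "running"
        self.deferred_events: List[Dict[str, Any]] = []

    def emit_deferred_event(self, event_name: str, **data: Any) -> None:
        # Nothing is delivered now; the event is only recorded
        self.deferred_events.append({"event_name": event_name, "data": data})

    def pause(self) -> None:
        self.status = "paused"

    def resume(self, emit: Callable[[str, Dict[str, Any]], None]) -> None:
        self.status = "running"
        # Replay in the order the events were deferred, then clear the list
        pending, self.deferred_events = self.deferred_events, []
        for event in pending:
            emit(event["event_name"], event["data"])


delivered = []
job = ToyDeferredJob()
job.emit_deferred_event("user_input_required", question="What is your name?")
job.pause()
print(delivered)  # [] (nothing is emitted while paused)
job.resume(lambda name, data: delivered.append((name, data)))
print(delivered)  # [('user_input_required', {'question': 'What is your name?'})]
```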
JobState Status
The status field tracks the current execution state:
Status Values:
"pending": JobState created but execution not yet started (initial state)
"running": execution is in progress
"completed": execution completed successfully
"failed": execution failed (an error occurred)
"paused": execution is paused (can be resumed)
"cancelled": execution was cancelled
Status Transition Flow:
```python
from routilux.job_state import JobState

job_state = flow.execute(source_id)

# Status starts as "running"
print(f"Initial status: {job_state.status}")  # "running"

# Wait for completion using the JobState static method
JobState.wait_for_completion(flow, job_state, timeout=5.0)

# Status becomes "completed" or "failed"
print(f"Final status: {job_state.status}")  # "completed" or "failed"
```
Status Transition Table:
| From Status | Trigger | To Status | Conditions |
|---|---|---|---|
| pending | flow.execute() | running | Execution starts |
| running | All tasks done, no errors | completed | No critical failures |
| running | Critical failure detected | failed | Routine status is "failed" or "error" |
| running | flow.pause() | paused | Manual pause |
| running | flow.cancel() | cancelled | Manual cancel |
| paused | flow.resume() | running | Resume execution |
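The transition table can also be expressed as data, which makes invalid transitions easy to reject. In this sketch the trigger names ("execute", "all_tasks_done", and so on) are hypothetical labels for the table’s Trigger column. routilux does not necessarily validate transitions this way.

```python
# (from_status, trigger) -> to_status, one entry per row of the table above
TRANSITIONS = {
    ("pending", "execute"): "running",
    ("running", "all_tasks_done"): "completed",
    ("running", "critical_failure"): "failed",
    ("running", "pause"): "paused",
    ("running", "cancel"): "cancelled",
    ("paused", "resume"): "running",
}


def next_status(current: str, trigger: str) -> str:
    """Return the new status, or raise if the table has no such transition."""
    try:
        return TRANSITIONS[(current, trigger)]
    except KeyError:
        raise ValueError(f"no transition from {current!r} on {trigger!r}") from None


status = "pending"
for trigger in ["execute", "pause", "resume", "all_tasks_done"]:
    status = next_status(status, trigger)
print(status)  # completed
```

Terminal states ("completed", "failed", "cancelled") have no outgoing rows, so any further trigger raises ValueError.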
Error Detection in wait_for_completion():
The wait_for_completion() method detects failures by checking routine states, not just the execution history.
Detection Logic:
Checks routine states for critical failure status:
- "failed" or "error" → JobState becomes "failed"
- "error_continued" → JobState becomes "completed" (tolerated error)
- "skipped" → JobState becomes "completed" (intentionally skipped)
Does NOT rely on the execution history alone:
- Slot handler errors are logged to the execution history but are tolerated
- Only a routine status of "failed" or "error" indicates a critical failure
Distinguishes critical failures from tolerated errors:
- Critical: routine status is "failed" or "error"
- Tolerated: routine status is "error_continued" (CONTINUE strategy)
- Slot errors: recorded only in the execution history; the routine status is not "failed"
Example: Error Detection:
```python
from routilux.job_state import JobState

job_state = flow.execute(entry_routine_id)

# wait_for_completion() will:
# 1. Wait for all tasks to complete
# 2. Check routine states for "failed" or "error" status
# 3. If found, set job_state.status = "failed"
# 4. Otherwise, set job_state.status = "completed"
completed = JobState.wait_for_completion(flow, job_state, timeout=60.0)

if job_state.status == "failed":
    # Find the routines that failed ("failed" and "error" are both critical)
    for routine_id, routine_state in job_state.routine_states.items():
        if routine_state.get("status") in ("failed", "error"):
            print(f"Routine {routine_id} failed: {routine_state.get('error')}")
```
Key Points:
Status is automatically updated during execution
Status can be updated in both main thread and worker threads
Status changes are thread-safe
wait_for_completion() ensures the final status reflects the actual execution outcome
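The detection rule reduces to a small pure function over routine states. The following is a sketch of the documented logic, not routilux’s code. `final_status()` is a hypothetical name, and the routine-state layout (one dict per routine, with a "status" key) follows the examples on this page. The execution history is deliberately not consulted.

```python
from typing import Any, Dict

# Only these routine statuses count as critical failures;
# "error_continued" (CONTINUE strategy) and "skipped" are tolerated
CRITICAL = {"failed", "error"}


def final_status(routine_states: Dict[str, Dict[str, Any]]) -> str:
    """Derive the final JobState status from routine states alone."""
    for state in routine_states.values():
        if state.get("status") in CRITICAL:
            return "failed"
    return "completed"


tolerated = {"a": {"status": "completed"}, "b": {"status": "error_continued"}}
critical = {"a": {"status": "skipped"}, "b": {"status": "error", "error": "boom"}}
print(final_status(tolerated))  # completed
print(final_status(critical))   # failed
```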
Pause, Resume, and Cancel
You can control execution using the JobState:
```python
from routilux.job_state import JobState

# Execute
job_state = flow.execute(source_id)

# Pause execution (requires JobState)
flow.pause(job_state, reason="Manual pause for inspection")

# Check status
assert job_state.status == "paused"

# Resume execution (returns a new JobState for the resumed execution)
resumed_job_state = flow.resume(job_state)

# Wait for completion using the JobState static method
JobState.wait_for_completion(flow, resumed_job_state, timeout=5.0)
assert resumed_job_state.status == "completed"

# Cancel execution (if needed)
another_job_state = flow.execute(source_id)
flow.cancel(another_job_state, reason="No longer needed")
assert another_job_state.status == "cancelled"
```
Key Points:
pause(), resume(), and cancel() require the JobState as their first argument
Each execution can be paused/resumed/cancelled independently
Flow does not manage these operations; you pass the JobState explicitly
Serialization and Persistence
JobState can be serialized for persistence and recovery:
```python
import json

from routilux.job_state import JobState

# Execute and pause
job_state = flow.execute(source_id)
flow.pause(job_state, reason="Save for later")

# Serialize JobState
job_state_data = job_state.serialize()

# Save to file
with open("job_state.json", "w") as f:
    json.dump(job_state_data, f, indent=2)

# Later: load and deserialize
with open("job_state.json", "r") as f:
    loaded_data = json.load(f)

new_job_state = JobState()
new_job_state.deserialize(loaded_data)

# Resume execution
resumed = flow.resume(new_job_state)
```
Key Points:
JobState serialization is separate from Flow serialization
A serialized JobState includes the execution history, routine states, and status
A deserialized JobState can be used to resume execution
See Serialization for cross-host scenarios
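Any JobState-style serializer must handle the fact that `datetime` fields such as `updated_at` are not JSON-serializable. The sketch below shows a round trip through `json` using ISO 8601 strings. `ToySerializableJobState` is hypothetical. Making `deserialize()` a classmethod is a design choice in this sketch; the example above instead calls `deserialize()` on a fresh `JobState()`.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class ToySerializableJobState:
    """Hypothetical stand-in holding a few of the serialized fields."""

    job_id: str
    flow_id: str
    status: str = "running"
    routine_states: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    updated_at: datetime = field(default_factory=datetime.now)

    def serialize(self) -> Dict[str, Any]:
        data = asdict(self)
        # datetime is not JSON-serializable, so store it as an ISO 8601 string
        data["updated_at"] = self.updated_at.isoformat()
        return data

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "ToySerializableJobState":
        data = dict(data)
        data["updated_at"] = datetime.fromisoformat(data["updated_at"])
        return cls(**data)


original = ToySerializableJobState(
    "job-1", "my_flow", status="paused", routine_states={"source": {"status": "completed"}}
)
text = json.dumps(original.serialize())
restored = ToySerializableJobState.deserialize(json.loads(text))
print(restored == original)  # True
```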
Thread Safety and Concurrent Execution
Understanding thread safety and concurrent execution is crucial for using routilux effectively, especially in multi-threaded applications or when running multiple executions simultaneously.
Architecture Overview
routilux uses a multi-threaded architecture that supports concurrent execution while keeping executions isolated from one another.
1. Thread Pool Management:
Each Flow maintains a single shared ThreadPoolExecutor that is reused across all executions:
```python
from routilux import Flow

flow = Flow(flow_id="my_flow", execution_strategy="concurrent", max_workers=5)

# First execution: the thread pool is created
job_state1 = flow.execute(entry_id)

# Second execution: reuses the same thread pool
job_state2 = flow.execute(entry_id)

# Both executions share the same ThreadPoolExecutor.
# The thread pool is created when first needed and reused for all executions.
```
Key Points:
✅ Thread pool is Flow-level: created once per Flow, not per execution
✅ Shared across executions: all executions of the same flow share the same pool
✅ Efficient resource usage: avoids creating and destroying thread pools repeatedly
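The "created once, reused by every execution" behavior can be sketched with `concurrent.futures` alone. `ToyFlow` is a hypothetical stand-in, not routilux’s Flow. The lock-guarded lazy creation is an assumption about how such a pool can be made safe when two executions start at the same time.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class ToyFlow:
    """Hypothetical stand-in: one lazily created executor shared by all executions."""

    def __init__(self, max_workers: int = 5) -> None:
        self.max_workers = max_workers
        self._executor: Optional[ThreadPoolExecutor] = None
        self._lock = threading.Lock()

    def get_executor(self) -> ThreadPoolExecutor:
        # Double-checked creation so concurrent callers still get a single pool
        if self._executor is None:
            with self._lock:
                if self._executor is None:
                    self._executor = ThreadPoolExecutor(max_workers=self.max_workers)
        return self._executor

    def execute(self, fn: Callable[..., Any], *args: Any) -> Future:
        # Every execution submits its work to the same pool
        return self.get_executor().submit(fn, *args)

    def shutdown(self) -> None:
        with self._lock:
            if self._executor is not None:
                self._executor.shutdown(wait=True)
                self._executor = None


flow = ToyFlow(max_workers=2)
first_pool = flow.get_executor()
results = [flow.execute(pow, 2, n).result() for n in range(4)]
same_pool = flow.get_executor() is first_pool
flow.shutdown()
print(results, same_pool)  # [1, 2, 4, 8] True
```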
2. Three Types of Threads:
Understanding the different threads involved in execution is essential:
- Execution Thread (User Thread):
The thread that calls flow.execute(), usually the main thread or a user-created thread
Creates the JobState object
Sets JobState in the context variable for the entry routine
Triggers the entry routine’s handler
- Event Loop Thread (Background Thread):
A background thread created by start_event_loop()
Processes the task queue sequentially
Submits tasks to the ThreadPoolExecutor
Manages task scheduling and coordination
- Worker Threads (ThreadPoolExecutor Threads):
Threads from the shared ThreadPoolExecutor
Execute the actual routine handlers
May execute tasks from different executions
Access JobState via the context variable (set by slot.receive(), thread-safe)
Example: Thread Identification:
```python
import threading

from routilux import Flow, Routine
from routilux.job_state import JobState
from routilux.routine import _current_job_state

class ThreadAwareRoutine(Routine):
    def __init__(self, name):
        super().__init__()
        self.name = name
        self.trigger_slot = self.define_slot("trigger", handler=self.process)

    def process(self, **kwargs):
        thread_name = threading.current_thread().name
        print(f"[{self.name}] Executing in thread: {thread_name}")

        # Access JobState via context variable (thread-safe)
        job_state = _current_job_state.get(None)
        if job_state:
            job_state.record_execution(self._id, "process", {"thread": thread_name})

flow = Flow(flow_id="thread_demo", execution_strategy="concurrent", max_workers=2)
routine = ThreadAwareRoutine("Processor")
routine_id = flow.add_routine(routine, "processor")

# Execution thread (MainThread)
print(f"Execution thread: {threading.current_thread().name}")
job_state = flow.execute(routine_id)
JobState.wait_for_completion(flow, job_state, timeout=2.0)
```
Expected Output:
Execution thread: MainThread
[Processor] Executing in thread: ThreadPoolExecutor-0_0
JobState Access Mechanism
routilux uses parameter passing to ensure each task accesses the correct JobState:
1. Task-Level JobState Passing:
Each SlotActivationTask carries its own JobState:
```python
# When a task is created, it carries the JobState
task = SlotActivationTask(
    slot=slot,
    data=data,
    job_state=job_state,  # JobState is explicitly passed
    connection=connection,
)
```
2. Context Variable for Thread-Safe Access:
routilux uses Python’s contextvars.ContextVar to provide thread-safe access to JobState.
When a slot handler is executed, the JobState is set in a context variable:
```python
from routilux.routine import _current_job_state

class Slot:
    # Simplified sketch of the logic in Slot.receive()
    def receive(self, data, job_state=None, flow=None):
        # Remember the previous value so it can be restored afterwards
        old_job_state = _current_job_state.get(None)
        if job_state is not None:
            # Set JobState in the context variable for thread-safe access
            _current_job_state.set(job_state)

        try:
            # The handler can access JobState via the context variable;
            # this works even when multiple handlers run concurrently
            self.handler(data)
        finally:
            # Restore the previous JobState (or None) in the context variable
            _current_job_state.set(old_job_state)
```
Why Context Variables?
Thread-safe: each execution context has its own context variable value, even when multiple handlers run concurrently in different threads
Nested execution support: context variables support nested execution contexts, allowing complex execution patterns
No function signature changes: handlers can access JobState via get_execution_context() without modifying function signatures
Automatic isolation: each task carries its own JobState, and context variables ensure different executions don’t interfere, even when the same routine is executed concurrently
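The isolation claim can be checked using only the standard library. In this sketch `current_job` stands in for routilux’s `_current_job_state`, and `receive()` mimics the set/handle/restore pattern of `Slot.receive()`. It is a model, not routilux code. Six tasks for six different jobs run on two shared worker threads, and each handler still reads its own job ID because the value is set inside each task.

```python
import contextvars
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple

# Hypothetical stand-in for routilux's _current_job_state context variable
current_job = contextvars.ContextVar("current_job", default=None)


def handler() -> Optional[str]:
    # Reads "its" job from the context variable; nothing is passed in
    time.sleep(0.01)  # let tasks on the other worker thread interleave
    return current_job.get()


def receive(job_id: str) -> Tuple[str, Optional[str]]:
    # Like Slot.receive(): set the variable around the handler, then restore it
    token = current_job.set(job_id)
    try:
        return job_id, handler()
    finally:
        current_job.reset(token)


with ThreadPoolExecutor(max_workers=2) as pool:
    pairs = list(pool.map(receive, [f"job-{i}" for i in range(6)]))

print(all(expected == seen for expected, seen in pairs))  # True
print(current_job.get())  # None (the main thread's value never changed)
```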
Concurrent Execution Scenarios
Scenario 1: Multiple Executions in Same Thread (Sequential):
```python
# Sequential executions in the same thread
from routilux.job_state import JobState

job_state1 = flow.execute(entry_id, entry_params={"task": "A"})
JobState.wait_for_completion(flow, job_state1, timeout=2.0)

job_state2 = flow.execute(entry_id, entry_params={"task": "B"})
JobState.wait_for_completion(flow, job_state2, timeout=2.0)

# Each execution has its own JobState
assert job_state1.job_id != job_state2.job_id
assert job_state1 is not job_state2
```
Scenario 2: Multiple Executions in Different Threads (Concurrent):
```python
import threading

from routilux.job_state import JobState

def run_execution(flow, entry_id, task_name):
    job_state = flow.execute(entry_id, entry_params={"task": task_name})
    JobState.wait_for_completion(flow, job_state, timeout=3.0)
    return job_state

# Create multiple threads, each running an execution
threads = []
job_states = {}

for i in range(5):
    t = threading.Thread(
        target=lambda i=i: job_states.update({i: run_execution(flow, entry_id, f"Task{i}")}),
        name=f"ExecThread{i}",
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Verify isolation
job_ids = {js.job_id for js in job_states.values()}
assert len(job_ids) == 5  # Each execution has a unique JobState
```
Key Observations:
✅ Each execution has its own JobState
✅ All executions share the same ThreadPoolExecutor
✅ Each task carries its own JobState, ensuring correct access
✅ No interference between executions
Scenario 3: Same Worker Thread Executing Tasks from Different Executions:
This is a critical scenario that demonstrates the robustness of the design:
```python
import threading
import time

from routilux import Flow, Routine
from routilux.job_state import JobState
from routilux.routine import _current_job_state

class Source(Routine):
    def __init__(self, name):
        super().__init__()
        self.name = name
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        # Access JobState via context variable (thread-safe)
        job_state = _current_job_state.get(None)
        if job_state:
            thread_name = threading.current_thread().name
            job_state.record_execution(
                self._id, "output",
                {"data": f"from {self.name}", "thread": thread_name},
            )
        time.sleep(0.1)  # Simulate work
        self.emit("output", data=f"from {self.name}")

class Processor(Routine):
    def __init__(self, name):
        super().__init__()
        self.name = name
        self.input_slot = self.define_slot("input", handler=self.process)

    def process(self, data=None, **kwargs):
        # Access JobState via context variable (thread-safe)
        job_state = _current_job_state.get(None)
        if job_state:
            thread_name = threading.current_thread().name
            job_state.record_execution(
                self._id, "input",
                {"data": data, "thread": thread_name},
            )

flow = Flow(flow_id="isolation_test", execution_strategy="concurrent", max_workers=1)

source1 = Source("Source1")
source2 = Source("Source2")
processor1 = Processor("Processor1")
processor2 = Processor("Processor2")

s1_id = flow.add_routine(source1, "s1")
s2_id = flow.add_routine(source2, "s2")
p1_id = flow.add_routine(processor1, "p1")
p2_id = flow.add_routine(processor2, "p2")

flow.connect(s1_id, "output", p1_id, "input")
flow.connect(s2_id, "output", p2_id, "input")

# Run two executions one after the other; with max_workers=1,
# the same worker thread is reused for both
job_state1 = flow.execute(s1_id)
JobState.wait_for_completion(flow, job_state1, timeout=2.0)

job_state2 = flow.execute(s2_id)
JobState.wait_for_completion(flow, job_state2, timeout=2.0)

# Verify isolation: each JobState only contains records from its own execution
js1_routines = {r.routine_id for r in job_state1.execution_history}
js2_routines = {r.routine_id for r in job_state2.execution_history}

print(f"JobState 1 routines: {js1_routines}")
print(f"JobState 2 routines: {js2_routines}")

# No cross-contamination between the two executions
assert "s1" in js1_routines and "s2" not in js1_routines
assert "s2" in js2_routines and "s1" not in js2_routines
```
Expected Output:
JobState 1 routines: {'s1', 'p1', ...}
JobState 2 routines: {'s2', 'p2', ...}
Thread Safety of JobState Updates
Question: When multiple routines from the same execution run in different worker threads, do they safely update the same JobState?
Answer: Yes! In CPython, the individual JobState update operations are thread-safe.
How It Works:
```python
from datetime import datetime
from typing import Any, Dict

class JobState:
    # Simplified excerpt; ExecutionRecord is routilux's record type
    def record_execution(self, routine_id: str, event_name: str, data: Dict[str, Any]) -> None:
        record = ExecutionRecord(routine_id, event_name, data)
        self.execution_history.append(record)  # Atomic in CPython (GIL protected)
        self.updated_at = datetime.now()

    def update_routine_state(self, routine_id: str, state: Dict[str, Any]) -> None:
        self.routine_states[routine_id] = state.copy()  # Atomic in CPython (GIL protected)
        self.updated_at = datetime.now()
```
Thread Safety Guarantees:
✅ list.append() is atomic in CPython (protected by the GIL)
✅ dict assignment is atomic in CPython (protected by the GIL)
✅ Multiple worker threads can safely update the same JobState
✅ The main thread can safely read JobState while worker threads update it
✅ No data loss or corruption from concurrent calls to these individual operations
✅ Reads are thread-safe: you can read JobState properties (execution_history, routine_states, status, etc.) from the main thread at any time, even during execution
These guarantees cover single operations. A read-modify-write sequence, such as reading a value with get_shared_data() and writing an updated value back with update_shared_data(), is not atomic as a whole, so concurrent handlers doing this on the same key can lose updates.
Verification Test:
Extensive testing has verified thread safety:
✅ 10 threads concurrently updating 10,000 records: No data loss
✅ 3 writer threads + 2 reader threads: No data corruption
✅ Multiple executions with shared thread pool: Perfect isolation
Important Note:
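The thread safety relies on CPython’s Global Interpreter Lock (GIL). On a Python runtime without a GIL (for example, a free-threaded CPython build or another Python implementation), you may need additional synchronization mechanisms. Separate processes do not share a JobState object at all; each process works on its own copy, which is why cross-process and cross-host scenarios rely on serialization. For standard CPython multi-threaded applications, the individual operations described above are thread-safe.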
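Individual `list.append()` calls and dict assignments are atomic under the GIL. A read-modify-write sequence is a different matter: reading a counter with get_shared_data() and writing it back with update_shared_data() takes two separate steps, and another thread can run in between. The standard remedy is an explicit lock around the whole sequence. `ToySharedData` below is a hypothetical stand-in, not a routilux class. Whether routilux offers its own atomic helper is not covered on this page.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToySharedData:
    """Hypothetical stand-in for shared_data, guarded by an explicit lock."""

    def __init__(self) -> None:
        self._data = {}
        self._lock = threading.Lock()

    def increment(self, key: str) -> None:
        # get + set is two steps; holding the lock across both makes the
        # pair atomic, so no increment is lost if threads switch in between
        with self._lock:
            self._data[key] = self._data.get(key, 0) + 1

    def get(self, key: str, default=None):
        with self._lock:
            return self._data.get(key, default)


shared = ToySharedData()
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(10_000):
        pool.submit(shared.increment, "count")

print(shared.get("count"))  # 10000
```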
Common Questions and Answers
Q1: Does the same worker thread execute tasks from different executions?
A: Yes, worker threads are shared across executions. However, each task carries its
own JobState, and the JobState is set in a context variable before each task execution
and cleared after, ensuring perfect isolation. Context variables provide thread-safe access
even when the same routine is executed concurrently in different threads.
Q2: Can routines from the same execution run in different threads?
A: Yes! In concurrent mode, routines from the same execution can run in different
worker threads. They all update the same JobState, which is thread-safe in CPython.
Q3: What happens if I call flow.execute() multiple times concurrently?
A: Each call creates a new, independent JobState. All executions share the same
thread pool, but each execution is completely isolated. No interference occurs.
Q4: Is it safe to access JobState from routine handlers?
A: Yes! You can safely access and update JobState from any routine handler,
regardless of which thread it runs in. The context variable mechanism ensures you always
access the correct JobState for the current execution, even when multiple handlers run
concurrently. Use get_execution_context() for convenient access, or access the context
variable directly via from routilux.routine import _current_job_state.
Q5: Can I read JobState from the main thread while execution is running?
A: Yes! You can safely read JobState from the main thread at any time, even while
worker threads are updating it. In CPython, all read and write operations are protected by
the GIL, ensuring thread safety. You might see intermediate states (e.g., execution_history
growing, status changing from “running” to “completed”), but you won’t see corrupted or
inconsistent data. If you need the final, complete state, wait for completion using
JobState.wait_for_completion().
Thread Safety Best Practices
1. Reading JobState from the Main Thread:
You can safely read JobState from the main thread at any time, even while worker threads are updating it. In CPython, all read and write operations are protected by the GIL, ensuring thread safety:
```python
from routilux.job_state import JobState

# Execute flow (non-blocking)
job_state = flow.execute(entry_id)

# You can safely read JobState at any time, even during execution
print(f"Current status: {job_state.status}")
print(f"History so far: {len(job_state.execution_history)} records")

# Worker threads update job_state concurrently, but reads are safe.
# You might see intermediate states (e.g., execution_history growing),
# but you won't see corrupted or inconsistent data.

# Wait for completion to see the final state
JobState.wait_for_completion(flow, job_state, timeout=5.0)

# Now you have the final, complete state
print(f"Final status: {job_state.status}")
print(f"Final history: {len(job_state.execution_history)} records")
```
Key Points:
✅ Reads are thread-safe: you can read JobState from the main thread at any time
✅ No data corruption: the GIL ensures you won’t see corrupted or partially written data
✅ May see intermediate states: you might see execution_history growing, or status changing from "running" to "completed"; this is normal and safe
✅ Wait for completion for the final state: if you need the final, complete state, wait for completion before reading
2. Always Wait for Completion for Final State:
When you need the final, complete state (not intermediate states), wait for completion:
```python
from routilux.job_state import JobState

job_state = flow.execute(entry_id)
JobState.wait_for_completion(flow, job_state, timeout=5.0)  # Wait for all tasks to complete

# Now safe to access the final JobState
print(f"Status: {job_state.status}")
print(f"History: {len(job_state.execution_history)} records")
```
3. Store JobState for Each Execution:
If you need to track multiple concurrent executions, store each JobState:
```python
from routilux.job_state import JobState

executions = {}
for i in range(10):
    job_state = flow.execute(entry_id, entry_params={"task_id": i})
    executions[i] = job_state

# Wait for all to complete
for job_state in executions.values():
    JobState.wait_for_completion(flow, job_state, timeout=5.0)

# Now you can access each JobState independently
for task_id, job_state in executions.items():
    print(f"Task {task_id}: {job_state.status}")
```
4. Access JobState from Routine Handlers:
When accessing JobState from routine handlers, use the context variable or get_execution_context():
```python
from routilux import Routine
from routilux.routine import _current_job_state

class MyRoutine(Routine):
    def process(self, **kwargs):
        # Method 1: use get_execution_context() (recommended)
        ctx = self.get_execution_context()
        if ctx:
            ctx.job_state.record_execution(ctx.routine_id, "process", kwargs)
            ctx.job_state.update_routine_state(ctx.routine_id, {"status": "processing"})

        # Method 2: access the context variable directly (thread-safe).
        # Shown for comparison; in practice, use one of the two methods.
        job_state = _current_job_state.get(None)
        if job_state:
            # Safe to update JobState (thread-safe even in concurrent execution)
            job_state.record_execution(self._id, "process", kwargs)
            job_state.update_routine_state(self._id, {"status": "processing"})
```
5. Understand Thread Pool Sharing:
Remember that all executions share the same thread pool. If you need to limit
concurrency across all executions, set max_workers appropriately:
```python
from routilux import Flow

# Limit total concurrent tasks across all executions
flow = Flow(flow_id="limited", execution_strategy="concurrent", max_workers=3)

# Even if you run 10 executions concurrently, only 3 worker threads
# will be available, limiting total system load
```
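You can observe the effect of max_workers on total load directly with `ThreadPoolExecutor`. This stdlib sketch, which does not involve routilux, submits ten tasks (think of them as work from several executions) to a three-worker pool and records the peak number running at once. The `active` and `peak` counters are illustrative helpers, not part of any API.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

active = 0  # tasks currently running
peak = 0    # highest value active ever reached
lock = threading.Lock()


def task(_: int) -> None:
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.02)  # simulate work so tasks overlap
    with lock:
        active -= 1


# Ten tasks share one pool of 3 workers
with ThreadPoolExecutor(max_workers=3) as pool:
    list(pool.map(task, range(10)))

print(peak <= 3)  # True
```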
Key Takeaways:
✅ JobState updates are thread-safe in CPython
✅ JobState reads are thread-safe: you can read from the main thread at any time
✅ Multiple executions are isolated from one another
✅ Worker threads are shared, but isolation is maintained via task-level JobState passing
✅ Context variables provide thread-safe access without modifying function signatures
✅ Context variables ensure correct JobState access even when the same routine executes concurrently in different threads
✅ You can safely read JobState during execution to monitor progress
✅ Wait for completion if you need the final, complete state (not intermediate states)
Best Practices
1. Store JobState for Recovery:
```python
# Execute and store JobState
job_state = flow.execute(source_id)

# Save for recovery (save_to_database is a placeholder for your own storage code)
job_state_data = job_state.serialize()
save_to_database(job_state_data)
```
2. Use JobState for Status Monitoring:
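```python
import time

job_state = flow.execute(source_id)

# Monitor status (updated automatically); the deadline avoids polling forever
# (30 seconds is an arbitrary example value)
deadline = time.monotonic() + 30.0
while job_state.status == "running" and time.monotonic() < deadline:
    time.sleep(0.1)

if job_state.status == "completed":
    print("Execution successful!")
elif job_state.status == "failed":
    print("Execution failed!")
```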
3. Independent Execution Management:
```python
# Execute multiple times
executions = []
for i in range(10):
    job_state = flow.execute(source_id, entry_params={"batch": i})
    executions.append(job_state)

# Manage each independently
for job_state in executions:
    if job_state.status == "paused":
        flow.resume(job_state)
```
4. Cross-Host Execution:
```python
from routilux import Flow
from routilux.job_state import JobState

# Host A: execute and serialize
job_state = flow.execute(source_id)
flow.pause(job_state, reason="Transfer to Host B")

flow_data = flow.serialize()
job_state_data = job_state.serialize()

# Send to Host B (send_to_host_b is a placeholder for your transport)
send_to_host_b(flow_data, job_state_data)

# Host B: deserialize and resume
new_flow = Flow()
new_flow.deserialize(flow_data)

new_job_state = JobState()
new_job_state.deserialize(job_state_data)

resumed = new_flow.resume(new_job_state)
```
Common Pitfalls
1. Assuming Flow Manages JobState:
# Wrong: Flow doesn't have job_state
flow.job_state.status # AttributeError!
# Correct: JobState is returned from execute()
job_state = flow.execute(source_id)
job_state.status # OK
2. Sharing JobState Between Executions:
# Wrong: Reusing JobState from previous execution
job_state1 = flow.execute(source_id)
job_state2 = flow.execute(source_id) # Different execution!
# Don't use job_state1 for job_state2's operations
# Correct: Use the JobState from the specific execution
job_state = flow.execute(source_id)
flow.pause(job_state, reason="pause this execution")
3. Not Serializing JobState Separately:
# Wrong: Flow serialization doesn't include JobState
flow_data = flow.serialize()
# job_state is NOT in flow_data!
# Correct: Serialize separately
flow_data = flow.serialize()
job_state_data = job_state.serialize()
# Save both for recovery
4. Assuming JobState Persists Across Executions:
# Wrong: Each execute() creates new JobState
job_state1 = flow.execute(source_id)
job_state2 = flow.execute(source_id)
# job_state1 and job_state2 are different!
# Correct: Store JobState if you need it
job_state = flow.execute(source_id)
save_job_state(job_state)