Basic Example¶
A simple example demonstrating basic routine and flow usage.
Example Code¶
1#!/usr/bin/env python
2"""
3Basic Example: Simple data processing flow
4
5This example demonstrates:
6- Creating routines with slots and events
7- Connecting routines in a flow
8- Executing a flow
9- Checking execution status
10"""
11
12from routilux import Flow, Routine
13
14
15class DataSource(Routine):
16 """A routine that generates data"""
17
18 def __init__(self):
19 super().__init__()
20 # Define trigger slot for entry routine
21 self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
22 self.output_event = self.define_event("output", ["data"])
23
24 def _handle_trigger(self, data=None, **kwargs):
25 """Handle trigger and emit data through the output event"""
26 # Extract data from kwargs if not provided directly
27 output_data = data or kwargs.get("data", "default_data")
28 # Execution state should be stored in JobState, not routine._stats
29 self.emit("output", data=output_data)
30
31
32class DataProcessor(Routine):
33 """A routine that processes data"""
34
35 def __init__(self):
36 super().__init__()
37 self.input_slot = self.define_slot("input", handler=self.process)
38 self.output_event = self.define_event("output", ["result"])
39 self.processed_data = None
40
41 def process(self, data):
42 """Process incoming data"""
43 # Handle both dict and direct value
44 if isinstance(data, dict):
45 data_value = data.get("data", data)
46 else:
47 data_value = data
48
49 # Process the data
50 self.processed_data = f"Processed: {data_value}"
51 # Execution state should be stored in JobState, not routine._stats
52
53 # Emit the result
54 self.emit("output", result=self.processed_data)
55
56
57class DataSink(Routine):
58 """A routine that receives final data"""
59
60 def __init__(self):
61 super().__init__()
62 self.input_slot = self.define_slot("input", handler=self.receive)
63 self.final_result = None
64
65 def receive(self, result):
66 """Receive and store the final result"""
67 # Handle both dict and direct value
68 if isinstance(result, dict):
69 result_value = result.get("result", result)
70 else:
71 result_value = result
72
73 self.final_result = result_value
74 # Execution state should be stored in JobState, not routine._stats
75 print(f"Final result: {self.final_result}")
76
77
78def main():
79 """Main function"""
80 # Create a flow
81 flow = Flow(flow_id="basic_example")
82
83 # Create routine instances
84 source = DataSource()
85 processor = DataProcessor()
86 sink = DataSink()
87
88 # Add routines to the flow
89 source_id = flow.add_routine(source, "source")
90 processor_id = flow.add_routine(processor, "processor")
91 sink_id = flow.add_routine(sink, "sink")
92
93 # Connect routines: source -> processor -> sink
94 flow.connect(source_id, "output", processor_id, "input")
95 flow.connect(processor_id, "output", sink_id, "input")
96
97 # Execute the flow
98 print("Executing flow...")
99 job_state = flow.execute(source_id, entry_params={"data": "Hello, World!"})
100
101 # Wait for execution to complete
102 from routilux.job_state import JobState
103
104 JobState.wait_for_completion(flow, job_state, timeout=2.0)
105
106 # Check results
107 print(f"\nExecution Status: {job_state.status}")
108 print(f"Final Result: {sink.final_result}")
109 # Execution state is tracked in JobState, not routine._stats
110 print(f"Execution History: {len(job_state.execution_history)} records")
111
112 assert job_state.status == "completed"
113 assert sink.final_result == "Processed: Hello, World!"
114
115
116if __name__ == "__main__":
117 main()
This example demonstrates:
Creating routines with slots and events
Automatic flow detection in
emit()calls (no need to pass flow parameter)Connecting routines in a flow
Executing a flow
Checking execution status
Key Features:
Automatic Flow Detection: The
emit()method automatically detects the flow from routine contextNon-blocking emit(): Event emission returns immediately after enqueuing tasks
Event Queue Architecture: All execution uses a unified event queue mechanism