Basic Example

A simple example demonstrating basic routine and flow usage.

Example Code

  1#!/usr/bin/env python
  2"""
  3Basic Example: Simple data processing flow
  4
  5This example demonstrates:
  6- Creating routines with slots and events
  7- Connecting routines in a flow
  8- Executing a flow
  9- Checking execution status
 10"""
 11
 12from routilux import Flow, Routine
 13
 14
 15class DataSource(Routine):
 16    """A routine that generates data"""
 17
 18    def __init__(self):
 19        super().__init__()
 20        # Define trigger slot for entry routine
 21        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 22        self.output_event = self.define_event("output", ["data"])
 23
 24    def _handle_trigger(self, data=None, **kwargs):
 25        """Handle trigger and emit data through the output event"""
 26        # Extract data from kwargs if not provided directly
 27        output_data = data or kwargs.get("data", "default_data")
 28        # Execution state should be stored in JobState, not routine._stats
 29        self.emit("output", data=output_data)
 30
 31
 32class DataProcessor(Routine):
 33    """A routine that processes data"""
 34
 35    def __init__(self):
 36        super().__init__()
 37        self.input_slot = self.define_slot("input", handler=self.process)
 38        self.output_event = self.define_event("output", ["result"])
 39        self.processed_data = None
 40
 41    def process(self, data):
 42        """Process incoming data"""
 43        # Handle both dict and direct value
 44        if isinstance(data, dict):
 45            data_value = data.get("data", data)
 46        else:
 47            data_value = data
 48
 49        # Process the data
 50        self.processed_data = f"Processed: {data_value}"
 51        # Execution state should be stored in JobState, not routine._stats
 52
 53        # Emit the result
 54        self.emit("output", result=self.processed_data)
 55
 56
 57class DataSink(Routine):
 58    """A routine that receives final data"""
 59
 60    def __init__(self):
 61        super().__init__()
 62        self.input_slot = self.define_slot("input", handler=self.receive)
 63        self.final_result = None
 64
 65    def receive(self, result):
 66        """Receive and store the final result"""
 67        # Handle both dict and direct value
 68        if isinstance(result, dict):
 69            result_value = result.get("result", result)
 70        else:
 71            result_value = result
 72
 73        self.final_result = result_value
 74        # Execution state should be stored in JobState, not routine._stats
 75        print(f"Final result: {self.final_result}")
 76
 77
 78def main():
 79    """Main function"""
 80    # Create a flow
 81    flow = Flow(flow_id="basic_example")
 82
 83    # Create routine instances
 84    source = DataSource()
 85    processor = DataProcessor()
 86    sink = DataSink()
 87
 88    # Add routines to the flow
 89    source_id = flow.add_routine(source, "source")
 90    processor_id = flow.add_routine(processor, "processor")
 91    sink_id = flow.add_routine(sink, "sink")
 92
 93    # Connect routines: source -> processor -> sink
 94    flow.connect(source_id, "output", processor_id, "input")
 95    flow.connect(processor_id, "output", sink_id, "input")
 96
 97    # Execute the flow
 98    print("Executing flow...")
 99    job_state = flow.execute(source_id, entry_params={"data": "Hello, World!"})
100
101    # Wait for execution to complete
102    from routilux.job_state import JobState
103
104    JobState.wait_for_completion(flow, job_state, timeout=2.0)
105
106    # Check results
107    print(f"\nExecution Status: {job_state.status}")
108    print(f"Final Result: {sink.final_result}")
109    # Execution state is tracked in JobState, not routine._stats
110    print(f"Execution History: {len(job_state.execution_history)} records")
111
112    assert job_state.status == "completed"
113    assert sink.final_result == "Processed: Hello, World!"
114
115
116if __name__ == "__main__":
117    main()

This example demonstrates:

  • Creating routines with slots and events

  • Automatic flow detection in emit() calls (no need to pass a flow parameter)

  • Connecting routines in a flow

  • Executing a flow

  • Checking execution status

Key Features:

  • Automatic Flow Detection: The emit() method automatically detects the flow from the routine's context

  • Non-blocking emit(): Event emission returns immediately after enqueuing tasks

  • Event Queue Architecture: All execution uses a unified event queue mechanism