Data Processing Example

A multi-stage data processing pipeline example.

Example Code

  1#!/usr/bin/env python
  2"""
  3Data Processing Example: Multi-stage data processing pipeline
  4
  5This example demonstrates:
  6- Complex data flow with multiple stages
  7- Parameter mapping
  8- Statistics tracking
  9"""
 10
 11from routilux import Flow, Routine
 12
 13
 14class InputReader(Routine):
 15    """Reads input data"""
 16
 17    def __init__(self):
 18        super().__init__()
 19        # Define trigger slot for entry routine
 20        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 21        self.output_event = self.define_event("output", ["raw_data"])
 22
 23    def _handle_trigger(self, filename=None, **kwargs):
 24        """Handle trigger and simulate reading from a file"""
 25        raw_data = filename or kwargs.get("filename", "sample_data.txt")
 26        # Execution state should be stored in JobState, not routine._stats
 27        self.emit("output", raw_data=raw_data)
 28
 29
 30class DataValidator(Routine):
 31    """Validates input data"""
 32
 33    def __init__(self):
 34        super().__init__()
 35        self.input_slot = self.define_slot("input", handler=self.validate)
 36        self.output_event = self.define_event("output", ["validated_data"])
 37        self.error_event = self.define_event("error", ["error_message"])
 38
 39    def validate(self, raw_data):
 40        """Validate the data"""
 41        if isinstance(raw_data, dict):
 42            data = raw_data.get("raw_data", raw_data)
 43        else:
 44            data = raw_data
 45
 46        if data and len(str(data)) > 0:
 47            # Execution state should be stored in JobState, not routine._stats
 48            self.emit("output", validated_data=data)
 49        else:
 50            # Execution state should be stored in JobState, not routine._stats
 51            self.emit("error", error_message="Invalid data")
 52
 53
 54class DataTransformer(Routine):
 55    """Transforms validated data"""
 56
 57    def __init__(self):
 58        super().__init__()
 59        self.input_slot = self.define_slot("input", handler=self.transform)
 60        self.output_event = self.define_event("output", ["transformed_data"])
 61
 62    def transform(self, validated_data):
 63        """Transform the data"""
 64        if isinstance(validated_data, dict):
 65            data = validated_data.get("validated_data", validated_data)
 66        else:
 67            data = validated_data
 68
 69        transformed = f"TRANSFORMED_{data.upper()}"
 70        # Execution state should be stored in JobState, not routine._stats
 71        self.emit("output", transformed_data=transformed)
 72
 73
 74class DataWriter(Routine):
 75    """Writes processed data"""
 76
 77    def __init__(self):
 78        super().__init__()
 79        self.input_slot = self.define_slot("input", handler=self.write)
 80        self.written_data = []
 81
 82    def write(self, transformed_data):
 83        """Write the data"""
 84        if isinstance(transformed_data, dict):
 85            data = transformed_data.get("transformed_data", transformed_data)
 86        else:
 87            data = transformed_data
 88
 89        self.written_data.append(data)
 90        # Execution state should be stored in JobState, not routine._stats
 91        print(f"Written: {data}")
 92
 93
 94def main():
 95    """Main function"""
 96    # Create a flow
 97    flow = Flow(flow_id="data_processing")
 98
 99    # Create routine instances
100    reader = InputReader()
101    validator = DataValidator()
102    transformer = DataTransformer()
103    writer = DataWriter()
104
105    # Add routines to the flow
106    reader_id = flow.add_routine(reader, "reader")
107    validator_id = flow.add_routine(validator, "validator")
108    transformer_id = flow.add_routine(transformer, "transformer")
109    writer_id = flow.add_routine(writer, "writer")
110
111    # Connect the pipeline
112    flow.connect(reader_id, "output", validator_id, "input")
113    flow.connect(validator_id, "output", transformer_id, "input")
114    flow.connect(transformer_id, "output", writer_id, "input")
115
116    # Execute the flow
117    print("Executing data processing pipeline...")
118    job_state = flow.execute(reader_id, entry_params={"filename": "data.txt"})
119
120    # Check results
121    print(f"\nExecution Status: {job_state.status}")
122    print(f"Written Data: {writer.written_data}")
123    # Execution state is tracked in JobState, not routine._stats
124    print(f"Execution History: {len(job_state.execution_history)} records")
125
126    assert job_state.status == "completed"
127    assert len(writer.written_data) > 0
128
129
130if __name__ == "__main__":
131    main()

This example demonstrates:

  • Multi-stage data processing pipeline

  • Automatic flow detection in emit() calls

  • Error handling in validation

  • Statistics tracking across stages (execution status and history are recorded in the JobState)

  • Event queue-based execution

Key Features:

  • Event Queue Pattern: Tasks are enqueued and processed asynchronously

  • Automatic Flow Detection: No need to pass flow parameter in emit() calls

  • Non-blocking Execution: Each stage processes independently via the event queue