Data Processing Example

A multi-stage data processing pipeline example.

Example Code

  1#!/usr/bin/env python
  2"""
  3Data Processing Example: Multi-stage data processing pipeline
  4
  5This example demonstrates:
  6- Complex data flow with multiple stages
  7- Parameter mapping
  8- Statistics tracking
  9"""
 10from routilux import Flow, Routine
 11
 12
 13class InputReader(Routine):
 14    """Reads input data"""
 15
 16    def __init__(self):
 17        super().__init__()
 18        # Define trigger slot for entry routine
 19        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
 20        self.output_event = self.define_event("output", ["raw_data"])
 21
 22    def _handle_trigger(self, filename=None, **kwargs):
 23        """Handle trigger and simulate reading from a file"""
 24        raw_data = filename or kwargs.get("filename", "sample_data.txt")
 25        # Execution state should be stored in JobState, not routine._stats
 26        self.emit("output", raw_data=raw_data)
 27
 28
 29class DataValidator(Routine):
 30    """Validates input data"""
 31
 32    def __init__(self):
 33        super().__init__()
 34        self.input_slot = self.define_slot("input", handler=self.validate)
 35        self.output_event = self.define_event("output", ["validated_data"])
 36        self.error_event = self.define_event("error", ["error_message"])
 37
 38    def validate(self, raw_data):
 39        """Validate the data"""
 40        if isinstance(raw_data, dict):
 41            data = raw_data.get("raw_data", raw_data)
 42        else:
 43            data = raw_data
 44
 45        if data and len(str(data)) > 0:
 46            # Execution state should be stored in JobState, not routine._stats
 47            self.emit("output", validated_data=data)
 48        else:
 49            # Execution state should be stored in JobState, not routine._stats
 50            self.emit("error", error_message="Invalid data")
 51
 52
 53class DataTransformer(Routine):
 54    """Transforms validated data"""
 55
 56    def __init__(self):
 57        super().__init__()
 58        self.input_slot = self.define_slot("input", handler=self.transform)
 59        self.output_event = self.define_event("output", ["transformed_data"])
 60
 61    def transform(self, validated_data):
 62        """Transform the data"""
 63        if isinstance(validated_data, dict):
 64            data = validated_data.get("validated_data", validated_data)
 65        else:
 66            data = validated_data
 67
 68        transformed = f"TRANSFORMED_{data.upper()}"
 69        # Execution state should be stored in JobState, not routine._stats
 70        self.emit("output", transformed_data=transformed)
 71
 72
 73class DataWriter(Routine):
 74    """Writes processed data"""
 75
 76    def __init__(self):
 77        super().__init__()
 78        self.input_slot = self.define_slot("input", handler=self.write)
 79        self.written_data = []
 80
 81    def write(self, transformed_data):
 82        """Write the data"""
 83        if isinstance(transformed_data, dict):
 84            data = transformed_data.get("transformed_data", transformed_data)
 85        else:
 86            data = transformed_data
 87
 88        self.written_data.append(data)
 89        # Execution state should be stored in JobState, not routine._stats
 90        print(f"Written: {data}")
 91
 92
 93def main():
 94    """Main function"""
 95    # Create a flow
 96    flow = Flow(flow_id="data_processing")
 97
 98    # Create routine instances
 99    reader = InputReader()
100    validator = DataValidator()
101    transformer = DataTransformer()
102    writer = DataWriter()
103
104    # Add routines to the flow
105    reader_id = flow.add_routine(reader, "reader")
106    validator_id = flow.add_routine(validator, "validator")
107    transformer_id = flow.add_routine(transformer, "transformer")
108    writer_id = flow.add_routine(writer, "writer")
109
110    # Connect the pipeline
111    flow.connect(reader_id, "output", validator_id, "input")
112    flow.connect(validator_id, "output", transformer_id, "input")
113    flow.connect(transformer_id, "output", writer_id, "input")
114
115    # Execute the flow
116    print("Executing data processing pipeline...")
117    job_state = flow.execute(reader_id, entry_params={"filename": "data.txt"})
118
119    # Check results
120    print(f"\nExecution Status: {job_state.status}")
121    print(f"Written Data: {writer.written_data}")
122    # Execution state is tracked in JobState, not routine._stats
123    print(f"Execution History: {len(job_state.execution_history)} records")
124
125    assert job_state.status == "completed"
126    assert len(writer.written_data) > 0
127
128
129if __name__ == "__main__":
130    main()

This example demonstrates:

  • Multi-stage data processing pipeline

  • Automatic flow detection in emit() calls

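  • Error signaling in validation (invalid input is emitted on the validator's `error` event, which this example leaves unconnected)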

  • Execution tracking across stages via `JobState` (job status and execution history)

  • Event queue-based execution

Key Features:

  • Event Queue Pattern: Tasks are enqueued and processed asynchronously

  • Automatic Flow Detection: No need to pass flow parameter in emit() calls

  • Non-blocking Execution: Each stage processes independently via the event queue