Data Processing Example
A multi-stage data processing pipeline example.
Example Code
1#!/usr/bin/env python
2"""
3Data Processing Example: Multi-stage data processing pipeline
4
5This example demonstrates:
6- Complex data flow with multiple stages
7- Parameter mapping
8- Statistics tracking
9"""
10from routilux import Flow, Routine
11
12
class InputReader(Routine):
    """Entry routine that simulates reading input from a file.

    Defines a ``trigger`` slot handled by :meth:`_handle_trigger` and an
    ``output`` event that carries a single ``raw_data`` field.
    """

    def __init__(self):
        super().__init__()
        # Define trigger slot for entry routine
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
        self.output_event = self.define_event("output", ["raw_data"])

    def _handle_trigger(self, filename=None, **kwargs):
        """Emit the requested filename as ``raw_data`` on the ``output`` event.

        No file is actually opened: the filename itself stands in for the
        file's contents.

        Args:
            filename: Name to emit. A missing or falsy value falls back to
                ``"sample_data.txt"``.
            **kwargs: Extra trigger parameters; accepted and ignored.
        """
        # ``filename`` is a named parameter, so Python can never also place it
        # in ``kwargs``; a lookup there would always yield the default.
        raw_data = filename or "sample_data.txt"
        # Execution state should be stored in JobState, not routine._stats
        self.emit("output", raw_data=raw_data)
27
28
class DataValidator(Routine):
    """Checks incoming data and routes it to ``output`` or ``error``.

    Listens on the ``input`` slot. Accepted data is emitted on ``output`` as
    ``validated_data``; rejected data produces an ``error`` event with an
    ``error_message``.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["validated_data"])
        self.error_event = self.define_event("error", ["error_message"])

    def validate(self, raw_data):
        """Accept truthy data with a non-empty string form; reject the rest.

        Args:
            raw_data: Either the payload itself or a dict wrapping it under
                the ``"raw_data"`` key (a dict without that key is treated
                as the payload).
        """
        # Unwrap the payload when it arrives as a dict.
        payload = (
            raw_data.get("raw_data", raw_data)
            if isinstance(raw_data, dict)
            else raw_data
        )

        # Execution state should be stored in JobState, not routine._stats
        if not (payload and str(payload)):
            self.emit("error", error_message="Invalid data")
            return

        self.emit("output", validated_data=payload)
51
52
class DataTransformer(Routine):
    """Transforms validated data into an upper-cased, prefixed string.

    Listens on the ``input`` slot and emits the result on ``output`` as
    ``transformed_data``.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.transform)
        self.output_event = self.define_event("output", ["transformed_data"])

    def transform(self, validated_data):
        """Emit ``TRANSFORMED_<DATA>`` built from the validated payload.

        Args:
            validated_data: Either the payload itself or a dict wrapping it
                under the ``"validated_data"`` key (a dict without that key
                is treated as the payload).
        """
        if isinstance(validated_data, dict):
            data = validated_data.get("validated_data", validated_data)
        else:
            data = validated_data

        # The validator accepts any truthy value with a non-empty string form,
        # so the payload is not guaranteed to be a str. Convert explicitly
        # rather than assuming ``.upper()`` exists; str input is unaffected.
        transformed = f"TRANSFORMED_{str(data).upper()}"
        # Execution state should be stored in JobState, not routine._stats
        self.emit("output", transformed_data=transformed)
71
72
class DataWriter(Routine):
    """Terminal stage that collects processed data and echoes it to stdout.

    Every value received on the ``input`` slot is appended to
    ``written_data`` in arrival order.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.write)
        # Everything written so far, oldest first.
        self.written_data = []

    def write(self, transformed_data):
        """Record one transformed value and print it.

        Args:
            transformed_data: Either the value itself or a dict wrapping it
                under the ``"transformed_data"`` key (a dict without that key
                is recorded as-is).
        """
        record = transformed_data
        if isinstance(record, dict):
            record = record.get("transformed_data", record)

        self.written_data.append(record)
        # Execution state should be stored in JobState, not routine._stats
        print(f"Written: {record}")
91
92
def main():
    """Build the reader -> validator -> transformer -> writer flow and run it."""
    flow = Flow(flow_id="data_processing")

    # Instantiate the stages in pipeline order.
    reader, validator, transformer, writer = (
        InputReader(),
        DataValidator(),
        DataTransformer(),
        DataWriter(),
    )

    # Register each stage with the flow, keeping the ids it hands back.
    reader_key = flow.add_routine(reader, "reader")
    validator_key = flow.add_routine(validator, "validator")
    transformer_key = flow.add_routine(transformer, "transformer")
    writer_key = flow.add_routine(writer, "writer")

    # Wire every stage's "output" event to the next stage's "input" slot.
    stage_keys = [reader_key, validator_key, transformer_key, writer_key]
    for upstream, downstream in zip(stage_keys, stage_keys[1:]):
        flow.connect(upstream, "output", downstream, "input")

    # Kick off the pipeline from the reader.
    print("Executing data processing pipeline...")
    job_state = flow.execute(reader_key, entry_params={"filename": "data.txt"})

    # Report the outcome.
    print(f"\nExecution Status: {job_state.status}")
    print(f"Written Data: {writer.written_data}")
    # Execution state is tracked in JobState, not routine._stats
    print(f"Execution History: {len(job_state.execution_history)} records")

    # Sanity checks for the example run.
    assert job_state.status == "completed"
    assert len(writer.written_data) > 0


if __name__ == "__main__":
    main()
This example demonstrates:
Multi-stage data processing pipeline
Automatic flow detection in emit() calls
Error handling in validation
Statistics tracking across stages
Event queue-based execution
Key Features:
Event Queue Pattern: Tasks are enqueued and processed asynchronously
Automatic Flow Detection: No need to pass flow parameter in emit() calls
Non-blocking Execution: Each stage processes independently via the event queue