Data Processing Example
A multi-stage data processing pipeline example.
Example Code
1#!/usr/bin/env python
2"""
3Data Processing Example: Multi-stage data processing pipeline
4
5This example demonstrates:
6- Complex data flow with multiple stages
7- Parameter mapping
8- Statistics tracking
9"""
10
11from routilux import Flow, Routine
12
13
class InputReader(Routine):
    """Entry routine: turns a trigger into an ``output`` event carrying ``raw_data``.

    No file is actually opened; the filename itself is forwarded as the
    "raw data" so the example stays self-contained.
    """

    def __init__(self):
        super().__init__()
        # Entry routine, so it exposes a "trigger" slot rather than an "input" slot.
        self.trigger_slot = self.define_slot("trigger", handler=self._handle_trigger)
        self.output_event = self.define_event("output", ["raw_data"])

    def _handle_trigger(self, filename=None, **kwargs):
        """Simulate reading ``filename`` and emit its name as ``raw_data``.

        Args:
            filename: Name of the (simulated) input file. Falls back to
                ``"sample_data.txt"`` when it is None or empty.
            **kwargs: Any other trigger parameters; accepted and ignored.
        """
        # ``filename`` is a named parameter, so a ``filename=`` keyword is
        # always bound to it and can never appear in ``kwargs``; the former
        # ``kwargs.get("filename", ...)`` lookup could only ever return its default.
        raw_data = filename or "sample_data.txt"
        self.emit("output", raw_data=raw_data)


class DataValidator(Routine):
    """Forward non-empty data downstream; report empty data as an error.

    Emits ``output`` (field ``validated_data``) on success and ``error``
    (field ``error_message``) otherwise.
    """

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["validated_data"])
        self.error_event = self.define_event("error", ["error_message"])

    def validate(self, raw_data):
        """Check the incoming payload and emit either ``output`` or ``error``.

        Args:
            raw_data: Upstream payload. When it arrives as a dict, its
                ``"raw_data"`` entry is used (or the dict itself if that key
                is absent).
        """
        payload = (
            raw_data.get("raw_data", raw_data)
            if isinstance(raw_data, dict)
            else raw_data
        )

        # Reject falsy values and anything whose string form is empty.
        if not payload or not str(payload):
            self.emit("error", error_message="Invalid data")
            return

        self.emit("output", validated_data=payload)


class DataTransformer(Routine):
    """Upper-case validated data and prefix it with ``TRANSFORMED_``."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.transform)
        self.output_event = self.define_event("output", ["transformed_data"])

    def transform(self, validated_data):
        """Emit ``output`` with ``transformed_data="TRANSFORMED_<DATA>"``.

        Args:
            validated_data: Upstream payload. When it arrives as a dict, its
                ``"validated_data"`` entry is used (or the dict itself if that
                key is absent). Non-string values are converted with ``str()``
                before upper-casing.
        """
        if isinstance(validated_data, dict):
            data = validated_data.get("validated_data", validated_data)
        else:
            data = validated_data

        # DataValidator accepts any non-empty value (it only checks str(data)),
        # so the payload is not guaranteed to be a string; calling .upper()
        # directly raised AttributeError for ints, lists, etc. For strings,
        # str(data) is the same value, so existing output is unchanged.
        transformed = f"TRANSFORMED_{str(data).upper()}"
        self.emit("output", transformed_data=transformed)


class DataWriter(Routine):
    """Terminal routine that records and prints every payload it receives."""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.write)
        # Payloads handled by write(), in arrival order; main() inspects this list.
        self.written_data = []

    def write(self, transformed_data):
        """Append the payload to ``written_data`` and print it.

        Args:
            transformed_data: Upstream payload. When it arrives as a dict, its
                ``"transformed_data"`` entry is used (or the dict itself if
                that key is absent).
        """
        record = transformed_data
        if isinstance(record, dict):
            record = record.get("transformed_data", record)

        self.written_data.append(record)
        print(f"Written: {record}")


def main():
    """Wire reader -> validator -> transformer -> writer, run once, and check.

    Each stage's ``output`` event is connected to the next stage's ``input``
    slot, and the flow is started at the reader with ``filename="data.txt"``.

    Raises:
        AssertionError: if the job status is not ``"completed"`` or the writer
            received nothing (only when assertions are enabled, i.e. not -O).
    """
    flow = Flow(flow_id="data_processing")

    # (name, routine) pairs in pipeline order; instantiated in this order too.
    stages = [
        ("reader", InputReader()),
        ("validator", DataValidator()),
        ("transformer", DataTransformer()),
        ("writer", DataWriter()),
    ]
    writer = stages[-1][1]

    stage_ids = [flow.add_routine(routine, name) for name, routine in stages]

    # Chain consecutive stages: the output of one feeds the input of the next.
    for upstream_id, downstream_id in zip(stage_ids, stage_ids[1:]):
        flow.connect(upstream_id, "output", downstream_id, "input")

    print("Executing data processing pipeline...")
    job_state = flow.execute(stage_ids[0], entry_params={"filename": "data.txt"})

    print(f"\nExecution Status: {job_state.status}")
    print(f"Written Data: {writer.written_data}")
    # Execution state lives on the JobState, not on the individual routines.
    print(f"Execution History: {len(job_state.execution_history)} records")

    # NOTE(review): these checks assume flow.execute() returns only after the
    # queued work has finished — confirm against the routilux documentation.
    assert job_state.status == "completed"
    assert writer.written_data


if __name__ == "__main__":
    main()
This example demonstrates:
Multi-stage data processing pipeline
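Automatic flow detection in emit() calls
Error handling in validation (invalid input is reported through an "error" event)
Execution history tracking through JobState rather than per-routine statistics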
Event queue-based execution
Key Features:
Event Queue Pattern: Tasks are enqueued and processed asynchronously
Automatic Flow Detection: No need to pass a flow parameter in emit() calls
Non-blocking Execution: Each stage processes independently via the event queue