Data Flow and Parameters
In this tutorial, you’ll learn how data flows through Routilux workflows, including parameter extraction, parameter mapping in connections, and best practices for handling data.
Learning Objectives
By the end of this tutorial, you’ll be able to:
Extract data from slot handler parameters correctly
Use parameter mapping in connections to transform parameter names
Understand how data is passed through events and slots
Handle different data types and structures
Debug data flow issues
Step 1: Understanding Parameter Extraction
Slot handlers receive data through keyword arguments. You need to extract the data correctly:
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["name", "age", "city"])

    def send(self, **kwargs):
        # Emit multiple parameters
        self.emit("output", name="Alice", age=30, city="New York")

class DataReceiver(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, name=None, age=None, city=None, **kwargs):
        # Extract parameters with defaults
        name = name or kwargs.get("name", "Unknown")
        age = age or kwargs.get("age", 0)
        city = city or kwargs.get("city", "Unknown")

        print(f"Received: {name}, {age} years old, from {city}")

flow = Flow(flow_id="param_flow")

source = DataSource()
receiver = DataReceiver()

source_id = flow.add_routine(source, "source")
receiver_id = flow.add_routine(receiver, "receiver")

flow.connect(source_id, "output", receiver_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
Received: Alice, 30 years old, from New York
Status: completed
Key Points:
Parameters are passed as keyword arguments to slot handlers
Use the pattern param = param or kwargs.get("param", default) for safe extraction
All parameters specified in define_event() are available in the handler
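One caveat with the or-based pattern is that it treats every falsy value (0, an empty string, an empty list) as missing and substitutes the default. The sketch below is plain Python with no Routilux imports; receive_with_or and receive_with_none_check are hypothetical stand-ins for a slot handler, used only to contrast the two checks.

```python
def receive_with_or(age=None, **kwargs):
    # The tutorial's pattern: any falsy value falls through to the default.
    return age or kwargs.get("age", 18)


def receive_with_none_check(age=None, **kwargs):
    # Explicit None check: only a genuinely missing value gets the default.
    if age is None:
        age = kwargs.get("age", 18)
    return age


print(receive_with_or(age=0))          # 18: the legitimate value 0 is lost
print(receive_with_none_check(age=0))  # 0
print(receive_with_none_check())       # 18
```

If 0 or an empty string is a valid value for a parameter, the explicit None check is likely the safer choice.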
Step 2: Parameter Mapping in Connections
You can map parameter names when connecting events to slots. This is useful when you want to transform data or match different naming conventions:
from routilux import Flow, Routine

class Source(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["user_name", "user_age"])

    def send(self, **kwargs):
        self.emit("output", user_name="Bob", user_age=25)

class Target(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, name=None, age=None, **kwargs):
        # Expects "name" and "age", not "user_name" and "user_age"
        name = name or kwargs.get("name", "Unknown")
        age = age or kwargs.get("age", 0)
        print(f"Target received: name={name}, age={age}")

flow = Flow(flow_id="mapping_flow")

source = Source()
target = Target()

source_id = flow.add_routine(source, "source")
target_id = flow.add_routine(target, "target")

# Map parameters: user_name -> name, user_age -> age
flow.connect(
    source_id, "output", target_id, "input",
    param_mapping={"user_name": "name", "user_age": "age"}
)

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
Target received: name=Bob, age=25
Status: completed
Key Points:
Use param_mapping in connect() to transform parameter names
Mapping format: {"source_param": "target_param"}
Unmapped parameters are passed through unchanged
This is useful for adapting different routine interfaces
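To make the mapping semantics concrete, here is a minimal plain-Python sketch of the renaming described above. The function apply_param_mapping is hypothetical and is not Routilux's implementation; it only models the stated behavior, where mapped names are renamed and unmapped names pass through unchanged.

```python
def apply_param_mapping(params, mapping):
    """Rename keys per mapping; keys without a mapping pass through unchanged."""
    return {mapping.get(key, key): value for key, value in params.items()}


emitted = {"user_name": "Bob", "user_age": 25, "city": "Paris"}
mapping = {"user_name": "name", "user_age": "age"}

received = apply_param_mapping(emitted, mapping)
print(received)  # {'name': 'Bob', 'age': 25, 'city': 'Paris'}
```

Here "city" has no entry in the mapping, so it reaches the target under its original name.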
Step 3: Handling Complex Data Structures
You can pass complex data structures (dicts, lists) through events:
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data"])

    def send(self, **kwargs):
        # Send a complex data structure
        complex_data = {
            "users": [
                {"name": "Alice", "scores": [85, 90, 88]},
                {"name": "Bob", "scores": [92, 87, 91]}
            ],
            "metadata": {"version": "1.0", "timestamp": "2024-01-01"}
        }
        self.emit("output", data=complex_data)

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        self.output_event = self.define_event("output", ["result"])

    def process(self, data=None, **kwargs):
        data_value = data or kwargs.get("data", {})

        # Process the complex data
        if isinstance(data_value, dict) and "users" in data_value:
            total_scores = []
            for user in data_value["users"]:
                scores = user.get("scores", [])
                total_scores.extend(scores)

            result = {
                "total_users": len(data_value["users"]),
                "average_score": sum(total_scores) / len(total_scores) if total_scores else 0,
                "metadata": data_value.get("metadata", {})
            }

            print(f"Processed {result['total_users']} users")
            print(f"Average score: {result['average_score']:.2f}")

            self.emit("output", result=result)

flow = Flow(flow_id="complex_flow")

source = DataSource()
processor = DataProcessor()

source_id = flow.add_routine(source, "source")
processor_id = flow.add_routine(processor, "processor")

flow.connect(source_id, "output", processor_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
Processed 2 users
Average score: 88.83
Status: completed
Key Points:
You can pass any Python object through events (dicts, lists, custom objects)
Always check data types and structure in handlers
Complex data structures are passed by reference (not copied)
Be careful with mutable objects - modifications affect all references
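The advice to check data types and structure can be factored into a small helper that a handler calls before processing. The sketch below is plain Python; validate_users_payload is a hypothetical name, and the expected shape (a dict with a "users" list of dicts carrying numeric "scores") is assumed from the example in this step.

```python
def validate_users_payload(data):
    """Return a list of problems found in the payload; empty means it looks valid."""
    if not isinstance(data, dict):
        return [f"expected dict, got {type(data).__name__}"]
    users = data.get("users")
    if not isinstance(users, list):
        return ["'users' must be a list"]

    problems = []
    for index, user in enumerate(users):
        if not isinstance(user, dict):
            problems.append(f"users[{index}] must be a dict")
            continue
        scores = user.get("scores", [])
        is_numeric_list = isinstance(scores, list) and all(
            isinstance(score, (int, float)) for score in scores
        )
        if not is_numeric_list:
            problems.append(f"users[{index}]['scores'] must be a list of numbers")
    return problems


good = {"users": [{"name": "Alice", "scores": [85, 90]}]}
bad = {"users": [{"name": "Bob", "scores": [92, "n/a"]}, "oops"]}

print(validate_users_payload(good))  # []
print(validate_users_payload(bad))
print(validate_users_payload(None))  # ['expected dict, got NoneType']
```

Returning a list of problems rather than raising on the first one makes it easier to log everything that is wrong with a payload in a single pass.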
Step 4: Using the _extract_input_data Helper
Routilux provides a helper method _extract_input_data() for easier data extraction:
from routilux import Flow, Routine

class DataSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["data", "metadata"])

    def send(self, **kwargs):
        self.emit("output", data="test data", metadata={"source": "tutorial"})

class DataReceiver(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, data=None, metadata=None, **kwargs):
        # Use helper method for cleaner extraction
        data_value = self._extract_input_data(data, **kwargs)
        metadata_value = self._extract_input_data(metadata, **kwargs)

        print(f"Data: {data_value}")
        print(f"Metadata: {metadata_value}")

flow = Flow(flow_id="helper_flow")

source = DataSource()
receiver = DataReceiver()

source_id = flow.add_routine(source, "source")
receiver_id = flow.add_routine(receiver, "receiver")

flow.connect(source_id, "output", receiver_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
Data: test data
Metadata: {'source': 'tutorial'}
Status: completed
Key Points:
_extract_input_data() is a convenience method for data extraction
It handles both direct parameters and kwargs
Use it for cleaner, more readable code
Step 5: Debugging Data Flow
When debugging data flow issues, it’s helpful to inspect what data is being passed:
from routilux import Flow, Routine

class DebugSource(Routine):
    def __init__(self):
        super().__init__()
        self.trigger_slot = self.define_slot("trigger", handler=self.send)
        self.output_event = self.define_event("output", ["value", "extra"])

    def send(self, **kwargs):
        print("[DEBUG] Source emitting: value='test', extra='info'")
        self.emit("output", value="test", extra="info")

class DebugReceiver(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.receive)

    def receive(self, **kwargs):
        # Print all received kwargs for debugging
        print(f"[DEBUG] Receiver received kwargs: {kwargs}")

        # Extract specific values
        value = kwargs.get("value", "NOT_FOUND")
        extra = kwargs.get("extra", "NOT_FOUND")

        print(f"[DEBUG] Extracted: value={value}, extra={extra}")

flow = Flow(flow_id="debug_flow")

source = DebugSource()
receiver = DebugReceiver()

source_id = flow.add_routine(source, "source")
receiver_id = flow.add_routine(receiver, "receiver")

flow.connect(source_id, "output", receiver_id, "input")

job_state = flow.execute(source_id)
print(f"Status: {job_state.status}")
Expected Output:
[DEBUG] Source emitting: value='test', extra='info'
[DEBUG] Receiver received kwargs: {'value': 'test', 'extra': 'info'}
[DEBUG] Extracted: value=test, extra=info
Status: completed
Key Points:
Print kwargs in handlers to see what data is received
Check that parameter names match between define_event() and the handler
Verify connections are set up correctly
Use debug prints during development
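Instead of adding print calls to every handler, the same inspection can be wrapped in a reusable decorator built on the standard logging module. This is a plain-Python sketch: log_kwargs is a hypothetical helper, and it has not been verified against how Routilux invokes slot handlers, so treat its use on a Routine method as an assumption to test.

```python
import functools
import logging

logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
logger = logging.getLogger("dataflow")


def log_kwargs(handler):
    """Log the keyword arguments a handler receives, then call it unchanged."""
    @functools.wraps(handler)
    def wrapper(*args, **kwargs):
        logger.debug("%s received kwargs: %r", handler.__name__, kwargs)
        return handler(*args, **kwargs)
    return wrapper


@log_kwargs
def receive(**kwargs):
    # Stand-in for a slot handler; returns the extracted value for inspection.
    return kwargs.get("value", "NOT_FOUND")


print(receive(value="test", extra="info"))  # test
print(receive(extra="info"))                # NOT_FOUND
```

Because the output goes through logging, raising the logger's level silences it once development is done, with no need to delete individual print statements.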
Common Pitfalls
Pitfall 1: Not handling missing parameters
def process(self, data):
    # Assumes 'data' is always provided
    result = data.upper()  # May fail if data is None
Solution: Give the parameter a default, accept **kwargs, and extract with a fallback: data = data or kwargs.get("data", "")
Pitfall 2: Wrong parameter names
# In the emitting routine: the parameter is defined as "user_name"
self.output_event = self.define_event("output", ["user_name"])

# In the receiving routine: the handler looks for "username"
def receive(self, username=None, **kwargs):
    username = username or kwargs.get("username")  # Won't find it
Solution: Use exact parameter names from define_event(), or use parameter mapping.
Pitfall 3: Modifying mutable data structures
def process(self, data=None, **kwargs):
    data_value = data or kwargs.get("data", {})
    data_value["modified"] = True  # Modifies original object!
Solution: Create a copy before modifying: data_value = dict(data_value) or data_value = data_value.copy(). Both make shallow copies, so nested objects are still shared with the original.
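For nested payloads, the difference between a shallow and a deep copy matters. The sketch below uses only the standard library and made-up sample data to show which modifications leak back into the original object.

```python
import copy

original = {"users": [{"name": "Alice", "scores": [85, 90]}], "version": "1.0"}

shallow = original.copy()
shallow["version"] = "2.0"               # top-level key: original unaffected
shallow["users"][0]["scores"].append(0)  # nested list: shared with original

print(original["version"])             # 1.0
print(original["users"][0]["scores"])  # [85, 90, 0]

deep = copy.deepcopy(original)
deep["users"][0]["scores"].append(100)  # fully independent copy

print(original["users"][0]["scores"])  # [85, 90, 0]
print(deep["users"][0]["scores"])      # [85, 90, 0, 100]
```

copy.deepcopy costs more on large structures, so a shallow copy is enough when a handler only adds or replaces top-level keys.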
Best Practices
Always use the safe extraction pattern: param = param or kwargs.get("param", default)
Use parameter mapping for interface adaptation: Match different naming conventions
Validate data types: Check types before processing complex structures
Use debug prints during development: Inspect kwargs to understand data flow
Document parameter names: Make it clear what parameters events emit
Copy mutable data when modifying: Avoid side effects from shared references
Next Steps
Now that you understand data flow, let’s move on to State Management to learn how to track execution state and monitor your workflows.