Data Flow and Parameters

In this tutorial, you’ll learn how data flows through Routilux workflows, including parameter extraction, parameter mapping in connections, and best practices for handling data.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Extract data from slot handler parameters correctly

  • Use parameter mapping in connections to transform parameter names

  • Understand how data is passed through events and slots

  • Handle different data types and structures

  • Debug data flow issues

Step 1: Understanding Parameter Extraction

Slot handlers receive data through keyword arguments. You need to extract the data correctly:

 1from routilux import Flow, Routine
 2
 3class DataSource(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.trigger_slot = self.define_slot("trigger", handler=self.send)
 7        self.output_event = self.define_event("output", ["name", "age", "city"])
 8
 9    def send(self, **kwargs):
10        # Emit multiple parameters
11        self.emit("output", name="Alice", age=30, city="New York")
12
13class DataReceiver(Routine):
14    def __init__(self):
15        super().__init__()
16        self.input_slot = self.define_slot("input", handler=self.receive)
17
18    def receive(self, name=None, age=None, city=None, **kwargs):
19        # Extract parameters with defaults
20        name = name or kwargs.get("name", "Unknown")
21        age = age or kwargs.get("age", 0)
22        city = city or kwargs.get("city", "Unknown")
23
24        print(f"Received: {name}, {age} years old, from {city}")
25
26flow = Flow(flow_id="param_flow")
27
28source = DataSource()
29receiver = DataReceiver()
30
31source_id = flow.add_routine(source, "source")
32receiver_id = flow.add_routine(receiver, "receiver")
33
34flow.connect(source_id, "output", receiver_id, "input")
35
36job_state = flow.execute(source_id)
37print(f"Status: {job_state.status}")

Expected Output:

Received: Alice, 30 years old, from New York
Status: completed

Key Points:

  • Parameters are passed as keyword arguments to slot handlers

  • Use the pattern param = param or kwargs.get("param", default) for safe extraction

  • All parameters specified in define_event() are available in the handler

Step 2: Parameter Mapping in Connections

You can map parameter names when connecting events to slots. This is useful when you want to transform data or match different naming conventions:

 1from routilux import Flow, Routine
 2
 3class Source(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.trigger_slot = self.define_slot("trigger", handler=self.send)
 7        self.output_event = self.define_event("output", ["user_name", "user_age"])
 8
 9    def send(self, **kwargs):
10        self.emit("output", user_name="Bob", user_age=25)
11
12class Target(Routine):
13    def __init__(self):
14        super().__init__()
15        self.input_slot = self.define_slot("input", handler=self.receive)
16
17    def receive(self, name=None, age=None, **kwargs):
18        # Expects "name" and "age", not "user_name" and "user_age"
19        name = name or kwargs.get("name", "Unknown")
20        age = age or kwargs.get("age", 0)
21        print(f"Target received: name={name}, age={age}")
22
23flow = Flow(flow_id="mapping_flow")
24
25source = Source()
26target = Target()
27
28source_id = flow.add_routine(source, "source")
29target_id = flow.add_routine(target, "target")
30
31# Map parameters: user_name -> name, user_age -> age
32flow.connect(
33    source_id, "output", target_id, "input",
34    param_mapping={"user_name": "name", "user_age": "age"}
35)
36
37job_state = flow.execute(source_id)
38print(f"Status: {job_state.status}")

Expected Output:

Target received: name=Bob, age=25
Status: completed

Key Points:

  • Use param_mapping in connect() to transform parameter names

  • Mapping format: {"source_param": "target_param"}

  • Unmapped parameters are passed through unchanged

  • This is useful for adapting different routine interfaces

Step 3: Handling Complex Data Structures

You can pass complex data structures (dicts, lists) through events:

 1from routilux import Flow, Routine
 2
 3class DataSource(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.trigger_slot = self.define_slot("trigger", handler=self.send)
 7        self.output_event = self.define_event("output", ["data"])
 8
 9    def send(self, **kwargs):
10        # Send a complex data structure
11        complex_data = {
12            "users": [
13                {"name": "Alice", "scores": [85, 90, 88]},
14                {"name": "Bob", "scores": [92, 87, 91]}
15            ],
16            "metadata": {"version": "1.0", "timestamp": "2024-01-01"}
17        }
18        self.emit("output", data=complex_data)
19
20class DataProcessor(Routine):
21    def __init__(self):
22        super().__init__()
23        self.input_slot = self.define_slot("input", handler=self.process)
24        self.output_event = self.define_event("output", ["result"])
25
26    def process(self, data=None, **kwargs):
27        data_value = data or kwargs.get("data", {})
28
29        # Process the complex data
30        if isinstance(data_value, dict) and "users" in data_value:
31            total_scores = []
32            for user in data_value["users"]:
33                scores = user.get("scores", [])
34                total_scores.extend(scores)
35
36            result = {
37                "total_users": len(data_value["users"]),
38                "average_score": sum(total_scores) / len(total_scores) if total_scores else 0,
39                "metadata": data_value.get("metadata", {})
40            }
41
42            print(f"Processed {result['total_users']} users")
43            print(f"Average score: {result['average_score']:.2f}")
44
45            self.emit("output", result=result)
46
47flow = Flow(flow_id="complex_flow")
48
49source = DataSource()
50processor = DataProcessor()
51
52source_id = flow.add_routine(source, "source")
53processor_id = flow.add_routine(processor, "processor")
54
55flow.connect(source_id, "output", processor_id, "input")
56
57job_state = flow.execute(source_id)
58print(f"Status: {job_state.status}")

Expected Output:

Processed 2 users
Average score: 88.83
Status: completed

Key Points:

  • You can pass any Python object through events (dicts, lists, custom objects)

  • Always check data types and structure in handlers

  • Complex data structures are passed by reference (not copied)

  • Be careful with mutable objects - modifications affect all references

Step 4: Using _extract_input_data Helper

Routilux provides a helper method _extract_input_data() for easier data extraction:

 1from routilux import Flow, Routine
 2
 3class DataSource(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.trigger_slot = self.define_slot("trigger", handler=self.send)
 7        self.output_event = self.define_event("output", ["data", "metadata"])
 8
 9    def send(self, **kwargs):
10        self.emit("output", data="test data", metadata={"source": "tutorial"})
11
12class DataReceiver(Routine):
13    def __init__(self):
14        super().__init__()
15        self.input_slot = self.define_slot("input", handler=self.receive)
16
17    def receive(self, data=None, metadata=None, **kwargs):
18        # Use helper method for cleaner extraction
19        data_value = self._extract_input_data(data, **kwargs)
20        metadata_value = self._extract_input_data(metadata, **kwargs)
21
22        print(f"Data: {data_value}")
23        print(f"Metadata: {metadata_value}")
24
25flow = Flow(flow_id="helper_flow")
26
27source = DataSource()
28receiver = DataReceiver()
29
30source_id = flow.add_routine(source, "source")
31receiver_id = flow.add_routine(receiver, "receiver")
32
33flow.connect(source_id, "output", receiver_id, "input")
34
35job_state = flow.execute(source_id)
36print(f"Status: {job_state.status}")

Expected Output:

Data: test data
Metadata: {'source': 'tutorial'}
Status: completed

Key Points:

  • _extract_input_data() is a convenience method for data extraction

  • It handles both direct parameters and kwargs

  • Use it for cleaner, more readable code

Step 5: Debugging Data Flow

When debugging data flow issues, it’s helpful to inspect what data is being passed:

 1from routilux import Flow, Routine
 2
 3class DebugSource(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        self.trigger_slot = self.define_slot("trigger", handler=self.send)
 7        self.output_event = self.define_event("output", ["value", "extra"])
 8
 9    def send(self, **kwargs):
10        print(f"[DEBUG] Source emitting: value='test', extra='info'")
11        self.emit("output", value="test", extra="info")
12
13class DebugReceiver(Routine):
14    def __init__(self):
15        super().__init__()
16        self.input_slot = self.define_slot("input", handler=self.receive)
17
18    def receive(self, **kwargs):
19        # Print all received kwargs for debugging
20        print(f"[DEBUG] Receiver received kwargs: {kwargs}")
21
22        # Extract specific values
23        value = kwargs.get("value", "NOT_FOUND")
24        extra = kwargs.get("extra", "NOT_FOUND")
25
26        print(f"[DEBUG] Extracted: value={value}, extra={extra}")
27
28flow = Flow(flow_id="debug_flow")
29
30source = DebugSource()
31receiver = DebugReceiver()
32
33source_id = flow.add_routine(source, "source")
34receiver_id = flow.add_routine(receiver, "receiver")
35
36flow.connect(source_id, "output", receiver_id, "input")
37
38job_state = flow.execute(source_id)
39print(f"Status: {job_state.status}")

Expected Output:

[DEBUG] Source emitting: value='test', extra='info'
[DEBUG] Receiver received kwargs: {'value': 'test', 'extra': 'info'}
[DEBUG] Extracted: value=test, extra=info
Status: completed

Key Points:

  • Print kwargs in handlers to see what data is received

  • Check parameter names match between define_event() and handler

  • Verify connections are set up correctly

  • Use debug prints during development

Common Pitfalls

Pitfall 1: Not handling missing parameters

def process(self, data):
    # Assumes 'data' is always provided
    result = data.upper()  # May fail if data is None

Solution: Always use defaults: data = data or kwargs.get("data", "")

Pitfall 2: Wrong parameter names

self.output_event = self.define_event("output", ["user_name"])  # Defined as "user_name"

def receive(self, username=None, **kwargs):  # Looking for "username"!
    username = username or kwargs.get("username")  # Won't find it

Solution: Use exact parameter names from define_event(), or use parameter mapping.

Pitfall 3: Modifying mutable data structures

def process(self, data=None, **kwargs):
    data_value = data or kwargs.get("data", {})
    data_value["modified"] = True  # Modifies original object!

Solution: Create copies when modifying: data_value = dict(data_value) or data_value = data_value.copy()

Best Practices

  1. Always use safe extraction pattern: param = param or kwargs.get("param", default)

  2. Use parameter mapping for interface adaptation: Match different naming conventions

  3. Validate data types: Check types before processing complex structures

  4. Use debug prints during development: Inspect kwargs to understand data flow

  5. Document parameter names: Make it clear what parameters events emit

  6. Copy mutable data when modifying: Avoid side effects from shared references

Next Steps

Now that you understand data flow, let’s move on to State Management to learn how to track execution state and monitor your workflows.