Getting Started

In this first tutorial, you’ll learn the basics of Routilux by creating a simple routine and connecting it in a flow. By the end, you’ll understand the core concepts of routines, slots, events, and flows.

Learning Objectives

By the end of this tutorial, you’ll be able to:

  • Create a custom routine with slots and events

  • Define slot handlers to process incoming data

  • Emit events to send data to other routines

  • Create a flow and add routines to it

  • Connect routines together

  • Execute a flow and check results

Step 1: Understanding Routines, Slots, and Events

Routilux is built around three core concepts:

  • Routine: A unit of work that processes data

  • Slot: An input mechanism that receives data (think of it as a “receiver”)

  • Event: An output mechanism that sends data (think of it as a “sender”)

Let’s create a simple routine that receives data through a slot and emits it through an event:

 1from routilux import Routine
 2
 3class Greeter(Routine):
 4    """A simple routine that greets someone"""
 5
 6    def __init__(self):
 7        super().__init__()
 8        # Define an input slot with a handler function
 9        self.input_slot = self.define_slot("input", handler=self.greet)
10        # Define an output event
11        self.output_event = self.define_event("output", ["message"])
12
13    def greet(self, name=None, **kwargs):
14        """Handle incoming data and emit a greeting"""
15        # Extract the name from kwargs if not provided directly
16        name = name or kwargs.get("name", "World")
17
18        # Create a greeting message
19        message = f"Hello, {name}!"
20
21        # Emit the message through the output event
22        # Flow is automatically detected from routine context
23        self.emit("output", message=message)

Key Points:

  • All routines inherit from Routine base class

  • Slots are defined with define_slot() and require a handler function

  • Events are defined with define_event() and specify parameter names

  • The handler function receives data through keyword arguments

  • emit() automatically detects the flow from routine context (no need to pass flow parameter)

Step 2: Creating Your First Flow

A Flow is a container that manages multiple routines and their connections. Let’s create a flow and add our Greeter routine:

 1from routilux import Flow
 2
 3# Create a flow
 4flow = Flow(flow_id="greeting_flow")
 5
 6# Create a routine instance
 7greeter = Greeter()
 8
 9# Add the routine to the flow
10greeter_id = flow.add_routine(greeter, "greeter")
11
12print(f"Added routine with ID: {greeter_id}")

Expected Output:

Added routine with ID: <some-uuid>

Key Points:

  • Each flow has a unique flow_id (auto-generated if not provided)

  • Routines are added to flows with add_routine() which returns a routine ID

  • The routine ID is used to reference the routine when making connections

Step 3: Executing a Flow

To execute a flow, we need to call execute() on an entry routine. An entry routine is one that has a slot we can trigger directly. Let’s execute our flow:

 1from routilux import Flow, Routine
 2
 3class Greeter(Routine):
 4    def __init__(self):
 5        super().__init__()
 6        # Use "trigger" as the slot name for entry routines
 7        self.trigger_slot = self.define_slot("trigger", handler=self.greet)
 8        self.output_event = self.define_event("output", ["message"])
 9
10    def greet(self, name=None, **kwargs):
11        name = name or kwargs.get("name", "World")
12        message = f"Hello, {name}!"
13        self.emit("output", message=message)
14
15# Create and execute flow
16flow = Flow(flow_id="greeting_flow")
17greeter = Greeter()
18greeter_id = flow.add_routine(greeter, "greeter")
19
20# Execute the flow with entry parameters
21job_state = flow.execute(greeter_id, entry_params={"name": "Routilux"})
22
23# Check execution status
24print(f"Execution status: {job_state.status}")

Expected Output:

Execution status: completed

Key Points:

  • execute() takes a routine ID and optional entry_params

  • Entry parameters are passed to the entry routine’s slot handler

  • execute() returns a JobState object that tracks execution status

  • The status will be “completed” if execution succeeds

Step 4: Connecting Two Routines

Now let’s create two routines and connect them. The first routine will send data to the second:

 1from routilux import Flow, Routine
 2
 3class DataSource(Routine):
 4    """A routine that generates data"""
 5
 6    def __init__(self):
 7        super().__init__()
 8        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
 9        self.output_event = self.define_event("output", ["data"])
10
11    def generate(self, value=None, **kwargs):
12        value = value or kwargs.get("value", "default")
13        self.emit("output", data=value)
14
15class DataProcessor(Routine):
16    """A routine that processes data"""
17
18    def __init__(self):
19        super().__init__()
20        self.input_slot = self.define_slot("input", handler=self.process)
21        self.output_event = self.define_event("output", ["result"])
22
23    def process(self, data=None, **kwargs):
24        # Extract data from kwargs
25        data_value = data or kwargs.get("data", "no data")
26        result = f"Processed: {data_value}"
27        self.emit("output", result=result)
28        print(f"Processor received: {data_value}, produced: {result}")
29
30# Create flow
31flow = Flow(flow_id="data_flow")
32
33# Create routines
34source = DataSource()
35processor = DataProcessor()
36
37# Add to flow
38source_id = flow.add_routine(source, "source")
39processor_id = flow.add_routine(processor, "processor")
40
41# Connect: source's output event -> processor's input slot
42flow.connect(source_id, "output", processor_id, "input")
43
44# Execute from source
45job_state = flow.execute(source_id, entry_params={"value": "Hello"})
46
47print(f"Status: {job_state.status}")

Expected Output:

Processor received: Hello, produced: Processed: Hello
Status: completed

Key Points:

  • connect() links an event from one routine to a slot in another

  • The connection format is: flow.connect(source_id, "event_name", target_id, "slot_name")

  • When the source emits an event, connected slots automatically receive the data

  • Data flows automatically through connections

Step 5: Complete Example - A Simple Pipeline

Let’s create a complete example with three routines connected in a pipeline:

 1from routilux import Flow, Routine
 2
 3class DataSource(Routine):
 4    """Generate data"""
 5
 6    def __init__(self):
 7        super().__init__()
 8        self.trigger_slot = self.define_slot("trigger", handler=self.generate)
 9        self.output_event = self.define_event("output", ["data"])
10
11    def generate(self, text=None, **kwargs):
12        text = text or kwargs.get("text", "default")
13        self.emit("output", data=text)
14
15class Transformer(Routine):
16    """Transform data to uppercase"""
17
18    def __init__(self):
19        super().__init__()
20        self.input_slot = self.define_slot("input", handler=self.transform)
21        self.output_event = self.define_event("output", ["transformed"])
22
23    def transform(self, data=None, **kwargs):
24        data_value = data or kwargs.get("data", "")
25        transformed = data_value.upper()
26        self.emit("output", transformed=transformed)
27
28class Printer(Routine):
29    """Print the final result"""
30
31    def __init__(self):
32        super().__init__()
33        self.input_slot = self.define_slot("input", handler=self.print_result)
34
35    def print_result(self, transformed=None, **kwargs):
36        result = transformed or kwargs.get("transformed", "")
37        print(f"Final result: {result}")
38
39def main():
40    # Create flow
41    flow = Flow(flow_id="pipeline")
42
43    # Create routines
44    source = DataSource()
45    transformer = Transformer()
46    printer = Printer()
47
48    # Add to flow
49    source_id = flow.add_routine(source, "source")
50    transformer_id = flow.add_routine(transformer, "transformer")
51    printer_id = flow.add_routine(printer, "printer")
52
53    # Connect: source -> transformer -> printer
54    flow.connect(source_id, "output", transformer_id, "input")
55    flow.connect(transformer_id, "output", printer_id, "input")
56
57    # Execute
58    print("Executing pipeline...")
59    job_state = flow.execute(source_id, entry_params={"text": "hello, routilux!"})
60
61    print(f"Pipeline status: {job_state.status}")
62
63if __name__ == "__main__":
64    main()

Expected Output:

Executing pipeline...
Final result: HELLO, ROUTILUX!
Pipeline status: completed

Key Points:

  • Routines can be connected in chains (pipelines)

  • Data flows automatically from one routine to the next

  • Each routine processes data and passes it along

  • The flow executes all connected routines automatically

Common Pitfalls

Pitfall 1: Forgetting to call super().__init__()

class MyRoutine(Routine):
    def __init__(self):
        # Missing super().__init__()!
        self.input_slot = self.define_slot("input", handler=self.process)
        # This will fail because _slots and _events are not initialized

Solution: Always call super().__init__() first in your __init__ method.

Pitfall 2: Not defining events before emitting

class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
        # Forgot to define the event!

    def process(self, **kwargs):
        self.emit("output", data="value")  # Error: event not defined!

Solution: Always define events with define_event() before using them in emit().

Pitfall 3: Wrong parameter names in emit()

class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.output_event = self.define_event("output", ["message"])  # Defined as "message"

    def process(self, **kwargs):
        self.emit("output", msg="Hello")  # Wrong parameter name "msg"!

Solution: Use the exact parameter names specified in define_event(). In this case, use message="Hello".

Pitfall 4: Not handling data extraction properly

def process(self, data, **kwargs):
    # This assumes 'data' is always provided as a positional argument
    # But it might come as a keyword argument instead
    result = f"Processed: {data}"

Solution: Use the pattern data = data or kwargs.get("data", default_value) to handle both cases.

Best Practices

  1. Use descriptive names: Choose clear names for routines, slots, and events

  2. Define events with parameter names: Always specify parameter names in define_event()

  3. Handle data extraction flexibly: Use data or kwargs.get("data", default) pattern

  4. Use “trigger” for entry slots: Convention for slots that start execution

  5. Print or log in handlers: Helps with debugging during development

  6. Check job_state.status: Always verify execution completed successfully

Next Steps

Now that you understand the basics, let’s move on to Connecting Routines to learn about more complex connection patterns, multiple connections, and understanding the event queue architecture.