Quick Start Guide¶
This guide will help you get started with Routilux quickly. We’ll cover everything from basic concepts to advanced features, with practical examples you can run immediately.
Installation¶
Install Routilux using pip:
pip install routilux
For development with all dependencies:
pip install "routilux[dev]"
Basic Concepts¶
Routilux is built around a few simple concepts:
Routine: A unit of work that can receive input through slots and emit output through events
Flow: A manager that orchestrates multiple routines and their connections
Event: An output mechanism that can be connected to slots
Slot: An input mechanism that can receive data from events
Connection: A link between an event and a slot
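The relationship between these concepts can be sketched in plain Python. The classes below (`ToyEvent`, `ToySlot`) are hypothetical stand-ins written for illustration and are not Routilux's implementation; they only show how a connection forwards emitted data from an event to a slot's handler.

```python
from typing import Any, Callable, Dict, List


class ToySlot:
    """Input mechanism: passes received data to a handler (hypothetical)."""

    def __init__(self, name: str, handler: Callable[..., None]) -> None:
        self.name = name
        self.handler = handler

    def receive(self, data: Dict[str, Any]) -> None:
        # Unpack the payload as keyword arguments for the handler.
        self.handler(**data)


class ToyEvent:
    """Output mechanism: forwards emitted data to every connected slot."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.connected_slots: List[ToySlot] = []

    def connect(self, slot: ToySlot) -> None:
        # A "connection" here is just a link from this event to a slot.
        self.connected_slots.append(slot)

    def emit(self, **data: Any) -> None:
        for slot in self.connected_slots:
            slot.receive(data)


received: List[str] = []
output = ToyEvent("output")
input_slot = ToySlot("input", handler=lambda result: received.append(result))
output.connect(input_slot)
output.emit(result="Processed: test")
print(received)  # ['Processed: test']
```

In Routilux itself, a Flow manages these links for you, so you normally call `flow.connect(...)` rather than wiring events and slots by hand.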
Creating Your First Routine¶
Let’s create a simple routine that processes data:
from routilux import Routine
class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Define an input slot with a handler
        self.input_slot = self.define_slot("input", handler=self.process_data)
        # Define an output event
        self.output_event = self.define_event("output", ["result"])
    def process_data(self, data=None, **kwargs):
        # Extract input data using helper method
        extracted_data = self._extract_input_data(data, **kwargs)
        # Process the data
        result = f"Processed: {extracted_data}"
        # Track operation statistics
        self._track_operation("processing", success=True)
        # Emit the result (flow is automatically detected)
        self.emit("output", result=result)
Creating a Flow¶
Now let’s create a flow and connect routines:
from routilux import Flow
# Create a flow
flow = Flow(flow_id="my_flow")
# Create routine instances
processor1 = DataProcessor()
processor2 = DataProcessor()
# Add routines to the flow
id1 = flow.add_routine(processor1, "processor1")
id2 = flow.add_routine(processor2, "processor2")
# Connect processor1's output to processor2's input
flow.connect(id1, "output", id2, "input")
Executing a Flow¶
Execute the flow with entry parameters:
from routilux.job_state import JobState
# Execute the flow (non-blocking, returns immediately)
job_state = flow.execute(id1, entry_params={"data": "test"})
# Wait for completion
JobState.wait_for_completion(flow, job_state, timeout=2.0)
# Check the status
print(job_state.status)  # "completed"
# Check statistics
print(processor1.stats())  # {"processing": {"success": 1, "total": 1}}
# For long-running tasks, you can specify a custom timeout
job_state = flow.execute(
    id1,
    entry_params={"data": "test"},
    timeout=600.0,  # 10 minutes
)
Using Built-in Routines¶
Routilux comes with many built-in routines ready to use. Let’s chain TextRenderer and TextClipper to render data and clip the resulting text:
from routilux import Flow
from routilux.builtin_routines import TextClipper, TextRenderer
flow = Flow()
# Create and configure built-in routines
renderer = TextRenderer()
renderer.set_config(tag_format="xml")
clipper = TextClipper()
clipper.set_config(max_length=100)
# Add to flow
renderer_id = flow.add_routine(renderer, "renderer")
clipper_id = flow.add_routine(clipper, "clipper")
# Connect: renderer -> clipper
flow.connect(renderer_id, "output", clipper_id, "input")
# Execute
data = {"name": "Alice", "age": 30, "city": "New York"}
renderer.input_slot.receive({"data": data})
# Get clipped result
print(clipper.get_stat("clipped_text"))
Using Conditional Router¶
ConditionalRouter lets you route data based on conditions:
from routilux import Flow
from routilux.builtin_routines import ConditionalRouter
flow = Flow()
router = ConditionalRouter()
router.set_config(
    routes=[
        ("high", "data.get('priority', 0) > 10"),
        ("normal", "data.get('priority', 0) <= 10"),
    ],
    default_route="normal"
)
router_id = flow.add_routine(router, "router")
# Define output events
router.define_event("high")
router.define_event("normal")
# Connect to different handlers
# flow.connect(router_id, "high", high_handler_id, "input")
# flow.connect(router_id, "normal", normal_handler_id, "input")
# Route data
router.input_slot.receive({"data": {"priority": 15}}) # Routes to "high"
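To make the routing idea concrete, here is a minimal sketch of condition-based routing in plain Python. The function `pick_route` is hypothetical and is not how ConditionalRouter is implemented; it assumes that routes are checked in order, that the first matching condition wins, and that each condition is a Python expression that can see only a `data` variable.

```python
from typing import Any, Dict, List, Optional, Tuple


def pick_route(
    data: Dict[str, Any],
    routes: List[Tuple[str, str]],
    default_route: Optional[str] = None,
) -> Optional[str]:
    """Return the name of the first route whose condition is true."""
    for name, condition in routes:
        # Evaluate the expression with `data` as the only visible name.
        # Built-ins are disabled; only use eval() on trusted expressions.
        if eval(condition, {"__builtins__": {}}, {"data": data}):
            return name
    return default_route


routes = [
    ("high", "data.get('priority', 0) > 10"),
    ("normal", "data.get('priority', 0) <= 10"),
]
print(pick_route({"priority": 15}, routes, default_route="normal"))  # high
print(pick_route({"priority": 3}, routes, default_route="normal"))  # normal
```

The default route only matters when no condition matches; with the two complementary conditions above, every payload matches one of them.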
Merge Strategies¶
When multiple events connect to the same slot, you can control how data is merged:
from routilux import Routine
class Aggregator(Routine):
    def __init__(self):
        super().__init__()
        # Use "append" strategy to accumulate data
        self.input_slot = self.define_slot(
            "input",
            handler=self.aggregate,
            merge_strategy="append"  # Accumulates data in lists
        )
    def aggregate(self, **kwargs):
        # Access accumulated data
        data = self.input_slot._data
        print(f"Aggregated: {data}")
Available Merge Strategies:
"override" (default): New data replaces old data
"append": Values are appended to lists
Custom function: Define your own merge logic
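The three strategies listed above can be illustrated with a small standalone function. This is a sketch only: `merge` is a hypothetical helper, not part of Routilux, and it assumes that "override" replaces values key by key and that a custom function receives the old and new data and returns the merged result.

```python
from typing import Any, Callable, Dict, Union

MergeFn = Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]]


def merge(
    old: Dict[str, Any],
    new: Dict[str, Any],
    strategy: Union[str, MergeFn] = "override",
) -> Dict[str, Any]:
    """Combine previously stored slot data with newly received data."""
    if callable(strategy):
        # Custom function: the caller decides how to combine the data.
        return strategy(old, new)
    if strategy == "override":
        # New values replace old values for the same key.
        merged = dict(old)
        merged.update(new)
        return merged
    if strategy == "append":
        # Each key accumulates its values in a list.
        merged = {key: list(values) for key, values in old.items()}
        for key, value in new.items():
            merged.setdefault(key, []).append(value)
        return merged
    raise ValueError(f"Unknown merge strategy: {strategy!r}")


print(merge({"value": 1}, {"value": 2}))  # {'value': 2}

state: Dict[str, Any] = {}
state = merge(state, {"value": 1}, "append")
state = merge(state, {"value": 2}, "append")
print(state)  # {'value': [1, 2]}

total = merge({"value": 1}, {"value": 2}, lambda old, new: {"value": old["value"] + new["value"]})
print(total)  # {'value': 3}
```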
See Working with Connections for details.
Concurrent Execution¶
For I/O-bound operations, use concurrent execution to run multiple routines in parallel:
from routilux import Flow
from routilux.job_state import JobState
# Create a concurrent flow
flow = Flow(
    execution_strategy="concurrent",
    max_workers=5
)
# Add routines (they'll execute concurrently when possible)
# ...
try:
    # Execute - routines run in parallel
    job_state = flow.execute(entry_routine_id, entry_params={"data": "test"})
    # Wait for all concurrent tasks to complete
    JobState.wait_for_completion(flow, job_state, timeout=10.0)
finally:
    # Always clean up resources
    flow.shutdown(wait=True)
Key Points:
Routines that can run in parallel execute concurrently automatically
Use wait_for_completion() to ensure all tasks finish
Always call shutdown() to clean up the thread pool
Thread-safe operations are required for shared state
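The key points above mirror the usual thread pool discipline in Python. The sketch below uses only the standard library (`concurrent.futures` and `threading`), not Routilux; whether Routilux uses these exact primitives internally is an assumption. It shows the same three habits: wait for all tasks, always shut the pool down, and guard shared state with a lock.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

results: List[int] = []
results_lock = threading.Lock()


def fetch(item: int) -> None:
    """Simulate an I/O-bound task that writes to shared state."""
    time.sleep(0.05)  # stand-in for network or disk latency
    with results_lock:  # guard the shared list against concurrent writes
        results.append(item * 2)


executor = ThreadPoolExecutor(max_workers=5)
try:
    futures = [executor.submit(fetch, item) for item in range(5)]
    # Block until every task has finished or the timeout expires.
    done, not_done = wait(futures, timeout=10.0)
finally:
    # Always release the worker threads, even if something failed.
    executor.shutdown(wait=True)

print(sorted(results))  # [0, 2, 4, 6, 8]
```

The results are sorted before printing because the completion order of concurrent tasks is not guaranteed.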
See Working with Flows for detailed execution order behavior.
Error Handling¶
Configure error handling strategies for robust workflows:
from routilux import Flow, ErrorHandler, ErrorStrategy
flow = Flow()
# Set error handler with retry strategy
error_handler = ErrorHandler(
    strategy=ErrorStrategy.RETRY,
    max_retries=3,
    retry_delay=1.0
)
flow.set_error_handler(error_handler)
# Execute - errors will be retried automatically
job_state = flow.execute(entry_routine_id)
Available Strategies:
STOP: Stop execution on error (default)
CONTINUE: Continue execution, log error
RETRY: Retry failed routine up to max_retries
SKIP: Skip failed routine, continue with next
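As a rough illustration of how these strategies differ, here is a standalone sketch in plain Python. `ToyStrategy` and `run_with_strategy` are hypothetical names and do not reflect Routilux's internals. The sketch assumes that `max_retries` counts attempts after the first failure, and that CONTINUE and SKIP both swallow the error and return `None`.

```python
import time
from enum import Enum
from typing import Any, Callable, Optional


class ToyStrategy(Enum):
    STOP = "stop"
    CONTINUE = "continue"
    RETRY = "retry"
    SKIP = "skip"


def run_with_strategy(
    task: Callable[[], Any],
    strategy: ToyStrategy,
    max_retries: int = 3,
    retry_delay: float = 0.0,
) -> Any:
    """Run a task and react to failures according to the strategy."""
    # Only RETRY gets extra attempts; the others run the task once.
    attempts = 1 + (max_retries if strategy is ToyStrategy.RETRY else 0)
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return task()
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(retry_delay)  # pause before the next attempt
    if strategy in (ToyStrategy.CONTINUE, ToyStrategy.SKIP):
        return None  # swallow the error and let the caller move on
    assert last_error is not None
    raise last_error  # STOP, or RETRY with all attempts exhausted


calls = {"count": 0}


def flaky() -> str:
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


result = run_with_strategy(flaky, ToyStrategy.RETRY, max_retries=3)
print(result, calls["count"])  # ok 3
```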
See Error Handling and Exception Management for details.
State Management¶
Track routine state in the _stats dictionary through the helper methods:
from routilux import Routine
class MyRoutine(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.process)
    def process(self, data):
        # Track operations
        self._track_operation("processing", success=True)
        # Set custom stats
        self.set_stat("last_processed", data)
        self.increment_stat("total_processed")
        # Access stats
        total = self.get_stat("total_processed", 0)
        print(f"Total processed: {total}")
    def get_summary(self):
        return {
            "total": self.get_stat("total_processed", 0),
            "last": self.get_stat("last_processed"),
            "stats": self.stats()
        }
Helper Methods:
set_stat(key, value): Set a stat value
get_stat(key, default=None): Get a stat value
increment_stat(key, amount=1): Increment a stat
_track_operation(name, success=True): Track operation statistics
stats(): Get all statistics
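To show what these helpers amount to, here is a toy stand-in written in plain Python. `ToyStats` is hypothetical and is not the Routine base class. The shape of the tracked entry (`{"success": ..., "total": ...}`) follows the example output shown earlier in this guide; everything else, including storing custom stats in the same dictionary, is an assumption.

```python
from typing import Any, Dict


class ToyStats:
    """Minimal stats container mimicking the helper method names."""

    def __init__(self) -> None:
        self._stats: Dict[str, Any] = {}

    def set_stat(self, key: str, value: Any) -> None:
        self._stats[key] = value

    def get_stat(self, key: str, default: Any = None) -> Any:
        return self._stats.get(key, default)

    def increment_stat(self, key: str, amount: int = 1) -> None:
        # A missing counter starts from zero.
        self._stats[key] = self._stats.get(key, 0) + amount

    def _track_operation(self, name: str, success: bool = True) -> None:
        entry = self._stats.setdefault(name, {"success": 0, "total": 0})
        entry["total"] += 1
        if success:
            entry["success"] += 1

    def stats(self) -> Dict[str, Any]:
        # Return a shallow copy so callers cannot replace top-level keys.
        return dict(self._stats)


stats_demo = ToyStats()
stats_demo._track_operation("processing", success=True)
stats_demo._track_operation("processing", success=False)
stats_demo.increment_stat("total_processed")
stats_demo.set_stat("last_processed", "test")
print(stats_demo.get_stat("processing"))  # {'success': 1, 'total': 2}
print(stats_demo.get_stat("total_processed", 0))  # 1
```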
See State Management for details.
Serialization¶
Serialize and deserialize flows for persistence:
from routilux import Flow
import json
# Create and configure flow
flow = Flow(flow_id="my_flow")
# ... add routines and connections ...
# Serialize
flow_data = flow.serialize()
# Save to file
with open("flow.json", "w") as f:
    json.dump(flow_data, f, indent=2)
# Load and deserialize
with open("flow.json", "r") as f:
    flow_data = json.load(f)
new_flow = Flow.deserialize(flow_data)
# Execute deserialized flow
job_state = new_flow.execute(entry_routine_id)
Important: All routines must have no-argument constructors for serialization to work.
Use the _config dictionary for configuration instead of constructor parameters.
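The reason for this rule becomes clearer with a small sketch. The code below is not Routilux's serializer; `ToyRoutine`, `REGISTRY`, `serialize`, and `deserialize` are hypothetical names. It assumes a loader that knows only a class name and a config dictionary, so it must be able to call the class with no arguments and then restore the configuration.

```python
import json
from typing import Any, Dict, Type


class ToyRoutine:
    """Base class that keeps all configuration in a _config dictionary."""

    def __init__(self) -> None:
        self._config: Dict[str, Any] = {}

    def set_config(self, **kwargs: Any) -> None:
        self._config.update(kwargs)


class Clipper(ToyRoutine):
    pass


# Maps a serialized class name back to the class (hypothetical registry).
REGISTRY: Dict[str, Type[ToyRoutine]] = {"Clipper": Clipper}


def serialize(routine: ToyRoutine) -> Dict[str, Any]:
    return {"class": type(routine).__name__, "config": dict(routine._config)}


def deserialize(payload: Dict[str, Any]) -> ToyRoutine:
    # The loader has no constructor arguments to pass, so the class
    # must be constructible with no arguments.
    routine = REGISTRY[payload["class"]]()
    routine.set_config(**payload["config"])
    return routine


original = Clipper()
original.set_config(max_length=100)
restored = deserialize(json.loads(json.dumps(serialize(original))))
print(type(restored).__name__, restored._config)  # Clipper {'max_length': 100}
```

A routine whose constructor required arguments would fail at the `REGISTRY[...]()` call, which is why configuration belongs in `_config`.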
See Serialization for details.
Aggregation Pattern¶
Collect data from multiple sources before processing:
from routilux import Routine
class ResultAggregator(Routine):
    def __init__(self, expected_count=3):
        super().__init__()
        self.expected_count = expected_count
        self.input_slot = self.define_slot(
            "input",
            handler=self._handle_input,
            merge_strategy="append"  # Accumulate data
        )
        self.output_event = self.define_event("output", ["results"])
    def _handle_input(self, **kwargs):
        # Check if we have enough data
        count = self.get_stat("message_count", 0) + 1
        self.set_stat("message_count", count)
        if count >= self.expected_count:
            # Process all accumulated data
            results = self.input_slot._data
            self.emit("output", results=results)
            # Reset for next batch
            self.input_slot._data = {}
            self.reset_stats()
This pattern is useful for:
Collecting results from multiple parallel operations
Batching data for batch processing
Aggregating metrics from multiple sources
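Stripped of the framework, the pattern is a counter plus a buffer. The following sketch shows that core logic in plain Python; `BatchCollector` is a hypothetical class used for illustration, not a Routilux routine.

```python
from typing import Any, Callable, List


class BatchCollector:
    """Buffer items and hand them off once enough have arrived."""

    def __init__(self, expected_count: int, on_batch: Callable[[List[Any]], None]) -> None:
        self.expected_count = expected_count
        self.on_batch = on_batch
        self._items: List[Any] = []

    def receive(self, item: Any) -> None:
        self._items.append(item)
        if len(self._items) >= self.expected_count:
            # Swap in a fresh buffer before emitting, so the next
            # batch starts empty.
            batch, self._items = self._items, []
            self.on_batch(batch)

    @property
    def pending(self) -> int:
        return len(self._items)


batches: List[List[int]] = []
collector = BatchCollector(expected_count=3, on_batch=batches.append)
for value in range(7):
    collector.receive(value)

print(batches)  # [[0, 1, 2], [3, 4, 5]]
print(collector.pending)  # 1
```

The seventh item stays in the buffer until two more arrive, which is the same behavior as the reset step in the routine above.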
See Aggregation Pattern for details.
Complete Example¶
Here’s a complete example combining multiple features:
from routilux import Flow, Routine
from routilux.builtin_routines import TextRenderer, ConditionalRouter
from routilux import ErrorHandler, ErrorStrategy
# Define custom routine
class DataValidator(Routine):
    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("input", handler=self.validate)
        self.output_event = self.define_event("output", ["data", "valid"])
    def validate(self, data):
        is_valid = isinstance(data, dict) and "value" in data
        self._track_operation("validation", success=is_valid)
        self.emit("output", data=data, valid=is_valid)
# Create flow
flow = Flow(flow_id="complete_example")
# Add error handler
flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))
# Create routines
validator = DataValidator()
renderer = TextRenderer()
router = ConditionalRouter()
validator_id = flow.add_routine(validator, "validator")
renderer_id = flow.add_routine(renderer, "renderer")
router_id = flow.add_routine(router, "router")
# Configure router
router.set_config(routes=[
    ("valid", "data.get('valid') == True"),
    ("invalid", "data.get('valid') == False"),
])
router.define_event("valid")
router.define_event("invalid")
# Connect: validator -> renderer, validator -> router
flow.connect(validator_id, "output", renderer_id, "input")
flow.connect(validator_id, "output", router_id, "input")
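# Execute (non-blocking) and wait for completion before reading results
from routilux.job_state import JobState
job_state = flow.execute(validator_id, entry_params={"data": {"value": 42}})
JobState.wait_for_completion(flow, job_state, timeout=2.0)
print(f"Status: {job_state.status}")
print(f"Validator stats: {validator.stats()}")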
Next Steps¶
User Guide - Comprehensive user guide with detailed explanations
Built-in Routines - Explore all built-in routines
Working with Flows - Deep dive into flow execution and strategies
Working with Connections - Learn about merge strategies and parameter mapping
API Reference - Complete API documentation
Examples - Real-world examples and use cases
Tips for Success¶
Start Simple: Begin with basic routines and flows, then add complexity
Use Built-ins: Leverage built-in routines before creating custom ones
Track Operations: Use _track_operation() for consistent statistics
Handle Errors: Always configure error handling for production workflows
Test Serialization: Verify your flows can be serialized/deserialized
Monitor Stats: Use stats() to understand routine behavior
Read Documentation: Check the user guide for advanced features
Happy coding with Routilux! 🚀