"""
Conditional router routine.
Routes data to different outputs based on conditions.
"""
from __future__ import annotations

import inspect
import logging
from typing import Dict, Any, Callable, Optional, Union

from routilux.routine import Routine
from serilux import (
    register_serializable,
    serialize_callable_with_fallback,
    deserialize_callable,
)
@register_serializable
class ConditionalRouter(Routine):
    """Routine for routing data based on conditions.

    This routine evaluates conditions on input data and routes it to
    different output events based on which conditions are met.

    Features:
        - Multiple conditional routes
        - Configurable condition functions, string expressions, or dictionaries
        - Default route for unmatched cases
        - Priority-based routing (``route_priority`` of ``"first_match"``
          stops at the first matching route; any other value emits to every
          matching route)
        - Access to the routine's config in conditions
        - Full serialization support

    Condition Types:
        - **String expressions** (recommended): Fully serializable, can access
          the ``data``, ``config``, and ``stats`` variables.
        - **Dictionary conditions**: Field matching, fully serializable.
        - **Function references**: Module-level functions, serializable if in
          a module. Can accept ``data``, ``config``, and ``stats`` as
          parameters.
        - **Lambda functions**: Can be used at runtime, may be converted to
          string expressions during serialization (if source code is
          available). Can access external variables via closure, but closure
          variables are lost during serialization.

    Note:
        ``stats`` is kept only for backward compatibility: conditions always
        receive an empty dict for it. Use JobState for execution state.

        String expressions are evaluated with ``eval()``. Only configure
        expressions that come from a trusted source.

    Examples:
        Using string expressions with config access (recommended):

        >>> router = ConditionalRouter()
        >>> router.set_config(
        ...     routes=[
        ...         ("high", "data.get('value', 0) > config.get('threshold', 0)"),
        ...         ("low", "data.get('value', 0) <= config.get('threshold', 0)"),
        ...     ],
        ...     threshold=10
        ... )
        >>> router.input_slot.receive({"data": {"value": 15}})  # Routes to "high"

        Using string expressions with stats access (``stats`` is always empty,
        so ``stats.get('count', 0)`` evaluates to ``0``):

        >>> router = ConditionalRouter()
        >>> router.set_config(
        ...     routes=[
        ...         ("active", "stats.get('count', 0) < 10"),
        ...         ("full", "stats.get('count', 0) >= 10"),
        ...     ]
        ... )
        >>> # Note: set_stat() is deprecated, use JobState for execution state
        >>> router.input_slot.receive({"data": {}})  # Routes to "active"

        Using dictionary conditions:

        >>> router = ConditionalRouter()
        >>> router.set_config(
        ...     routes=[
        ...         ("high", {"priority": "high"}),
        ...         ("low", {"priority": "low"}),
        ...     ]
        ... )
        >>> router.input_slot.receive({"data": {"priority": "high"}})

        Using lambda functions (runtime only, serialization may fail):

        >>> threshold = 10
        >>> router = ConditionalRouter()
        >>> router.set_config(
        ...     routes=[
        ...         ("high", lambda data: data.get('value', 0) > threshold),
        ...     ]
        ... )
        >>> # Lambda works at runtime but may not serialize properly
    """

    def __init__(self):
        """Initialize ConditionalRouter routine."""
        super().__init__()
        # Default configuration; callers override it through set_config().
        self.set_config(
            routes=[],  # List of (route_name, condition) pairs
            default_route=None,  # Route name used when no condition matches
            route_priority="first_match",  # "first_match" or "all_matches"
        )
        # Input slot: every received item is passed to _handle_input.
        self.input_slot = self.define_slot("input", handler=self._handle_input)
        # Fallback event, used when nothing matches and no default_route is set.
        # Per-route events are defined on demand.
        self.default_output = self.define_event("output", ["data", "route"])

    def _handle_input(self, data: Any = None, **kwargs):
        """Handle input data and route it.

        Conditions are evaluated in configuration order. A condition that
        raises is logged and treated as "not matched"; it never aborts
        routing of the current item.

        Args:
            data: Data to route.
            **kwargs: Additional data from slot, forwarded together with
                ``data`` to ``_extract_input_data`` to obtain the value that
                is actually routed.
        """
        data = self._extract_input_data(data, **kwargs)

        # ``or []`` tolerates a config where routes was explicitly set to None.
        routes = self.get_config("routes", []) or []
        default_route = self.get_config("default_route", None)
        stop_at_first = self.get_config("route_priority", "first_match") == "first_match"

        matched_routes = []
        for route_name, condition in routes:
            try:
                matched = self._condition_matches(data, condition)
            except Exception:
                # Best effort: a broken condition must not block the remaining
                # routes, but it should not fail invisibly either.
                logging.getLogger(__name__).warning(
                    "Condition for route %r raised; treating it as not matched",
                    route_name,
                    exc_info=True,
                )
                continue
            if matched:
                matched_routes.append(route_name)
                if stop_at_first:
                    break

        if matched_routes:
            for route_name in matched_routes:
                self._emit_route(route_name, data)
        elif default_route:
            self._emit_route(default_route, data)
        else:
            # Nothing matched and no default route: use the generic output.
            self.emit("output", data=data, route="unmatched")

    def _condition_matches(self, data: Any, condition: Any) -> bool:
        """Return whether ``condition`` holds for ``data``.

        Strings are checked before callables and callables before dicts.
        Any other condition type never matches.

        Args:
            data: Data being routed.
            condition: String expression, callable, or field-matching dict.

        Returns:
            True if the condition matches, False otherwise.
        """
        if isinstance(condition, str):
            return self._evaluate_string_condition(data, condition)
        if callable(condition):
            return bool(self._invoke_condition(condition, data))
        if isinstance(condition, dict):
            return self._evaluate_dict_condition(data, condition)
        return False

    def _invoke_condition(self, condition: Callable, data: Any) -> Any:
        """Call a callable condition exactly once with the arguments it accepts.

        Supported call shapes, tried in this order:

        - one parameter: ``condition(data)``
        - two parameters, neither named ``config``/``stats``:
          ``condition(data, config)``
        - otherwise: parameters named ``data``/``config``/``stats`` are passed
          by keyword; if that does not fit the signature, ``data`` is passed
          positionally with ``config``/``stats`` by keyword; finally plain
          ``condition(data)``.

        Candidate argument sets are validated with ``Signature.bind`` before
        the call, so an exception raised inside the condition is never
        mistaken for a signature mismatch and the condition is never run
        twice.

        Args:
            condition: Callable condition to invoke.
            data: Data being routed.

        Returns:
            Whatever the condition returns.
        """
        try:
            signature = inspect.signature(condition)
        except (TypeError, ValueError):
            # No introspectable signature (e.g. some builtins).
            return condition(data)

        params = signature.parameters
        config = self._config
        # Values that can be injected by parameter name. ``stats`` is
        # deprecated (state lives in JobState), so an empty dict is supplied
        # for backward compatibility.
        injected = {
            name: value
            for name, value in (("config", config), ("stats", {}))
            if name in params
        }
        named = dict(injected)
        if "data" in params:
            named["data"] = data

        if len(params) == 1:
            # Single parameter: assume it is the data, whatever its name.
            attempts = [((data,), {})]
        elif len(params) == 2 and not injected:
            # Two anonymous parameters: data first, config second.
            attempts = [((data, config), {}), ((data,), {})]
        else:
            attempts = []
            if named:
                attempts.append(((), named))
            if injected:
                # Data parameter is not literally called "data".
                attempts.append(((data,), injected))
            attempts.append(((data,), {}))

        for args, kwargs in attempts:
            try:
                signature.bind(*args, **kwargs)
            except TypeError:
                continue
            return condition(*args, **kwargs)

        # Nothing fits; make the plain call so the caller gets a descriptive
        # TypeError.
        return condition(data)

    def _emit_route(self, route_name: str, data: Any) -> None:
        """Emit ``data`` on the event named ``route_name``, defining it if missing.

        Args:
            route_name: Name of the route; also used as the event name and as
                the ``route`` value of the emitted payload.
            data: Data to emit.
        """
        if self.get_event(route_name) is None:
            self.define_event(route_name, ["data", "route"])
        self.emit(route_name, data=data, route=route_name)

    def _evaluate_dict_condition(self, data: Any, condition: Dict[str, Any]) -> bool:
        """Evaluate a dictionary-based condition.

        Every field listed in ``condition`` must be present in ``data``. A
        callable expected value is used as a predicate on the actual value;
        any other expected value is compared with ``!=``.

        Args:
            data: Data to evaluate.
            condition: Condition dictionary with field -> expected_value mappings.

        Returns:
            True if condition matches, False otherwise. Always False when
            ``data`` is not a dict.
        """
        if not isinstance(data, dict):
            return False
        for field, expected_value in condition.items():
            if field not in data:
                return False
            actual_value = data[field]
            if callable(expected_value):
                # Custom comparison supplied by the caller.
                if not expected_value(actual_value):
                    return False
            elif actual_value != expected_value:
                return False
        return True

    def _evaluate_string_condition(self, data: Any, condition: str) -> bool:
        """Evaluate a string expression condition.

        Args:
            data: Data to evaluate.
            condition: String expression to evaluate
                (e.g., "data.get('priority') == 'high'").

        Returns:
            True if the expression evaluates to a truthy value, False if it
            is falsy or if evaluating it raises.

        Note:
            The expression can access:

            - ``data``: The input data being evaluated
            - ``config``: The routine's configuration dictionary (``_config``)
            - ``stats``: Always an empty dict, kept for backward compatibility
              (use JobState for execution state)

            Only a small set of builtins is exposed, but this is NOT a
            security boundary; expressions must come from a trusted source.

        Examples:
            Access config in condition:
                "data.get('value', 0) > config.get('threshold', 0)"
            Access stats in condition (always empty):
                "stats.get('count', 0) < 10"
        """
        # A fresh dict per call, so an expression cannot leave modified
        # builtins behind for later evaluations.
        allowed_builtins = {
            "isinstance": isinstance,
            "dict": dict,
            "list": list,
            "str": str,
            "int": int,
            "float": float,
            "bool": bool,
            "len": len,
            "getattr": getattr,
            "hasattr": hasattr,
        }
        scope_globals = {"__builtins__": allowed_builtins}
        scope_locals = {
            "data": data,
            "config": self._config,
            "stats": {},  # Deprecated: use JobState for execution state
        }
        try:
            # SECURITY: eval() with trimmed builtins is not a sandbox.
            # Attribute access on ``data``/``config`` (and ``getattr``) can
            # still reach arbitrary objects. Never evaluate expressions taken
            # from untrusted input.
            return bool(eval(condition, scope_globals, scope_locals))
        except Exception:
            # Invalid or failing expressions count as "no match".
            logging.getLogger(__name__).debug(
                "String condition %r could not be evaluated", condition, exc_info=True
            )
            return False

    def serialize(self) -> Dict[str, Any]:
        """Serialize ConditionalRouter, handling lambda functions in routes.

        Callable conditions are serialized with
        ``serialize_callable_with_fallback`` (falling back to expression
        extraction); string and dict conditions are stored unchanged.

        Returns:
            Serialized dictionary.

        Raises:
            ValueError: If a callable condition cannot be serialized; the
                message names the affected route.
        """
        data = super().serialize()

        serialized_routes = []
        for route_name, condition in self.get_config("routes", []) or []:
            if callable(condition):
                try:
                    condition = serialize_callable_with_fallback(
                        condition, owner=self, fallback_to_expression=True
                    )
                except ValueError as e:
                    # Re-raise with route name context.
                    raise ValueError(
                        f"Condition for route '{route_name}' cannot be serialized: {e}"
                    ) from e
            serialized_routes.append((route_name, condition))

        # Replace the routes produced by the base class with the processed ones.
        if "_config" in data:
            data["_config"]["routes"] = serialized_routes
        return data

    def deserialize(self, data: Dict[str, Any], registry: Optional[Any] = None) -> None:
        """Deserialize ConditionalRouter, restoring callable conditions from routes.

        After the base class has restored the state, every route whose
        condition is still a dict carrying a ``"_type"`` key is passed to
        ``deserialize_callable``. Conditions that are already callable, and
        plain string or dict conditions, are kept as they are.

        Args:
            data: Serialized dictionary.
            registry: Optional ObjectRegistry for deserializing callables.

        Raises:
            ValueError: If a serialized callable condition cannot be restored.
        """
        super().deserialize(data, registry=registry)

        deserialized_routes = []
        for route_name, condition_data in self.get_config("routes", []) or []:
            is_serialized_callable = (
                not callable(condition_data)
                and isinstance(condition_data, dict)
                and "_type" in condition_data
            )
            if not is_serialized_callable:
                # Already restored by the base class, or a plain str/dict condition.
                deserialized_routes.append((route_name, condition_data))
                continue

            condition = deserialize_callable(condition_data, registry=registry)
            if condition:
                deserialized_routes.append((route_name, condition))
                continue

            # Restoration failed: build an error message with as much context
            # as the serialized form provides.
            if condition_data.get("_type") == "lambda_expression":
                expr = condition_data.get("expression", "unknown")
                raise ValueError(
                    f"Failed to deserialize lambda condition for route '{route_name}': "
                    f"cannot restore lambda expression '{expr}'. "
                    f"The expression may contain unsupported syntax or operations."
                )
            callable_type = condition_data.get("callable_type") or condition_data.get(
                "_type", "unknown"
            )
            module_name = condition_data.get("module", "unknown")
            function_name = condition_data.get("name") or condition_data.get(
                "method_name", "unknown"
            )
            raise ValueError(
                f"Failed to deserialize {callable_type} condition for route '{route_name}': "
                f"cannot restore {callable_type} '{function_name}' from module '{module_name}'. "
                f"The function may not exist in the module or the module cannot be imported."
            )

        self.set_config(routes=deserialized_routes)

    def add_route(self, route_name: str, condition: Union[Callable, Dict[str, Any], str]) -> None:
        """Add a routing condition.

        The new route is appended after the existing ones, and an event named
        ``route_name`` is defined if it does not exist yet.

        Args:
            route_name: Name of the route (will be used as event name).
            condition: Condition function, dictionary, or string expression.
        """
        # Work on a copy: avoids mutating a list that may be shared with the
        # caller, and also accepts routes configured as a tuple or None.
        routes = list(self.get_config("routes", []) or [])
        routes.append((route_name, condition))
        self.set_config(routes=routes)
        # Pre-create event for this route.
        if self.get_event(route_name) is None:
            self.define_event(route_name, ["data", "route"])