Source code for routilux.builtin_routines.data_processing.data_transformer
"""
Data transformer routine.
Transforms data using configurable transformation functions.
"""
from __future__ import annotations
from typing import Any, Callable
from routilux.routine import Routine
[docs]
class DataTransformer(Routine):
"""Routine for transforming data using configurable functions.
This routine applies transformation functions to input data, useful
for data cleaning, normalization, and format conversion.
Features:
- Configurable transformation functions
- Chain multiple transformations
- Support for custom transformation logic
- Error handling and validation
Examples:
>>> transformer = DataTransformer()
>>> transformer.set_config(
... transformations=["lowercase", "strip_whitespace"]
... )
>>> transformer.define_slot("input", handler=transformer.transform)
>>> transformer.define_event("output", ["transformed_data", "transformation_applied"])
"""
[docs]
def __init__(self):
"""Initialize DataTransformer routine."""
super().__init__()
# Set default configuration
self.set_config(
transformations=[], # List of transformation names or functions
transformation_map={}, # Map of transformation names to functions
chain_transformations=True, # Apply transformations in sequence
)
# Register built-in transformations
self._register_builtin_transformations()
# Define input slot
self.input_slot = self.define_slot("input", handler=self._handle_input)
# Define output event
self.output_event = self.define_event(
"output", ["transformed_data", "transformation_applied", "errors"]
)
def _register_builtin_transformations(self):
"""Register built-in transformation functions."""
builtins = {
"lowercase": lambda x: str(x).lower() if isinstance(x, str) else x,
"uppercase": lambda x: str(x).upper() if isinstance(x, str) else x,
"strip_whitespace": lambda x: str(x).strip() if isinstance(x, str) else x,
"to_string": lambda x: str(x),
"to_int": lambda x: int(x) if isinstance(x, (int, float, str)) else x,
"to_float": lambda x: float(x) if isinstance(x, (int, float, str)) else x,
"remove_none": lambda x: (
{k: v for k, v in x.items() if v is not None} if isinstance(x, dict) else x
),
}
# Get existing map or create new one
current_map = self._config.get("transformation_map", {})
current_map.update(builtins)
self._config["transformation_map"] = current_map
def _handle_input(self, data: Any = None, transformations: list | None = None, **kwargs):
"""Handle input data and apply transformations.
Args:
data: Data to transform.
transformations: Optional list of transformation names/functions.
If not provided, uses _config["transformations"].
**kwargs: Additional data from slot. If 'data' is not provided,
will use kwargs or the first value.
"""
# Extract data using Routine helper method
data = self._extract_input_data(data, **kwargs)
# Track statistics
# Operation tracking removed - use JobState for execution state
# Get transformations from input or config
transformations = transformations or self.get_config("transformations", [])
transformation_map = self.get_config("transformation_map", {})
transformed_data = data
applied_transformations = []
errors = []
# Apply transformations
for transform in transformations:
try:
if isinstance(transform, str):
# Look up transformation function
if transform in transformation_map:
transform_func = transformation_map[transform]
transformed_data = transform_func(transformed_data)
applied_transformations.append(transform)
else:
errors.append(f"Unknown transformation: {transform}")
elif callable(transform):
# Direct function
transformed_data = transform(transformed_data)
applied_transformations.append(
transform.__name__ if hasattr(transform, "__name__") else "custom"
)
else:
errors.append(f"Invalid transformation: {transform}")
except Exception as e:
errors.append(f"Error applying {transform}: {str(e)}")
# Operation tracking removed - use JobState for execution state
# Emit result
self.emit(
"output",
transformed_data=transformed_data,
transformation_applied=applied_transformations,
errors=errors if errors else None,
)
[docs]
def register_transformation(self, name: str, func: Callable) -> None:
"""Register a custom transformation function.
Args:
name: Transformation name.
func: Transformation function that takes data and returns transformed data.
"""
transformation_map = self.get_config("transformation_map", {})
transformation_map[name] = func
self.set_config(transformation_map=transformation_map)