Source code for routilux.builtin_routines.utils.data_flattener

"""
Data flattener routine.

Flattens nested data structures into flat dictionaries.
"""

from __future__ import annotations

from typing import Any

from serilux import Serializable

from routilux.routine import Routine


class DataFlattener(Routine):
    """Routine for flattening nested data structures.

    This routine converts nested dictionaries, lists, and Serializable
    objects into flat dictionaries with separator-joined (dot-notation by
    default) keys.

    Features:
        - Recursive flattening of nested structures
        - Handles Serializable objects
        - Configurable separator for keys
        - Preserves list indices

    Configuration keys (defaults are set in ``__init__``):
        - ``separator``: string placed between key segments (``"."``).
        - ``preserve_lists``: when true, list indices become key segments;
          when false, every list item is flattened under the list's own
          prefix, so items that produce the same key overwrite each other.
        - ``max_depth``: recursion limit; values at this depth are stored
          as ``str(value)`` instead of being descended into (``100``).

    Examples:
        >>> flattener = DataFlattener()
        >>> flattener.set_config(separator=".")
        >>> # The "input" slot and "output" event are defined by __init__.
        >>> # Input:  {"a": {"b": 1, "c": [2, 3]}}
        >>> # Output: {"a.b": 1, "a.c.0": 2, "a.c.1": 3}
    """

    def __init__(self):
        """Initialize DataFlattener routine."""
        super().__init__()

        # Set default configuration
        self.set_config(
            separator=".",
            preserve_lists=True,
            max_depth=100,  # Prevent infinite recursion
        )

        # Define input slot
        self.input_slot = self.define_slot("input", handler=self._handle_input)

        # Define output event
        self.output_event = self.define_event(
            "output", ["flattened_data", "original_type", "depth"]
        )

    def _handle_input(self, data: Any = None, **kwargs) -> None:
        """Handle input data, flatten it and emit the result.

        Args:
            data: Data to flatten (dict, list, or Serializable object).
            **kwargs: Additional data from slot. Both ``data`` and ``kwargs``
                are handed to ``_extract_input_data`` which selects the
                actual payload.
        """
        # Extract data using Routine helper method
        data = self._extract_input_data(data, **kwargs)

        flattened_data, original_type, depth = self._flatten(data)

        # Emit result
        self.emit(
            "output",
            flattened_data=flattened_data,
            original_type=original_type,
            depth=depth,
        )

    def _flatten(
        self,
        value: Any,
        prefix: str = "",
        depth: int = 0,
        visited: set[int] | None = None,
    ) -> tuple[dict[str, Any], str, int]:
        """Recursively flatten a value.

        Args:
            value: Value to flatten.
            prefix: Current key prefix. An empty prefix means "top level".
            depth: Current recursion depth.
            visited: Set of ``id()`` values of the containers currently being
                descended into, used to detect circular references.

        Returns:
            Tuple of (flattened_dict, original_type, max_depth).

            - A primitive is stored under ``prefix`` (or ``"value"`` when the
              prefix is empty).
            - A container that is already on the current path is replaced by
              the string ``"[Circular Reference]"``.
            - A value found at ``max_depth`` is replaced by ``str(value)``.
            - For a Serializable, the result of ``serialize()`` is flattened
              one level deeper and the reported type is that of the
              serialized data.
        """
        # Initialize visited set on first call
        if visited is None:
            visited = set()

        separator = self.get_config("separator", ".")

        if depth >= self.get_config("max_depth", 100):
            # Prevent infinite recursion: stop descending and stringify.
            return {prefix or "value": str(value)}, type(value).__name__, depth

        if isinstance(value, (dict, list)):
            obj_id = id(value)
            if obj_id in visited:
                # This container is an ancestor of itself on the current path.
                return (
                    {prefix or "value": "[Circular Reference]"},
                    type(value).__name__,
                    depth,
                )

            visited.add(obj_id)
            try:
                result: dict[str, Any] = {}
                max_depth = depth
                for child_prefix, child in self._iter_children(value, prefix, separator):
                    flattened, _, child_depth = self._flatten(
                        child, child_prefix, depth + 1, visited
                    )
                    result.update(flattened)
                    max_depth = max(max_depth, child_depth)
                type_name = "list" if isinstance(value, list) else "dict"
                return result, type_name, max_depth
            finally:
                # Only containers on the *current* path count as circular, so
                # forget this one once its branch is done; the same object
                # may legitimately appear again in a sibling branch.
                visited.discard(obj_id)

        if isinstance(value, Serializable):
            # Serialize Serializable objects first, then flatten the result.
            serialized = value.serialize()
            return self._flatten(serialized, prefix, depth + 1, visited)

        # Primitive type
        return {prefix or "value": value}, type(value).__name__, depth

    def _iter_children(self, container: Any, prefix: str, separator: str):
        """Yield ``(child_prefix, child_value)`` pairs for a list or dict.

        Key segments are always converted to ``str``. Without that, a
        top-level non-string dict key would leak into the flattened result
        unchanged, and a falsy one (``0``, ``False``, ``None``) would be
        mistaken for "no prefix" by the recursive call.
        """
        if isinstance(container, list):
            if self.get_config("preserve_lists", True):
                for index, item in enumerate(container):
                    yield (f"{prefix}{separator}{index}" if prefix else str(index)), item
            else:
                # Flatten list items without indices
                for item in container:
                    yield prefix, item
        else:
            for key, item in container.items():
                yield (f"{prefix}{separator}{key}" if prefix else str(key)), item