Source code for routilux.builtin_routines.text_processing.result_extractor

"""
Result extractor routine.

Extracts and formats results from various output formats with extensible architecture.
"""

from __future__ import annotations

import json
import logging
import re
from typing import Any, Callable, Protocol

from routilux.routine import Routine

logger = logging.getLogger(__name__)


[docs] class ExtractorProtocol(Protocol): """Protocol for custom extractors."""
[docs] def extract(self, data: Any, config: dict[str, Any]) -> tuple[Any, str, dict[str, Any]] | None: """Extract data from input. Args: data: Input data to extract from. config: Configuration dictionary. Returns: Tuple of (extracted_result, format_type, metadata) if successful, None otherwise. """ ...
[docs] class ResultExtractor(Routine): """Routine for extracting results from structured output. This routine provides a flexible, extensible system for extracting structured data from various formats. It supports multiple extraction strategies and allows custom extractors to be registered. Features: - Multiple built-in extractors (JSON, YAML, XML, CSV, code blocks) - Extensible architecture with custom extractor support - Chain-based extraction with fallback mechanisms - Intelligent type detection and conversion - Comprehensive error handling and reporting - Rich metadata about extraction process Extraction Strategies: - "auto": Try all extractors in order until one succeeds - "priority": Try extractors in priority order - "all": Extract using all applicable extractors - "first_match": Return first successful extraction Examples: >>> extractor = ResultExtractor() >>> extractor.set_config(strategy="auto") >>> extractor.input_slot.receive({"data": '```json\\n{"key": "value"}\\n```'}) >>> # Register custom extractor >>> def my_extractor(data, config): ... if isinstance(data, str) and data.startswith("CUSTOM:"): ... return data[7:], "custom", {"method": "prefix"} ... return None >>> extractor.register_extractor("custom", my_extractor) """
[docs] def __init__(self): """Initialize ResultExtractor routine.""" super().__init__() # Set default configuration self.set_config( # Extraction strategy strategy="auto", # "auto", "priority", "all", "first_match" extractor_priority=[], # Custom priority order for extractors # Built-in extractor settings extract_json_blocks=True, extract_code_blocks=True, extract_yaml_blocks=False, # Requires pyyaml extract_xml_blocks=False, code_block_languages=["json", "python", "output", "yaml", "xml"], # Format-specific settings format_interpreter_output=True, parse_json_strings=True, # Try to parse JSON from plain strings parse_yaml_strings=False, # Try to parse YAML from plain strings detect_nested_structures=True, # Error handling continue_on_error=True, # Continue trying other extractors on error return_original_on_failure=True, # Return original data if all extractors fail log_errors=True, ) # Register built-in extractors self._register_builtin_extractors() # Define input slot self.input_slot = self.define_slot("input", handler=self._handle_input) # Define output event self.output_event = self.define_event( "output", ["extracted_result", "format", "metadata", "confidence", "extraction_path"] )
def _register_builtin_extractors(self): """Register built-in extractor functions.""" if not hasattr(self, "_extractors"): self._extractors: dict[str, Callable] = {} # Specialized extractors (higher priority) # Interpreter output extractor (should check before generic list) self._extractors["interpreter_output"] = self._format_interpreter_output # JSON extractors self._extractors["json_code_block"] = self._extract_json_code_block self._extractors["json_string"] = self._extract_json_string # Code block extractors self._extractors["code_block"] = self._extract_code_block # YAML extractors (if available) # Note: yaml is imported conditionally in the methods themselves try: import yaml # noqa: F401 # Only register if yaml is available self._extractors["yaml_code_block"] = self._extract_yaml_code_block self._extractors["yaml_string"] = self._extract_yaml_string except ImportError: pass # XML extractors self._extractors["xml_code_block"] = self._extract_xml_code_block # Type-based extractors (lower priority, fallback) self._extractors["dict_extractor"] = self._extract_dict self._extractors["list_extractor"] = self._extract_list def _handle_input(self, data: str | list | dict = None, **kwargs): """Handle input data and extract results. Args: data: Data to extract results from. Can be: - String with code blocks or structured data - List of output dictionaries - Dictionary with structured data **kwargs: Additional data from slot. If 'data' is not provided, will use kwargs or the first value. """ # Extract data using Routine helper method data = self._extract_input_data(data, **kwargs) if data == {}: data = "" # Default to empty string for result extractor # Track statistics # Operation tracking removed - use JobState for execution state # Perform extraction result = self._extract_with_strategy(data) # Emit result self.emit("output", **result) def _extract_with_strategy(self, data: Any) -> dict[str, Any]: """Extract data using configured strategy. Args: data: Input data to extract from. Returns: Dictionary with extraction results and metadata. """ strategy = self.get_config("strategy", "auto") continue_on_error = self.get_config("continue_on_error", True) return_original = self.get_config("return_original_on_failure", True) # Get extractor order extractors = self._get_extractor_order() results = [] errors = [] for extractor_name, extractor_func in extractors: try: result = extractor_func(data, self._config) if result is not None: extracted_data, format_type, metadata = result # Add extractor info to metadata metadata["extractor"] = extractor_name metadata["strategy"] = strategy # Calculate confidence confidence = self._calculate_confidence(extracted_data, format_type, metadata) # Build extraction path extraction_path = [extractor_name] result_dict = { "extracted_result": extracted_data, "format": format_type, "metadata": metadata, "confidence": confidence, "extraction_path": extraction_path, } if strategy in ("first_match", "auto", "priority"): # For these strategies, return first successful match # (auto and priority behave like first_match) return result_dict results.append(result_dict) except Exception as e: error_msg = f"Extractor '{extractor_name}' failed: {str(e)}" errors.append({"extractor": extractor_name, "error": error_msg}) if self.get_config("log_errors", True): logger.warning(error_msg, exc_info=True) if not continue_on_error: break # Handle results based on strategy if results: if strategy == "all": # Return all successful extractions return { "extracted_result": [r["extracted_result"] for r in results], "format": "multi", "metadata": {"extractions": results, "count": len(results), "errors": errors}, "confidence": max(r["confidence"] for r in results), "extraction_path": [r["extraction_path"][0] for r in results], } else: # Return best result (highest confidence) return max(results, key=lambda x: x["confidence"]) # All extractors failed if return_original: return { "extracted_result": data, "format": type(data).__name__.lower(), "metadata": { "extraction_method": "none", "errors": errors, "original_type": type(data).__name__, }, "confidence": 0.0, "extraction_path": [], } else: raise ValueError(f"All extractors failed. Errors: {errors}") def _get_extractor_order(self) -> list[tuple[str, Callable]]: """Get extractors in the correct order based on configuration. Returns: List of (extractor_name, extractor_function) tuples. """ priority = self.get_config("extractor_priority", []) all_extractors = list(self._extractors.items()) if priority: # Sort by priority priority_map = {name: i for i, name in enumerate(priority)} all_extractors.sort(key=lambda x: priority_map.get(x[0], 999)) else: # Default order: specialized extractors first, then generic ones # This ensures interpreter_output is tried before list_extractor specialized = [ "interpreter_output", "json_code_block", "json_string", "yaml_code_block", "yaml_string", "xml_code_block", "code_block", ] generic = ["dict_extractor", "list_extractor"] def sort_key(item): name = item[0] if name in specialized: return specialized.index(name) if name in specialized else 100 elif name in generic: return 50 + generic.index(name) if name in generic else 200 else: return 150 # Custom extractors in the middle all_extractors.sort(key=sort_key) return all_extractors def _calculate_confidence(self, data: Any, format_type: str, metadata: dict[str, Any]) -> float: """Calculate confidence score for extraction result. Args: data: Extracted data. format_type: Detected format type. metadata: Extraction metadata. Returns: Confidence score between 0.0 and 1.0. """ confidence = 0.5 # Base confidence # Increase confidence for structured data if isinstance(data, (dict, list)): confidence += 0.2 # Increase confidence for code block extraction if "code_block" in metadata.get("extraction_method", ""): confidence += 0.1 # Increase confidence for JSON/YAML (well-formed) if format_type in ("json", "yaml"): confidence += 0.1 # Decrease confidence for empty results if not data or (isinstance(data, str) and not data.strip()): confidence -= 0.3 return max(0.0, min(1.0, confidence)) # Built-in extractors def _extract_json_code_block( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract JSON from markdown code blocks.""" if not isinstance(data, str) or not config.get("extract_json_blocks", True): return None blocks = self._extract_code_blocks(data, "json") if not blocks: return None for block in reversed(blocks): # Try last block first try: result = json.loads(block.strip()) return ( result, "json", { "extraction_method": "json_code_block", "block_count": len(blocks), "block_index": blocks.index(block), }, ) except json.JSONDecodeError: continue return None def _extract_json_string( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract JSON from plain string.""" if not isinstance(data, str) or not config.get("parse_json_strings", True): return None # Try to parse as JSON stripped = data.strip() if (stripped.startswith("{") or stripped.startswith("[")) and len(stripped) > 2: try: result = json.loads(stripped) return result, "json", {"extraction_method": "json_string"} except json.JSONDecodeError: pass return None def _extract_code_block( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract code blocks of various languages.""" if not isinstance(data, str) or not config.get("extract_code_blocks", True): return None languages = config.get("code_block_languages", ["json", "python", "output"]) for lang in languages: if lang == "json": # Skip JSON, handled separately continue blocks = self._extract_code_blocks(data, lang) if blocks: return ( blocks[-1], lang, {"extraction_method": f"{lang}_code_block", "block_count": len(blocks)}, ) return None def _extract_yaml_code_block( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract YAML from markdown code blocks.""" if not isinstance(data, str) or not config.get("extract_yaml_blocks", False): return None try: import yaml except ImportError: return None blocks = self._extract_code_blocks(data, "yaml") if not blocks: blocks = self._extract_code_blocks(data, "yml") if not blocks: return None for block in reversed(blocks): try: result = yaml.safe_load(block.strip()) return ( result, "yaml", {"extraction_method": "yaml_code_block", "block_count": len(blocks)}, ) except Exception: continue return None def _extract_yaml_string( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract YAML from plain string.""" if not isinstance(data, str) or not config.get("parse_yaml_strings", False): return None try: import yaml except ImportError: return None try: result = yaml.safe_load(data.strip()) if result is not None: return result, "yaml", {"extraction_method": "yaml_string"} except Exception: pass return None def _extract_xml_code_block( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract XML from markdown code blocks.""" if not isinstance(data, str) or not config.get("extract_xml_blocks", False): return None blocks = self._extract_code_blocks(data, "xml") if not blocks: return None # Return XML as string (parsing can be done by downstream routines) return ( blocks[-1], "xml", {"extraction_method": "xml_code_block", "block_count": len(blocks)}, ) def _format_interpreter_output( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Format code interpreter output list. This extractor specifically handles lists of output dictionaries from code interpreters, which have a specific structure. """ if not isinstance(data, list) or not config.get("format_interpreter_output", True): return None # Check if this looks like interpreter output (list of dicts with format/content) has_interpreter_structure = False lines = [] output_count = 0 for output in data: if isinstance(output, dict): # Check for interpreter output structure if "format" in output or "content" in output: has_interpreter_structure = True if output.get("format") == "output" and output.get("content"): content = output["content"] if content and len(str(content).strip()) > 0: lines.append(str(content)) output_count += 1 elif isinstance(output, str): # Plain strings in list - might be interpreter output if len(output.strip()) > 0: lines.append(output) output_count += 1 # Only treat as interpreter output if we found the expected structure if has_interpreter_structure and lines: formatted_text = "\n".join(lines) return ( formatted_text, "interpreter_output", { "extraction_method": "interpreter_output", "output_count": output_count, "line_count": len(lines), }, ) return None def _extract_dict( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract from dictionary.""" if isinstance(data, dict): return data, "dict", {"extraction_method": "direct", "key_count": len(data)} return None def _extract_list( self, data: Any, config: dict[str, Any] ) -> tuple[Any, str, dict[str, Any]] | None: """Extract from list.""" if isinstance(data, list): return data, "list", {"extraction_method": "direct", "item_count": len(data)} return None def _extract_code_blocks(self, text: str, language: str) -> list[str]: """Extract code blocks of specified language from markdown text. Args: text: Text containing code blocks. language: Language identifier (e.g., "json", "python"). Returns: List of extracted code block contents. """ # Support both ```language and ```{language} formats patterns = [ rf"```{re.escape(language)}\s*\n(.*?)```", rf"```\{{{re.escape(language)}\}}\s*\n(.*?)```", ] all_matches = [] for pattern in patterns: matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE) all_matches.extend(matches) return all_matches
[docs] def register_extractor( self, name: str, extractor: Callable[[Any, dict[str, Any]], tuple[Any, str, dict[str, Any]] | None], ) -> None: """Register a custom extractor function. Args: name: Extractor name (must be unique). extractor: Extractor function that takes (data, config) and returns (extracted_data, format_type, metadata) or None. Examples: >>> def my_extractor(data, config): ... if isinstance(data, str) and data.startswith("CUSTOM:"): ... return data[7:], "custom", {"method": "prefix"} ... return None >>> extractor.register_extractor("custom_prefix", my_extractor) """ if not hasattr(self, "_extractors"): self._extractors = {} self._extractors[name] = extractor logger.info(f"Registered custom extractor: {name}")