ExecutionTracker API
Execution tracker.
Tracks flow execution state and performance metrics.
- class routilux.execution_tracker.ExecutionTracker(flow_id='')
Bases: Serializable
Execution tracker for monitoring flow execution state and performance.
ExecutionTracker provides detailed monitoring and performance analysis capabilities for flow execution. It tracks routine executions, event flow, and performance metrics to help understand flow behavior and identify optimization opportunities.
- Key Features:
Routine Execution Tracking: Start/end times, parameters, results
Event Flow Tracking: Record all event emissions and data flow
Performance Metrics: Execution times, throughput, bottlenecks
Execution Analysis: Query execution patterns and statistics
- Data Structure:
routine_executions: Dict mapping routine_id to list of execution records
event_flow: List of all event emissions with source/target information
performance_metrics: Dictionary of calculated performance metrics
- Use Cases:
Performance Analysis: Identify slow routines, bottlenecks
Debugging: Trace execution flow and data transmission
Monitoring: Track routine execution patterns
Optimization: Find opportunities to improve flow performance
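To make the data structure above concrete, here is a minimal sketch of what a tracker's state might look like after one routine has run. It uses plain Python containers, not routilux itself. The execution-record fields follow the side effects documented for record_routine_start() and record_routine_end(); the key names inside the event record are assumptions, since this page lists the recorded fields but not their exact keys.

```python
from datetime import datetime, timedelta

# Hypothetical snapshot of tracker state, built by hand for illustration.
start = datetime(2024, 1, 1, 12, 0, 0)
end = start + timedelta(seconds=1.5)

# routine_id -> list of execution records (one per invocation)
routine_executions = {
    "processor": [
        {
            "routine_id": "processor",
            "start_time": start.isoformat(),
            "end_time": end.isoformat(),
            "status": "completed",
            "params": {"input": "data"},
            "result": "success",
            "error": None,
            "execution_time": (end - start).total_seconds(),
        }
    ]
}

# Flat, chronological list of event emissions (key names assumed)
event_flow = [
    {
        "timestamp": end.isoformat(),
        "source_routine_id": "processor",
        "event_name": "output",
        "target_routine_id": "validator",
        "data": {"result": "processed"},
    }
]

# Example query: the slowest recorded execution of each routine
slowest = {
    routine_id: max(record["execution_time"] for record in records)
    for routine_id, records in routine_executions.items()
}
print(slowest)
```

Keeping one list per routine makes per-routine queries such as the one above a single dictionary lookup, while the flat event list preserves emission order.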
Examples
- Basic usage:
>>> tracker = ExecutionTracker(flow_id="my_flow")
>>> flow.execution_tracker = tracker
>>> # Tracker automatically records during execution
>>> # After execution, analyze results:
>>> metrics = tracker.get_performance_metrics()
>>> print(f"Total executions: {metrics.get('total_executions')}")
- Analyze routine performance:
>>> executions = tracker.routine_executions.get("my_routine", [])
>>> for record in executions:
...     print(f"Duration: {record.get('execution_time')}s")
- __init__(flow_id='')
Initialize ExecutionTracker.
- Parameters:
flow_id (str) – Flow identifier.
- record_routine_start(routine_id, params=None)
Record the start of a routine execution.
This method is called when a routine begins execution. It creates a new execution record with start time and parameters. The record is completed when record_routine_end() is called.
- Parameters:
routine_id (str) – Unique identifier of the routine starting execution. Must match the ID used when adding the routine to the flow.
params (dict[str, Any] | None) – Optional dictionary of parameters passed to the routine. These are the keyword arguments passed to routine.__call__(**params). Example: {"input": "data", "count": 42}
- Side Effects:
Creates a new execution record in routine_executions[routine_id]
Sets start_time to current timestamp
Sets status to “running”
Stores params in the execution record
Examples
- Record routine start:
>>> tracker.record_routine_start("processor", {"input": "data"})
>>> # Later, call record_routine_end() to complete the record
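The side effects listed above can be sketched as a small standalone function. This is an illustration of the documented behavior under stated assumptions, not the library's implementation: sketch_record_start is a hypothetical helper, and storing the timestamp as an ISO 8601 string is an assumption.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def sketch_record_start(
    executions: Dict[str, List[Dict[str, Any]]],
    routine_id: str,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in for record_routine_start()."""
    record = {
        "routine_id": routine_id,
        "start_time": datetime.now().isoformat(),  # timestamp format is assumed
        "status": "running",
        "params": params or {},
    }
    # Each call appends a new record, so repeated runs of one routine accumulate
    executions.setdefault(routine_id, []).append(record)
    return record


executions: Dict[str, List[Dict[str, Any]]] = {}
sketch_record_start(executions, "processor", {"input": "data"})
sketch_record_start(executions, "processor", {"input": "more"})
print(len(executions["processor"]))
```

Because every start appends a fresh record, the list for a routine doubles as its invocation history.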
- record_routine_end(routine_id, status='completed', result=None, error=None)
Record the end of a routine execution.
This method completes an execution record started by record_routine_start(). It updates the record with end time, status, result, and calculates execution duration.
- Parameters:
routine_id (str) – Unique identifier of the routine completing execution. Must match the ID used in record_routine_start().
status (str) – Execution status. Common values:
“completed”: Routine executed successfully
“failed”: Routine execution failed
“skipped”: Routine was skipped
“error_continued”: Error occurred but execution continued
result (Any) – Optional execution result. Can be any serializable value. Stored in the execution record for later analysis.
error (str | None) – Optional error message if execution failed. Should be a string describing the error. Only used when status is “failed” or “error_continued”.
- Side Effects:
Updates the most recent execution record for routine_id
Sets end_time to current timestamp
Updates status, result, and error fields
Calculates execution_time (end_time - start_time)
Examples
- Record successful completion:
>>> tracker.record_routine_end("processor", "completed", result="success")
- Record failure:
>>> tracker.record_routine_end(
...     "processor",
...     status="failed",
...     error="Connection timeout"
... )
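The pairing between a start record and its end can be sketched as follows. This is a hedged illustration, not the library's code: sketch_record_end and its `now` parameter are hypothetical (the latter exists only to make the example deterministic), and returning None when no record exists is an assumption, since this page does not say what happens in that case.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


def sketch_record_end(
    executions: Dict[str, List[Dict[str, Any]]],
    routine_id: str,
    status: str = "completed",
    result: Any = None,
    error: Optional[str] = None,
    now: Optional[datetime] = None,
) -> Optional[Dict[str, Any]]:
    """Hypothetical stand-in for record_routine_end()."""
    records = executions.get(routine_id)
    if not records:
        return None  # assumed behavior; not documented on this page
    record = records[-1]  # the most recent execution record
    end = now or datetime.now()
    started = datetime.fromisoformat(record["start_time"])
    record.update(
        end_time=end.isoformat(),
        status=status,
        result=result,
        error=error,
        execution_time=(end - started).total_seconds(),
    )
    return record


t0 = datetime(2024, 1, 1, 12, 0, 0)
executions = {
    "processor": [{"start_time": t0.isoformat(), "status": "running", "params": {}}]
}
finished = sketch_record_end(
    executions,
    "processor",
    status="failed",
    error="Connection timeout",
    now=t0 + timedelta(seconds=2),
)
print(finished["execution_time"])
```

Since only the most recent record is updated, overlapping executions of the same routine would need a different matching strategy than this sketch uses.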
- record_event(source_routine_id, event_name, target_routine_id=None, data=None)
Record an event emission in the event flow.
This method records when a routine emits an event, tracking the data flow from source to target routines. This helps understand the execution flow and data transmission patterns.
- Parameters:
source_routine_id (str) – Unique identifier of the routine emitting the event. This is the routine that called emit().
event_name (str) – Name of the event that was emitted. Example: “output”, “result”, “error”
target_routine_id (str | None) – Optional identifier of the target routine receiving data. If the event is connected to multiple slots, this may be the first target or None if unknown. Used for tracking data flow direction.
data (dict[str, Any] | None) – Optional dictionary of data transmitted with the event. This contains the keyword arguments passed to emit(**kwargs). Example: {"result": "success", "count": 42}
- Side Effects:
Appends a new event record to event_flow list
Record includes timestamp, source, target, event name, and data
Examples
- Record event emission:
>>> tracker.record_event(
...     "processor",
...     "output",
...     target_routine_id="validator",
...     data={"result": "processed", "count": 10}
... )
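As a rough sketch of how the recorded event flow might be used for tracing, the snippet below appends event records to a list and then counts emissions per source-target edge. The function sketch_record_event and the dictionary key names are assumptions chosen to mirror the parameter names; the library's actual record layout may differ.

```python
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional

event_flow: List[Dict[str, Any]] = []


def sketch_record_event(
    source_routine_id: str,
    event_name: str,
    target_routine_id: Optional[str] = None,
    data: Optional[Dict[str, Any]] = None,
) -> None:
    """Hypothetical stand-in for record_event(); key names are assumed."""
    event_flow.append(
        {
            "timestamp": datetime.now().isoformat(),
            "source_routine_id": source_routine_id,
            "event_name": event_name,
            "target_routine_id": target_routine_id,
            "data": data or {},
        }
    )


sketch_record_event("processor", "output", "validator", {"count": 10})
sketch_record_event("processor", "error")  # target unknown
sketch_record_event("validator", "output", "writer")

# Count how often data moved along each source -> target edge
edges = Counter(
    (event["source_routine_id"], event["target_routine_id"])
    for event in event_flow
    if event["target_routine_id"] is not None
)
print(edges)
```

Events with no known target are kept in the list but skipped in the edge count, which matches the note that target_routine_id may be None.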
- get_flow_performance()
Get performance metrics for the entire flow.
This method aggregates performance metrics across all routines in the flow to provide an overall view of flow performance.
- Returns:
Dictionary containing overall flow performance metrics:
total_routines: int - Number of routines that executed
total_events: int - Total number of events emitted
total_execution_time: float - Sum of all routine execution times
avg_routine_time: float - Average execution time per routine
- Return type:
dict
Examples
- Get overall flow performance:
>>> metrics = tracker.get_flow_performance()
>>> print(f"Total routines: {metrics['total_routines']}")
>>> print(f"Total events: {metrics['total_events']}")
>>> print(f"Total time: {metrics['total_execution_time']:.2f}s")
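One way the aggregation could work is sketched below. It is an assumption-laden illustration, not the library's implementation: sketch_flow_performance is a hypothetical helper, and it interprets avg_routine_time as total execution time divided by the number of distinct routines, which this page does not spell out.

```python
from typing import Any, Dict, List


def sketch_flow_performance(
    routine_executions: Dict[str, List[Dict[str, Any]]],
    event_flow: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Hypothetical aggregation mirroring the documented return keys."""
    total_time = sum(
        record.get("execution_time") or 0.0  # unfinished records count as 0
        for records in routine_executions.values()
        for record in records
    )
    total_routines = len(routine_executions)
    return {
        "total_routines": total_routines,
        "total_events": len(event_flow),
        "total_execution_time": total_time,
        # Assumed definition: total time divided by number of distinct routines
        "avg_routine_time": total_time / total_routines if total_routines else 0.0,
    }


sample_executions = {
    "processor": [{"execution_time": 0.5}, {"execution_time": 1.5}],
    "validator": [{"execution_time": 1.0}],
}
sample_events = [{"event_name": "output"}, {"event_name": "output"}]

metrics = sketch_flow_performance(sample_executions, sample_events)
print(metrics)
```

Guarding the division keeps the function safe to call on a tracker that has not recorded anything yet.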