ExecutionTracker API

Execution tracker.

Tracks flow execution state and performance metrics.

class routilux.execution_tracker.ExecutionTracker(flow_id='')[source]

Bases: Serializable

Execution tracker for monitoring flow execution state and performance.

ExecutionTracker provides detailed monitoring and performance analysis capabilities for flow execution. It tracks routine executions, event flow, and performance metrics to help understand flow behavior and identify optimization opportunities.

Key Features:
  • Routine Execution Tracking: Start/end times, parameters, results

  • Event Flow Tracking: Record all event emissions and data flow

  • Performance Metrics: Execution times, throughput, bottlenecks

  • Execution Analysis: Query execution patterns and statistics

Data Structure:
  • routine_executions: Dict mapping routine_id to list of execution records

  • event_flow: List of all event emissions with source/target information

  • performance_metrics: Dictionary of calculated performance metrics

Use Cases:
  • Performance Analysis: Identify slow routines, bottlenecks

  • Debugging: Trace execution flow and data transmission

  • Monitoring: Track routine execution patterns

  • Optimization: Find opportunities to improve flow performance

Examples

Basic usage:
>>> tracker = ExecutionTracker(flow_id="my_flow")
>>> flow.execution_tracker = tracker
>>> # Tracker automatically records during execution
>>> # After execution, analyze results:
>>> metrics = tracker.get_performance_metrics()
>>> print(f"Total executions: {metrics.get('total_executions')}")
Analyze routine performance:
>>> executions = tracker.routine_executions.get("my_routine", [])
>>> for exec in executions:
...     print(f"Duration: {exec.get('execution_time')}s")
__init__(flow_id='')[source]

Initialize ExecutionTracker.

Parameters:

flow_id (str) – Flow identifier.

record_routine_start(routine_id, params=None)[source]

Record the start of a routine execution.

This method is called when a routine begins execution. It creates a new execution record with start time and parameters. The record is completed when record_routine_end() is called.

Parameters:
  • routine_id (str) – Unique identifier of the routine starting execution. Must match the ID used when adding the routine to the flow.

  • params (dict[str, Any]) – Optional dictionary of parameters passed to the routine. These are the keyword arguments passed to routine.__call__(**params). Example: {“input”: “data”, “count”: 42}

Side Effects:
  • Creates a new execution record in routine_executions[routine_id]

  • Sets start_time to current timestamp

  • Sets status to “running”

  • Stores params in the execution record

Examples

Record routine start:
>>> tracker.record_routine_start("processor", {"input": "data"})
>>> # Later, call record_routine_end() to complete the record
record_routine_end(routine_id, status='completed', result=None, error=None)[source]

Record the end of a routine execution.

This method completes an execution record started by record_routine_start(). It updates the record with end time, status, result, and calculates execution duration.

Parameters:
  • routine_id (str) – Unique identifier of the routine completing execution. Must match the ID used in record_routine_start().

  • status (str) – Execution status. Common values: - “completed”: Routine executed successfully - “failed”: Routine execution failed - “skipped”: Routine was skipped - “error_continued”: Error occurred but execution continued

  • result (Any) – Optional execution result. Can be any serializable value. Stored in the execution record for later analysis.

  • error (str | None) – Optional error message if execution failed. Should be a string describing the error. Only used when status is “failed” or “error_continued”.

Side Effects:
  • Updates the most recent execution record for routine_id

  • Sets end_time to current timestamp

  • Updates status, result, and error fields

  • Calculates execution_time (end_time - start_time)

Examples

Record successful completion:
>>> tracker.record_routine_end("processor", "completed", result="success")
Record failure:
>>> tracker.record_routine_end(
...     "processor",
...     status="failed",
...     error="Connection timeout"
... )
record_event(source_routine_id, event_name, target_routine_id=None, data=None)[source]

Record an event emission in the event flow.

This method records when a routine emits an event, tracking the data flow from source to target routines. This helps understand the execution flow and data transmission patterns.

Parameters:
  • source_routine_id (str) – Unique identifier of the routine emitting the event. This is the routine that called emit().

  • event_name (str) – Name of the event that was emitted. Example: “output”, “result”, “error”

  • target_routine_id (str | None) – Optional identifier of the target routine receiving data. If the event is connected to multiple slots, this may be the first target or None if unknown. Used for tracking data flow direction.

  • data (dict[str, Any]) – Optional dictionary of data transmitted with the event. This contains the keyword arguments passed to emit(**kwargs). Example: {“result”: “success”, “count”: 42}

Side Effects:
  • Appends a new event record to event_flow list

  • Record includes timestamp, source, target, event name, and data

Examples

Record event emission:
>>> tracker.record_event(
...     "processor",
...     "output",
...     target_routine_id="validator",
...     data={"result": "processed", "count": 10}
... )
get_routine_performance(routine_id)[source]

Get performance metrics for a routine.

Parameters:

routine_id (str) – Routine identifier.

Returns:

Dictionary containing performance metrics, or None if routine not found.

Return type:

dict[str, Any] | None

get_flow_performance()[source]

Get performance metrics for the entire flow.

This method aggregates performance metrics across all routines in the flow to provide an overall view of flow performance.

Returns:

  • total_routines: int - Number of routines that executed

  • total_events: int - Total number of events emitted

  • total_execution_time: float - Sum of all routine execution times

  • avg_routine_time: float - Average execution time per routine

Return type:

Dictionary containing overall flow performance metrics

Examples

Get overall flow performance:
>>> metrics = tracker.get_flow_performance()
>>> print(f"Total routines: {metrics['total_routines']}")
>>> print(f"Total events: {metrics['total_events']}")
>>> print(f"Total time: {metrics['total_execution_time']:.2f}s")