Source code for routilux.flow.task

"""
Task-related classes for Flow execution.

Contains TaskPriority enum and SlotActivationTask dataclass.
"""

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, Optional, Any, TYPE_CHECKING

if TYPE_CHECKING:
    from routilux.slot import Slot
    from routilux.connection import Connection


[docs] class TaskPriority(Enum): """Task priority for queue scheduling.""" HIGH = 1 NORMAL = 2 LOW = 3
[docs] @dataclass class SlotActivationTask: """Slot activation task for queue-based execution. Each task is associated with a JobState to track execution state. This allows tasks executed in worker threads to access and update the correct JobState, even when running concurrently. """ slot: "Slot" data: Dict[str, Any] connection: Optional["Connection"] = None priority: TaskPriority = TaskPriority.NORMAL retry_count: int = 0 max_retries: int = 0 created_at: Optional[datetime] = None job_state: Optional[Any] = None # JobState for this execution def __post_init__(self): if self.created_at is None: self.created_at = datetime.now()
[docs] def __lt__(self, other): """For priority queue sorting.""" return self.priority.value < other.priority.value