openpilot/selfdrive/ui/lib/prime_state.py
Vehicle Researcher c5d5c5d1f3 openpilot v0.10.1 release
date: 2025-10-24T00:30:59
master commit: 405631baf9685e171a0dd19547cb763f1b163d18
2025-10-24 00:31:03 -07:00

105 lines
3.0 KiB
Python

from enum import IntEnum
import os
import threading
import time
from openpilot.common.api import api_get
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.system.athena.registration import UNREGISTERED_DONGLE_ID
from openpilot.selfdrive.ui.lib.api_helpers import get_token
class PrimeType(IntEnum):
  """Integer codes describing the device's prime type.

  The numeric ordering is meaningful to the rest of this file:
  ``PrimeState.is_paired`` treats every value above ``UNPAIRED`` as paired and
  ``PrimeState.is_prime`` treats every value above ``NONE`` as prime, so new
  members must keep that ordering intact.
  """

  # Negative sentinels: no usable answer / device is not paired.
  UNKNOWN = -2   # fallback when no valid stored value could be loaded
  UNPAIRED = -1  # set when the API reports the device as not paired

  # Paired, but not prime.
  NONE = 0

  # Values above NONE are all treated as prime by PrimeState.is_prime().
  MAGENTA = 1
  LITE = 2
  BLUE = 3
  MAGENTA_NEW = 4
  PURPLE = 5
class PrimeState:
  """Tracks the device's current ``PrimeType`` and keeps it refreshed.

  The initial value comes from the ``PRIME_TYPE`` environment variable or the
  ``PrimeType`` param. A daemon worker thread then polls the
  ``v1.1/devices/<dongle_id>`` API endpoint every ``FETCH_INTERVAL`` seconds
  and stores any change back into params. Access to ``prime_type`` through
  the accessor methods is guarded by an internal lock.

  NOTE: the worker thread's target is a bound method, so the running thread
  keeps this object referenced; call ``stop()`` explicitly rather than relying
  on ``__del__`` to end polling.
  """

  FETCH_INTERVAL = 5.0  # seconds between API calls
  API_TIMEOUT = 10.0  # seconds for API requests
  # No longer used by the worker (it now blocks on an Event so stop() wakes it
  # immediately); kept so external references to the attribute keep working.
  SLEEP_INTERVAL = 0.5

  def __init__(self):
    # Thread bookkeeping goes first so stop()/__del__ stay safe even if
    # Params() or the initial load below raises.
    self._running = False
    self._thread = None
    self._stop_event = threading.Event()

    self._params = Params()
    self._lock = threading.Lock()
    self.prime_type: PrimeType = self._load_initial_state()

    self.start()

  def _load_initial_state(self) -> PrimeType:
    """Return the starting prime type.

    The ``PRIME_TYPE`` environment variable wins when it is set and non-empty,
    otherwise the ``PrimeType`` param is used. A missing or unparsable value
    yields ``PrimeType.UNKNOWN``.
    """
    prime_type_str = os.getenv("PRIME_TYPE") or self._params.get("PrimeType")
    if prime_type_str is None:
      return PrimeType.UNKNOWN
    try:
      return PrimeType(int(prime_type_str))
    except (ValueError, TypeError):
      # Not an integer, or an integer that is not a PrimeType member.
      return PrimeType.UNKNOWN

  def _fetch_prime_status(self) -> None:
    """Query the API once and update the stored prime type.

    Does nothing when the device has no dongle id or is unregistered, and
    ignores non-200 responses. Any exception (network, JSON, unknown
    ``prime_type`` value) is logged and swallowed so the worker keeps polling.
    """
    dongle_id = self._params.get("DongleId")
    if not dongle_id or dongle_id == UNREGISTERED_DONGLE_ID:
      return

    try:
      identity_token = get_token(dongle_id)
      response = api_get(f"v1.1/devices/{dongle_id}", timeout=self.API_TIMEOUT, access_token=identity_token)
      if response.status_code == 200:
        data = response.json()
        is_paired = data.get("is_paired", False)
        prime_type = data.get("prime_type", 0)
        # An unpaired device is always reported as UNPAIRED, whatever
        # prime_type the payload carries.
        self.set_type(PrimeType(prime_type) if is_paired else PrimeType.UNPAIRED)
    except Exception as e:
      cloudlog.error(f"Failed to fetch prime status: {e}")

  def set_type(self, prime_type: PrimeType) -> None:
    """Store ``prime_type`` and persist it to params if it differs from the current value."""
    with self._lock:
      if prime_type != self.prime_type:
        self.prime_type = prime_type
        self._params.put("PrimeType", int(prime_type))
        cloudlog.info(f"Prime type updated to {prime_type}")

  def _worker_thread(self, stop_event=None) -> None:
    """Poll the API until ``stop_event`` is set.

    ``stop_event`` is the ``threading.Event`` owned by this particular worker;
    when omitted, the instance's current event is used. Each worker watches
    its own event, so a worker that outlives ``stop()``'s join timeout cannot
    be revived by a later ``start()``.
    """
    if stop_event is None:
      stop_event = self._stop_event

    while not stop_event.is_set():
      self._fetch_prime_status()
      # Blocks for up to FETCH_INTERVAL seconds but returns as soon as stop()
      # sets the event, so shutdown does not wait out the interval.
      stop_event.wait(self.FETCH_INTERVAL)

  def start(self) -> None:
    """Start the polling worker unless one is already running.

    If a previous worker is still alive but has been told to stop (it can
    outlive ``stop()``'s join timeout while blocked in an API request), a
    fresh worker is started; the old one exits on its own event.
    """
    thread = self._thread
    if thread is not None and thread.is_alive() and not self._stop_event.is_set():
      return

    stop_event = threading.Event()
    self._stop_event = stop_event
    self._running = True
    self._thread = threading.Thread(target=self._worker_thread, args=(stop_event,), daemon=True)
    self._thread.start()

  def stop(self) -> None:
    """Ask the worker to stop and wait up to one second for it to exit."""
    self._running = False
    self._stop_event.set()
    thread = self._thread
    if thread is not None and thread.is_alive():
      thread.join(timeout=1.0)

  def get_type(self) -> PrimeType:
    """Return the current prime type."""
    with self._lock:
      return self.prime_type

  def is_prime(self) -> bool:
    """Return True when the current type is above ``PrimeType.NONE``."""
    with self._lock:
      return bool(self.prime_type > PrimeType.NONE)

  def is_paired(self) -> bool:
    """Return True when the current type is above ``PrimeType.UNPAIRED``."""
    with self._lock:
      return bool(self.prime_type > PrimeType.UNPAIRED)

  def __del__(self):
    self.stop()