import bisect
from enum import IntEnum
from abc import abstractmethod
from collections.abc import Callable

from cereal import log, car
import cereal.messaging as messaging
from openpilot.common.realtime import DT_CTRL

# Short aliases for the capnp enums referenced throughout this module.
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
AudibleAlert = car.CarControl.HUDControl.AudibleAlert


# Alert priorities
class Priority(IntEnum):
    """Ordering used by ``Alert.__gt__``; a larger value wins."""

    LOWEST = 0
    LOWER = 1
    LOW = 2
    MID = 3
    HIGH = 4
    HIGHEST = 5


# Event types
class ET:
    """String names of the event types.

    These strings are the keys of the per-event dictionaries returned by
    ``EventsBase.get_events_mapping`` and are also used as attribute names
    on event messages (see ``EventsBase.to_msg``).
    """

    ENABLE = 'enable'
    PRE_ENABLE = 'preEnable'
    OVERRIDE_LATERAL = 'overrideLateral'
    OVERRIDE_LONGITUDINAL = 'overrideLongitudinal'
    NO_ENTRY = 'noEntry'
    WARNING = 'warning'
    USER_DISABLE = 'userDisable'
    SOFT_DISABLE = 'softDisable'
    IMMEDIATE_DISABLE = 'immediateDisable'
    PERMANENT = 'permanent'


class Alert:
    """A single alert: two text lines plus status, size, priority and HUD cues.

    Args:
        alert_text_1: First line of text.
        alert_text_2: Second line of text.
        alert_status: ``AlertStatus`` value.
        alert_size: ``AlertSize`` value.
        priority: ``Priority`` used when comparing alerts.
        visual_alert: ``VisualAlert`` value.
        audible_alert: ``AudibleAlert`` value.
        duration: Alert duration, in the same time unit as ``DT_CTRL``
            (presumably seconds -- confirm against ``DT_CTRL``).
        creation_delay: Minimum time an event must have been active before
            ``EventsBase.create_alerts`` emits this alert; same unit as
            ``DT_CTRL``.
    """

    def __init__(self,
                 alert_text_1: str,
                 alert_text_2: str,
                 alert_status: log.SelfdriveState.AlertStatus,
                 alert_size: log.SelfdriveState.AlertSize,
                 priority: Priority,
                 visual_alert: car.CarControl.HUDControl.VisualAlert,
                 audible_alert: car.CarControl.HUDControl.AudibleAlert,
                 duration: float,
                 creation_delay: float = 0.):
        self.alert_text_1 = alert_text_1
        self.alert_text_2 = alert_text_2
        self.alert_status = alert_status
        self.alert_size = alert_size
        self.priority = priority
        self.visual_alert = visual_alert
        self.audible_alert = audible_alert

        # Stored as a whole number of DT_CTRL periods (truncated), not as
        # the raw value that was passed in.
        self.duration = int(duration / DT_CTRL)

        self.creation_delay = creation_delay

        # Both fields are filled in by EventsBase.create_alerts.
        self.alert_type = ""
        self.event_type: str | None = None

    def __str__(self) -> str:
        return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"

    def __gt__(self, alert2) -> bool:
        """Compare by priority; anything that is not an ``Alert`` never loses."""
        if not isinstance(alert2, Alert):
            return False
        return self.priority > alert2.priority


class AlertBase(Alert):
    """``Alert`` variant whose constructor takes no ``creation_delay``.

    The parent's default ``creation_delay`` of ``0.`` therefore applies.
    """

    def __init__(self,
                 alert_text_1: str,
                 alert_text_2: str,
                 alert_status: log.SelfdriveState.AlertStatus,
                 alert_size: log.SelfdriveState.AlertSize,
                 priority: Priority,
                 visual_alert: car.CarControl.HUDControl.VisualAlert,
                 audible_alert: car.CarControl.HUDControl.AudibleAlert,
                 duration: float):
        super().__init__(alert_text_1, alert_text_2, alert_status, alert_size,
                         priority, visual_alert, audible_alert, duration)


# Signature of a callable that builds an Alert lazily; EventsBase.create_alerts
# invokes it with the caller-supplied ``callback_args``.
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int, log.ControlsState], Alert]


# ********** alert callback functions **********

def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster,
                         metric: bool, soft_disable_time: int, personality) -> Alert:
    """Return a ``NoEntryAlert`` whose text depends on ``CP.brand``.

    Only ``CP`` is read; the remaining parameters exist to satisfy the
    alert-callback signature.
    """
    text = "启用自适应巡航"
    if CP.brand == "honda":
        text = "启用主开关"
    return NoEntryAlert(text)


class EventsBase:
    """Sorted collection of active event ids plus per-event activity counters.

    Subclasses supply the event -> alert mapping, the event naming and the
    message type through the three ``get_*`` hooks. The hooks are decorated
    with ``@abstractmethod`` but this class does not derive from ``abc.ABC``,
    so the decorator is not enforced at instantiation; an un-overridden hook
    fails with ``NotImplementedError`` when it is called.
    """

    def __init__(self):
        self.events: list[int] = []          # currently active events, kept sorted
        self.static_events: list[int] = []   # events that survive clear(), kept sorted
        self.event_counters: dict[int, int] = {}

    @property
    def names(self) -> list[int]:
        """The list of currently active event ids (the live list, not a copy)."""
        return self.events

    def __len__(self) -> int:
        return len(self.events)

    def add(self, event_name: int, static: bool = False) -> None:
        """Insert ``event_name`` in sorted position; also mark it static if asked."""
        if static:
            bisect.insort(self.static_events, event_name)
        bisect.insort(self.events, event_name)

    def clear(self) -> None:
        """Advance the counters and reset the active events to the static ones.

        Every existing counter is incremented when its event is currently
        active and reset to 0 otherwise. Events that have no counter entry
        yet are not given one here.
        """
        self.event_counters = {k: (v + 1 if k in self.events else 0)
                               for k, v in self.event_counters.items()}
        self.events = self.static_events.copy()

    def contains(self, event_type: str) -> bool:
        """Return True if any active event defines an alert for ``event_type``."""
        return any(event_type in self.get_events_mapping().get(e, {}) for e in self.events)

    def create_alerts(self, event_types: list[str], callback_args=None):
        """Build the alerts of the requested types for all active events.

        Args:
            event_types: Event-type strings (see ``ET``) to collect.
            callback_args: Positional arguments passed to mapping entries
                that are callables rather than ``Alert`` instances. Defaults
                to no arguments.

        Returns:
            A list of ``Alert`` objects whose ``alert_type`` and
            ``event_type`` have been filled in. Alerts whose
            ``creation_delay`` has not elapsed yet are omitted.

        Raises:
            KeyError: If an active event is missing from the events mapping.
        """
        if callback_args is None:
            callback_args = []

        ret = []
        for e in self.events:
            # One mapping lookup per event instead of one per matching type.
            event_alerts = self.get_events_mapping()[e]

            # An event without a counter entry is treated as freshly active
            # (count 0) instead of raising KeyError.
            active_time = DT_CTRL * (self.event_counters.get(e, 0) + 1)

            for et in event_types:
                if et not in event_alerts:
                    continue

                alert = event_alerts[et]
                if not isinstance(alert, Alert):
                    # Mapping entry is a callback: build the alert now.
                    alert = alert(*callback_args)

                if active_time >= alert.creation_delay:
                    alert.alert_type = f"{self.get_event_name(e)}/{et}"
                    alert.event_type = et
                    ret.append(alert)
        return ret

    def add_from_msg(self, events):
        """Add every event of an incoming message list, using ``e.name.raw`` as the id."""
        for e in events:
            bisect.insort(self.events, e.name.raw)

    def to_msg(self):
        """Convert the active events into a list of new event messages.

        For every active event a message is created, its ``name`` is set and
        each event type defined for it in the mapping is set to ``True``.
        """
        ret = []
        for event_name in self.events:
            event = self.get_event_msg_type().new_message()
            event.name = event_name
            for event_type in self.get_events_mapping().get(event_name, {}):
                setattr(event, event_type, True)
            ret.append(event)
        return ret

    def has(self, event_name: int) -> bool:
        """Return True if ``event_name`` is currently active."""
        return event_name in self.events

    def contains_in_list(self, events_list: list[int]) -> bool:
        """Return True if at least one of ``events_list`` is currently active."""
        return any(event_name in self.events for event_name in events_list)

    def remove(self, event_name: int, static: bool = False) -> None:
        """Remove one occurrence of ``event_name`` from the active events.

        With ``static=True`` the event is also dropped from the static list.
        When the event was active its counter is incremented before removal.
        """
        if static and event_name in self.static_events:
            self.static_events.remove(event_name)

        if event_name in self.events:
            # .get() keeps this from raising KeyError for an event that was
            # added but never had a counter entry created for it.
            self.event_counters[event_name] = self.event_counters.get(event_name, 0) + 1
            self.events.remove(event_name)

    @abstractmethod
    def get_events_mapping(self) -> dict[int, dict[str, Alert | AlertCallbackType]]:
        """Return ``{event id: {event type: Alert or alert callback}}``."""
        raise NotImplementedError

    @abstractmethod
    def get_event_name(self, event: int) -> str:
        """Return the name used for ``event`` in ``Alert.alert_type``."""
        raise NotImplementedError

    @abstractmethod
    def get_event_msg_type(self):
        """Return the type whose ``new_message()`` creates one event message."""
        raise NotImplementedError


# Alert with empty text, no size, no visual/audible cue and lowest priority.
EmptyAlert = Alert("", "", AlertStatus.normal, AlertSize.none, Priority.LOWEST,
                   VisualAlert.none, AudibleAlert.none, 0)


class NoEntryAlert(Alert):
    """Mid-size, low-priority alert with the ``refuse`` sound and duration 3.

    Note the argument order: the second text line comes first, the first
    line has a default.
    """

    def __init__(self, alert_text_2: str,
                 alert_text_1: str = "openpilot不可用",
                 visual_alert: car.CarControl.HUDControl.VisualAlert = VisualAlert.none):
        super().__init__(alert_text_1, alert_text_2, AlertStatus.normal,
                         AlertSize.mid, Priority.LOW, visual_alert,
                         AudibleAlert.refuse, 3.)

class SoftDisableAlert(Alert):
    """Full-size user-prompt alert with the soft warning sound, duration 2."""

    def __init__(self, alert_text_2: str):
        super().__init__("立即控制", alert_text_2,
                         AlertStatus.userPrompt, AlertSize.full,
                         Priority.MID, VisualAlert.steerRequired,
                         AudibleAlert.warningSoft, 2.)


# less harsh version of SoftDisable, where the condition is user-triggered
class UserSoftDisableAlert(SoftDisableAlert):
    """``SoftDisableAlert`` with a different first text line."""

    def __init__(self, alert_text_2: str):
        super().__init__(alert_text_2)
        # Replace the first line set by SoftDisableAlert.
        self.alert_text_1 = "openpilot将停止工作"


class ImmediateDisableAlert(Alert):
    """Full-size critical alert, highest priority, immediate warning sound, duration 4."""

    def __init__(self, alert_text_2: str):
        super().__init__("立即掌控", alert_text_2,
                         AlertStatus.critical, AlertSize.full,
                         Priority.HIGHEST, VisualAlert.steerRequired,
                         AudibleAlert.warningImmediate, 4.)


class EngagementAlert(Alert):
    """Text-less alert (size ``none``) that only carries an audible cue, duration .2."""

    def __init__(self, audible_alert: car.CarControl.HUDControl.AudibleAlert):
        super().__init__("", "",
                         AlertStatus.normal, AlertSize.none,
                         Priority.MID, VisualAlert.none,
                         audible_alert, .2)


class NormalPermanentAlert(Alert):
    """Silent normal-status alert.

    The size is ``mid`` when a second text line is given and ``small`` when
    it is empty.
    """

    def __init__(self, alert_text_1: str, alert_text_2: str = "", duration: float = 0.2,
                 priority: Priority = Priority.LOWER, creation_delay: float = 0.):
        super().__init__(alert_text_1, alert_text_2,
                         AlertStatus.normal,
                         AlertSize.mid if alert_text_2 else AlertSize.small,
                         priority, VisualAlert.none, AudibleAlert.none, duration,
                         creation_delay=creation_delay)


class StartupAlert(Alert):
    """Silent mid-size alert with priority ``LOWER`` and duration 5."""

    def __init__(self, alert_text_1: str,
                 alert_text_2: str = "驾驶时请双手握住方向盘并眼睛目视前方",
                 alert_status=AlertStatus.normal):
        super().__init__(alert_text_1, alert_text_2,
                         alert_status, AlertSize.mid,
                         Priority.LOWER, VisualAlert.none,
                         AudibleAlert.none, 5.)