openpilot/selfdrive/controls/controlsd.py
mouxangithub 5e253a54ba feat(toyota): 添加 Toyota Wildlander PHEV 车型支持
新增 Toyota Wildlander PHEV 车型的指纹、参数配置和相关控制逻辑,包括:
- 在 substitute.toml 中将其映射为 TOYOTA_RAV4_TSS2 以复用现有参数
- 增加车辆指纹识别信息
- 配置车辆规格参数(质量、轴距、转向比等)
- 更新 carstate 和 interface 逻辑以适配该车型
- 添加对应的神经网络前馈模型文件

此外,临时注释了 controlsd.py 中的无 GPS 事件触发逻辑,待后续修复。
2025-11-12 09:18:23 +08:00

1009 lines
45 KiB
Python

#!/usr/bin/env python3
import os
import math
import time
import threading
from typing import SupportsFloat
import cereal.messaging as messaging
from cereal import car, custom, log
from msgq.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
from openpilot.common.git import get_short_branch
from openpilot.common.numpy_fast import clip
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.car.car_helpers import get_car_interface, get_startup_event
from openpilot.selfdrive.car.gm.values import CC_ONLY_CAR, GMFlags
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, clip_curvature
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.frogpilot.tinygrad_modeld.tinygrad_modeld import LAT_SMOOTH_SECONDS
from openpilot.system.hardware import HARDWARE
from openpilot.frogpilot.common.frogpilot_variables import get_frogpilot_toggles, params_memory
from openpilot.frogpilot.controls.lib.neural_network_feedforward import LatControlNNFF
SOFT_DISABLE_TIME = 3 # seconds
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
CAMERA_OFFSET = 0.04
REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}
ThermalStatus = log.DeviceState.ThermalStatus
State = log.ControlsState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.Desire
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
EventName = car.CarEvent.EventName
FrogPilotEventName = custom.FrogPilotCarEvent.EventName
ButtonType = car.CarState.ButtonEvent.Type
FrogPilotButtonType = custom.FrogPilotCarState.ButtonEvent.Type
SafetyModel = car.CarParams.SafetyModel
IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError}
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
class Controls:
def __init__(self, CI=None):
self.params = Params()
if CI is None:
cloudlog.info("controlsd is waiting for CarParams")
with car.CarParams.from_bytes(self.params.get("CarParams", block=True)) as msg:
# TODO: this shouldn't need to be a builder
self.CP = msg.as_builder()
cloudlog.info("controlsd got CarParams")
with custom.FrogPilotCarParams.from_bytes(self.params.get("FrogPilotCarParams", block=True)) as fpmsg:
self.FPCP = fpmsg.as_builder()
# Uses car interface helper functions, altering state won't be considered by card for actuation
self.CI = get_car_interface(self.CP, self.FPCP)
else:
self.CI, self.CP = CI, CI.CP
# Ensure the current branch is cached, otherwise the first iteration of controlsd lags
self.branch = get_short_branch()
# Setup sockets
self.pm = messaging.PubMaster(['controlsState', 'carControl', 'onroadEvents', 'frogpilotControlsState', 'frogpilotOnroadEvents'])
self.sensor_packets = ["accelerometer", "gyroscope"]
self.camera_packets = ["roadCameraState", "driverCameraState", "wideRoadCameraState"]
self.log_sock = messaging.sub_sock('androidLog')
# TODO: de-couple controlsd with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + ['testJoystick']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
# no vipc in replay will make them ignored anyways
ignore += ['roadCameraState', 'wideRoadCameraState']
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters', 'liveDelay',
'testJoystick', 'frogpilotCarState', 'frogpilotPlan'] + self.camera_packets + self.sensor_packets,
ignore_alive=ignore, ignore_avg_freq=ignore+['radarState', 'testJoystick'], ignore_valid=['testJoystick', ],
frequency=int(1/DT_CTRL))
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
# read params
self.is_metric = self.params.get_bool("IsMetric")
self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled")
# detect sound card presence and ensure successful init
sounds_available = HARDWARE.get_sound_card_online()
car_recognized = self.CP.carName != 'mock'
# cleanup old params
if not self.CP.experimentalLongitudinalAvailable:
self.params.remove("ExperimentalLongitudinalEnabled")
if not self.CP.openpilotLongitudinalControl:
self.params.remove("ExperimentalMode")
self.CS_prev = car.CarState.new_message()
self.AM = AlertManager()
self.events = Events()
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI, DT_CTRL)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CI, DT_CTRL)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI, DT_CTRL)
self.initialized = False
self.state = State.disabled
self.enabled = False
self.active = False
self.soft_disable_timer = 0
self.mismatch_counter = 0
self.cruise_mismatch_counter = 0
self.last_blinker_frame = 0
self.last_steering_pressed_frame = 0
self.distance_traveled = 0
self.last_functional_fan_frame = 0
self.events_prev = []
self.current_alert_types = [ET.PERMANENT]
self.logged_comm_issue = None
self.not_running_prev = None
self.steer_limited_by_safety = False
self.curvature = 0.0
self.desired_curvature = 0.0
self.experimental_mode = False
self.personality = self.read_personality_param()
self.v_cruise_helper = VCruiseHelper(self.CP)
self.recalibrating_seen = False
self.can_log_mono_time = 0
if car_recognized and not self.CP.passive and self.CP.secOcRequired and not self.CP.secOcKeyAvailable:
self.startup_event = EventName.startupNoSecOcKey
else:
self.startup_event = get_startup_event(car_recognized, not self.CP.passive, len(self.CP.carFw) > 0)
if not sounds_available:
self.events.add(EventName.soundsUnavailable, static=True)
if not car_recognized:
self.events.add(EventName.carUnrecognized, static=True)
if len(self.CP.carFw) > 0:
set_offroad_alert("Offroad_CarUnrecognized", True)
else:
set_offroad_alert("Offroad_NoFirmware", True)
elif self.CP.passive:
self.events.add(EventName.dashcamMode, static=True)
# controlsd is driven by carState, expected at 100Hz
self.rk = Ratekeeper(100, print_delay_threshold=None)
# FrogPilot variables
self.belowSteerSpeed_shown = False
self.distance_pressed_previously = False
self.resumeRequired_shown = False
self.steerTempUnavailableSilent_shown = False
self.display_timer = 0
self.frogpilot_events_prev = []
self.event_names_to_clear = set()
self.has_menu = self.CP.carName == "gm" and not (self.CP.flags & GMFlags.NO_CAMERA.value or self.CP.carFingerprint in CC_ONLY_CAR)
self.frogpilot_AM = AlertManager()
self.frogpilot_events = Events(frogpilot=True)
self.frogpilot_toggles = get_frogpilot_toggles()
if self.CP.lateralTuning.which() == "torque" and (self.frogpilot_toggles.nnff or self.frogpilot_toggles.nnff_lite):
self.LaC = LatControlNNFF(self.CP, self.CI, DT_CTRL)
def set_initial_state(self):
if REPLAY:
controls_state = self.params.get("ReplayControlsState")
if controls_state is not None:
with log.ControlsState.from_bytes(controls_state) as controls_state:
self.v_cruise_helper.v_cruise_kph = controls_state.vCruise
if any(ps.controlsAllowed for ps in self.sm['pandaStates']):
self.state = State.enabled
def contains_event_type(self, *event_types):
return any(self.events.contains(event_type) or self.frogpilot_events.contains(event_type) for event_type in event_types)
def update_events(self, CS):
"""Compute onroadEvents from carState"""
self.events.clear()
self.frogpilot_events.clear()
# Add joystick event, static on cars, dynamic on nonCars
if self.joystick_mode:
self.events.add(EventName.joystickDebug)
self.startup_event = None
# Add startup event
if self.startup_event is not None:
if self.startup_event == FrogPilotEventName.customStartupAlert:
self.frogpilot_events.add(self.startup_event)
else:
self.events.add(self.startup_event)
self.startup_event = None
# Don't add any more events if not initialized
if not self.initialized:
self.events.add(EventName.controlsInitializing)
return
# no more events while in dashcam mode
if self.CP.passive:
return
# Block resume if cruise never previously enabled
resume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents)
if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and resume_pressed:
self.events.add(EventName.resumeBlocked)
if not self.CP.notCar:
self.events.add_from_msg(self.sm['driverMonitoringState'].events)
# Add car events, ignore if CAN isn't valid
if CS.canValid:
self.events.add_from_msg(CS.events)
# Create events for temperature, disk space, and memory
if self.sm['deviceState'].thermalStatus >= ThermalStatus.red:
self.events.add(EventName.overheat)
if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION:
# under 7% of space free no enable allowed
self.events.add(EventName.outOfSpace)
if self.sm['deviceState'].memoryUsagePercent > 90 and not SIMULATION:
self.events.add(EventName.lowMemory)
# TODO: enable this once loggerd CPU usage is more reasonable
#cpus = list(self.sm['deviceState'].cpuUsagePercent)
#if max(cpus, default=0) > 95 and not SIMULATION:
# self.events.add(EventName.highCpuUsage)
# Alert if fan isn't spinning for 5 seconds
if self.sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:
if self.sm['peripheralState'].fanSpeedRpm < 500 and self.sm['deviceState'].fanSpeedPercentDesired > 50:
# allow enough time for the fan controller in the panda to recover from stalls
if (self.sm.frame - self.last_functional_fan_frame) * DT_CTRL > 15.0:
self.events.add(EventName.fanMalfunction)
else:
self.last_functional_fan_frame = self.sm.frame
# Handle calibration status
cal_status = self.sm['liveCalibration'].calStatus
if cal_status != log.LiveCalibrationData.Status.calibrated:
if cal_status == log.LiveCalibrationData.Status.uncalibrated:
self.events.add(EventName.calibrationIncomplete)
elif cal_status == log.LiveCalibrationData.Status.recalibrating:
if not self.recalibrating_seen:
set_offroad_alert("Offroad_Recalibration", True)
self.recalibrating_seen = True
self.events.add(EventName.calibrationRecalibrating)
else:
self.events.add(EventName.calibrationInvalid)
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
if self.frogpilot_toggles.loud_blindspot_alert:
self.frogpilot_events.add(FrogPilotEventName.laneChangeBlockedLoud)
else:
self.events.add(EventName.laneChangeBlocked)
else:
if direction == LaneChangeDirection.left:
if self.sm['frogpilotPlan'].laneWidthLeft >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeLeft)
else:
self.frogpilot_events.add(FrogPilotEventName.noLaneAvailable)
else:
if self.sm['frogpilotPlan'].laneWidthRight >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeRight)
else:
self.frogpilot_events.add(FrogPilotEventName.noLaneAvailable)
elif self.sm['modelV2'].meta.laneChangeState in (LaneChangeState.laneChangeStarting,
LaneChangeState.laneChangeFinishing):
self.events.add(EventName.laneChange)
for i, pandaState in enumerate(self.sm['pandaStates']):
# All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutput
if i < len(self.CP.safetyConfigs):
expected_param = self.CP.safetyConfigs[i].safetyParam
if i < len(self.FPCP.safetyConfigs):
expected_param |= self.FPCP.safetyConfigs[i].safetyParam
safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or \
pandaState.safetyParam != expected_param or \
pandaState.alternativeExperience != self.CP.alternativeExperience
else:
safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODES
# safety mismatch allows some time for pandad to set the safety mode and publish it back from panda
if (safety_mismatch and self.sm.frame*DT_CTRL > 10.) or pandaState.safetyRxChecksInvalid or self.mismatch_counter >= 200:
self.events.add(EventName.controlsMismatch)
if log.PandaState.FaultType.relayMalfunction in pandaState.faults:
self.events.add(EventName.relayMalfunction)
# Handle HW and system malfunctions
# Order is very intentional here. Be careful when modifying this.
# All events here should at least have NO_ENTRY and SOFT_DISABLE.
num_events = len(self.events)
not_running = {p.name for p in self.sm['managerState'].processes if not p.running and p.shouldBeRunning}
if self.sm.recv_frame['managerState'] and (not_running - IGNORE_PROCESSES):
self.events.add(EventName.processNotRunning)
if not_running != self.not_running_prev:
cloudlog.event("process_not_running", not_running=not_running, error=True)
self.not_running_prev = not_running
else:
if not SIMULATION and not self.rk.lagging:
if not self.sm.all_alive(self.camera_packets):
self.events.add(EventName.cameraMalfunction)
elif not self.sm.all_freq_ok(self.camera_packets):
self.events.add(EventName.cameraFrameRate)
if not REPLAY and self.rk.lagging:
self.events.add(EventName.controlsdLagging)
if len(self.sm['radarState'].radarErrors) or ((not self.rk.lagging or REPLAY) and not self.sm.all_checks(['radarState'])):
self.events.add(EventName.radarFault)
if not self.sm.valid['pandaStates']:
self.events.add(EventName.usbError)
if CS.canTimeout:
self.events.add(EventName.canBusMissing)
elif not CS.canValid and not self.frogpilot_toggles.force_onroad:
self.events.add(EventName.canError)
# generic catch-all. ideally, a more specific event should be added above instead
has_disable_events = self.contains_event_type(ET.NO_ENTRY) and self.contains_event_type(ET.SOFT_DISABLE, ET.IMMEDIATE_DISABLE)
no_system_errors = (not has_disable_events) or (len(self.events) == num_events)
if not self.sm.all_checks() and no_system_errors:
if not self.sm.all_alive():
self.events.add(EventName.commIssue)
elif not self.sm.all_freq_ok():
self.events.add(EventName.commIssueAvgFreq)
else:
self.events.add(EventName.commIssue)
logs = {
'invalid': [s for s, valid in self.sm.valid.items() if not valid],
'not_alive': [s for s, alive in self.sm.alive.items() if not alive],
'not_freq_ok': [s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
}
if logs != self.logged_comm_issue:
cloudlog.event("commIssue", error=True, **logs)
self.logged_comm_issue = logs
else:
self.logged_comm_issue = None
if not (self.CP.notCar and self.joystick_mode):
if not self.sm['liveLocationKalman'].posenetOK:
self.events.add(EventName.posenetInvalid)
if not self.sm['liveLocationKalman'].deviceStable:
self.events.add(EventName.deviceFalling)
if not self.sm['liveLocationKalman'].inputsOK:
self.events.add(EventName.locationdTemporaryError)
if not self.sm['liveParameters'].valid and not TESTING_CLOSET and (not SIMULATION or REPLAY):
self.events.add(EventName.paramsdTemporaryError)
# conservative HW alert. if the data or frequency are off, locationd will throw an error
if any((self.sm.frame - self.sm.recv_frame[s])*DT_CTRL > 10. for s in self.sensor_packets):
self.events.add(EventName.sensorDataInvalid)
if not REPLAY:
# Check for mismatch between openpilot and car's PCM
cruise_mismatch = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
self.cruise_mismatch_counter = self.cruise_mismatch_counter + 1 if cruise_mismatch else 0
if self.cruise_mismatch_counter > int(6. / DT_CTRL):
self.events.add(EventName.cruiseMismatch)
# Check for FCW
stock_long_is_braking = self.enabled and not self.CP.openpilotLongitudinalControl and CS.aEgo < -1.25
model_fcw = self.sm['modelV2'].meta.hardBrakePredicted and not CS.brakePressed and not stock_long_is_braking
planner_fcw = self.sm['longitudinalPlan'].fcw and self.enabled
if (planner_fcw or model_fcw) and not (self.CP.notCar and self.joystick_mode):
self.events.add(EventName.fcw)
for m in messaging.drain_sock(self.log_sock, wait_for_one=False):
try:
msg = m.androidLog.message
if any(err in msg for err in ("ERROR_CRC", "ERROR_ECC", "ERROR_STREAM_UNDERFLOW", "APPLY FAILED")):
csid = msg.split("CSID:")[-1].split(" ")[0]
evt = CSID_MAP.get(csid, None)
if evt is not None:
self.events.add(evt)
except UnicodeDecodeError:
pass
# TODO: fix simulator
if not SIMULATION or REPLAY:
# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutes
if not self.sm['liveLocationKalman'].gpsOK and self.sm['liveLocationKalman'].inputsOK and (self.distance_traveled > 1500):
# self.events.add(EventName.noGps)
pass
if self.sm['liveLocationKalman'].gpsOK:
self.distance_traveled = 0
self.distance_traveled += CS.vEgo * DT_CTRL
if self.sm['modelV2'].frameDropPerc > 20:
self.events.add(EventName.modeldLagging)
# Add FrogPilot events
self.frogpilot_events.add_from_msg(self.sm['frogpilotPlan'].frogpilotEvents)
if self.frogpilot_toggles.block_user:
self.frogpilot_events.add(FrogPilotEventName.blockUser)
# Remove already played events
event_names = self.events.names
if EventName.belowSteerSpeed in event_names:
self.belowSteerSpeed_shown = True
if EventName.resumeRequired in event_names:
self.resumeRequired_shown = True
if EventName.steerTempUnavailableSilent in event_names:
self.steerTempUnavailableSilent_shown = True
if self.belowSteerSpeed_shown and CS.vEgo >= self.CP.minSteerSpeed:
self.event_names_to_clear.add(EventName.belowSteerSpeed)
if self.resumeRequired_shown and not CS.cruiseState.standstill and not self.CP.autoResumeSng:
self.event_names_to_clear.add(EventName.resumeRequired)
if self.steerTempUnavailableSilent_shown and not CS.steerFaultTemporary:
self.event_names_to_clear.add(EventName.steerTempUnavailableSilent)
if self.event_names_to_clear:
self.events.events = [event for event in self.events.events if event not in self.event_names_to_clear]
def data_sample(self):
"""Receive data from sockets"""
car_state = messaging.recv_one(self.car_state_sock)
CS = car_state.carState if car_state else self.CS_prev
self.sm.update(0)
if not self.initialized:
all_valid = CS.canValid and self.sm.all_checks()
timed_out = self.sm.frame * DT_CTRL > 6.
if all_valid or timed_out or (SIMULATION and not REPLAY):
available_streams = VisionIpcClient.available_streams("camerad", block=False)
if VisionStreamType.VISION_STREAM_ROAD not in available_streams:
self.sm.ignore_alive.append('roadCameraState')
if VisionStreamType.VISION_STREAM_WIDE_ROAD not in available_streams:
self.sm.ignore_alive.append('wideRoadCameraState')
self.initialized = True
self.set_initial_state()
cloudlog.event(
"controlsd.initialized",
dt=self.sm.frame*DT_CTRL,
timeout=timed_out,
canValid=CS.canValid,
invalid=[s for s, valid in self.sm.valid.items() if not valid],
not_alive=[s for s, alive in self.sm.alive.items() if not alive],
not_freq_ok=[s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
error=True,
)
# When the panda and controlsd do not agree on controls_allowed
# we want to disengage openpilot. However the status from the panda goes through
# another socket other than the CAN messages and one can arrive earlier than the other.
# Therefore we allow a mismatch for two samples, then we trigger the disengagement.
if not self.enabled:
self.mismatch_counter = 0
# All pandas not in silent mode must have controlsAllowed when openpilot is enabled
if self.enabled and any(not ps.controlsAllowed for ps in self.sm['pandaStates']
if ps.safetyModel not in IGNORED_SAFETY_MODES):
self.mismatch_counter += 1
return CS
def state_transition(self, CS):
"""Compute conditional state transitions and execute actions on state transitions"""
self.v_cruise_helper.update_v_cruise(CS, self.enabled, self.is_metric, self.sm['frogpilotPlan'].speedLimitChanged, self.frogpilot_toggles)
# decrement the soft disable timer at every step, as it's reset on
# entrance in SOFT_DISABLING state
self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
self.current_alert_types = [ET.PERMANENT]
# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
if self.state != State.disabled:
# user and immediate disable always have priority in a non-disabled state
if self.contains_event_type(ET.USER_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.USER_DISABLE)
elif self.contains_event_type(ET.IMMEDIATE_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
else:
# ENABLED
if self.state == State.enabled:
if self.contains_event_type(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.contains_event_type(ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# SOFT DISABLING
elif self.state == State.softDisabling:
if not self.contains_event_type(ET.SOFT_DISABLE):
# no more soft disabling condition, so go back to ENABLED
self.state = State.enabled
elif self.soft_disable_timer > 0:
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.soft_disable_timer <= 0:
self.state = State.disabled
# PRE ENABLING
elif self.state == State.preEnabled:
if not self.contains_event_type(ET.PRE_ENABLE):
self.state = State.enabled
else:
self.current_alert_types.append(ET.PRE_ENABLE)
# OVERRIDING
elif self.state == State.overriding:
if self.contains_event_type(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif not self.contains_event_type(ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL):
self.state = State.enabled
else:
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# DISABLED
elif self.state == State.disabled:
if self.contains_event_type(ET.ENABLE):
if self.contains_event_type(ET.NO_ENTRY):
self.current_alert_types.append(ET.NO_ENTRY)
else:
if self.contains_event_type(ET.PRE_ENABLE):
self.state = State.preEnabled
elif self.contains_event_type(ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
else:
self.state = State.enabled
self.current_alert_types.append(ET.ENABLE)
self.v_cruise_helper.initialize_v_cruise(CS, self.experimental_mode, self.sm['frogpilotPlan'].slcSpeedLimit + self.sm['frogpilotPlan'].slcSpeedLimitOffset, self.frogpilot_toggles)
# Check if openpilot is engaged and actuators are enabled
self.enabled = self.state in ENABLED_STATES
self.active = self.state in ACTIVE_STATES
if self.active or self.sm['frogpilotCarState'].alwaysOnLateralEnabled:
self.current_alert_types.append(ET.WARNING)
def state_control(self, CS):
"""Given the state, this function returns a CarControl packet"""
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(lp.steerRatio, 0.1)
self.VM.update_params(x, sr)
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
if self.sm.all_checks(['liveTorqueParameters']) and (torque_params.useParams or self.frogpilot_toggles.force_auto_tune):
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
torque_params.frictionCoefficientFiltered)
if self.sm.updated['liveDelay'] and hasattr(self.LaC, "update_live_delay"):
self.LaC.update_live_delay(self.sm['liveDelay'].lateralDelay)
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.enabled
# Check which actuators can be enabled
standstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
CC.latActive = (self.active or self.sm['frogpilotCarState'].alwaysOnLateralEnabled) and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
(not standstill or self.joystick_mode) and self.sm['frogpilotPlan'].lateralCheck and not self.sm['frogpilotCarState'].pauseLateral
CC.longActive = self.enabled and not self.contains_event_type(ET.OVERRIDE_LONGITUDINAL) and not self.sm['frogpilotCarState'].pauseLongitudinal and self.CP.openpilotLongitudinalControl
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if CS.leftBlinker or CS.rightBlinker:
self.last_blinker_frame = self.sm.frame
# State specific actions
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
if self.frogpilot_toggles.old_long_api:
self.LoC.reset_old_long(v_pid=CS.vEgo)
else:
self.LoC.reset()
if not self.joystick_mode:
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, self.v_cruise_helper.v_cruise_kph * CV.KPH_TO_MS)
if self.frogpilot_toggles.old_long_api:
t_since_plan = (self.sm.frame - self.sm.recv_frame['longitudinalPlan']) * DT_CTRL
actuators.accel = float(min(self.LoC.update_old_long(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan, self.frogpilot_toggles), self.frogpilot_toggles.max_desired_acceleration))
else:
actuators.accel = float(min(self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits, self.frogpilot_toggles), self.frogpilot_toggles.max_desired_acceleration))
if len(long_plan.speeds):
actuators.speed = long_plan.speeds[-1]
# Steering PID loop and lateral MPC
# Reset desired curvature to current to avoid violating the limits on engage
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
lat_delay = self.sm["liveDelay"].lateralDelay + LAT_SMOOTH_SECONDS
actuators.curvature = self.desired_curvature
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited_by_safety, self.desired_curvature,
curvature_limited, lat_delay,
self.sm['liveLocationKalman'],
self.sm['modelV2'],
self.frogpilot_toggles)
actuators.steer = float(steer)
actuators.steeringAngleDeg = float(steeringAngleDeg)
else:
lac_log = log.ControlsState.LateralDebugState.new_message()
if self.sm.recv_frame['testJoystick'] > 0:
# reset joystick if it hasn't been received in a while
should_reset_joystick = (self.sm.frame - self.sm.recv_frame['testJoystick'])*DT_CTRL > 0.2
if not should_reset_joystick:
joystick_axes = self.sm['testJoystick'].axes
else:
joystick_axes = [0.0, 0.0]
if CC.longActive:
actuators.accel = 4.0*clip(joystick_axes[0], -1, 1)
if CC.latActive:
steer = clip(joystick_axes[1], -1, 1)
# max angle is 45 for angle-based cars, max curvature is 0.02
actuators.steer, actuators.steeringAngleDeg, actuators.curvature = steer, steer * 90., steer * -0.02
lac_log.active = self.active
lac_log.steeringAngleDeg = CS.steeringAngleDeg
lac_log.output = actuators.steer
lac_log.saturated = abs(actuators.steer) >= 0.9
# Send a "steering required alert" if saturation count has reached the limit
if CS.steeringPressed:
self.last_steering_pressed_frame = self.sm.frame
recent_steer_pressed = (self.sm.frame - self.last_steering_pressed_frame)*DT_CTRL < 2.0
if lac_log.active and not recent_steer_pressed and not self.CP.notCar:
clipped_speed = max(CS.vEgo, 0.3)
actual_lateral_accel = self.curvature * (clipped_speed**2)
desired_lateral_accel = model_v2.action.desiredCurvature * (clipped_speed**2)
undershooting = abs(desired_lateral_accel) / abs(1e-3 + actual_lateral_accel) > 1.2
turning = abs(desired_lateral_accel) > 1.0
# TODO: lac.saturated includes speed and other checks, should be pulled out
if undershooting and turning and lac_log.saturated:
if self.frogpilot_toggles.goat_scream_alert:
self.frogpilot_events.add(FrogPilotEventName.goatSteerSaturated)
else:
self.events.add(EventName.steerSaturated)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, SupportsFloat):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
distance_pressed = params_memory.get_bool("OnroadDistanceButtonPressed")
if self.frogpilot_toggles.personality_profile_via_distance:
distance_pressed |= any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents)
distance_pressed &= not (self.sm['frogpilotCarState'].distanceLongPressed or self.sm['frogpilotCarState'].distanceVeryLongPressed)
if self.frogpilot_toggles.personality_profile_via_distance_long:
distance_pressed |= self.sm['frogpilotCarState'].distanceLongPressed
if self.frogpilot_toggles.personality_profile_via_distance_very_long:
distance_pressed |= self.sm['frogpilotCarState'].distanceVeryLongPressed
if self.frogpilot_toggles.personality_profile_via_lkas:
distance_pressed |= any(be.pressed and be.type == FrogPilotButtonType.lkas for be in CS.buttonEvents)
if not distance_pressed and self.distance_pressed_previously:
if self.display_timer > 0 or not self.has_menu:
self.personality = (self.personality - 1) % 3
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
self.display_timer = 350
self.distance_pressed_previously = distance_pressed
self.display_timer -= 1
self.update_frogpilot_variables(CS)
return CC, lac_log
def update_frogpilot_variables(self, CS):
if self.frogpilot_toggles.conditional_experimental_mode or self.frogpilot_toggles.slc_fallback_experimental_mode:
self.experimental_mode = self.sm['frogpilotPlan'].experimentalMode
if hasattr(self.LaC, "pid") and self.CP.lateralTuning.which() != "pid":
self.LaC.pid._k_p = self.frogpilot_toggles.steerKp
# Update FrogPilot variables
if self.sm['frogpilotPlan'].togglesUpdated:
self.frogpilot_toggles = get_frogpilot_toggles()
def publish_logs(self, CS, start_time, CC, lac_log):
"""Send actuators and hud commands to the car, send controlsstate and MPC logging"""
# Orientation and angle rates can be useful for carcontroller
# Only calibrated (car) frame is relevant for the carcontroller
orientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value)
if len(orientation_value) > 2:
CC.orientationNED = orientation_value
angular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value)
if len(angular_rate_value) > 2:
CC.angularVelocity = angular_rate_value
CC.cruiseControl.override = self.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
if self.joystick_mode and self.sm.recv_frame['testJoystick'] > 0 and self.sm['testJoystick'].buttons[0]:
CC.cruiseControl.cancel = True
speeds = self.sm['longitudinalPlan'].speeds
if len(speeds):
CC.cruiseControl.resume = self.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
hudControl = CC.hudControl
hudControl.setSpeed = float(self.v_cruise_helper.v_cruise_cluster_kph * CV.KPH_TO_MS)
hudControl.speedVisible = self.enabled
hudControl.lanesVisible = self.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.personality + 1
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
recent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \
and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated
model_v2 = self.sm['modelV2']
desire_prediction = model_v2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = model_v2.laneLineProbs[2] > 0.5
left_lane_visible = model_v2.laneLineProbs[1] > 0.5
l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]
r_lane_change_prob = desire_prediction[Desire.laneChangeRight]
lane_lines = model_v2.laneLines
l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))
r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))
hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)
hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)
if hudControl.rightLaneDepart or hudControl.leftLaneDepart:
self.events.add(EventName.ldw)
clear_event_types = set()
if ET.WARNING not in self.current_alert_types:
clear_event_types.add(ET.WARNING)
if self.enabled:
clear_event_types.add(ET.NO_ENTRY)
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer, self.frogpilot_toggles])
self.AM.add_many(self.sm.frame, alerts)
current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)
if current_alert:
hudControl.visualAlert = current_alert.visual_alert
if not self.CP.passive and self.initialized:
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited_by_safety = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited_by_safety = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2
force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \
(self.state == State.softDisabling) or \
self.sm['frogpilotCarState'].forceCoast
# Curvature & Steering angle
lp = self.sm['liveParameters']
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
controlsState = dat.controlsState
if current_alert:
controlsState.alertText1 = current_alert.alert_text_1
controlsState.alertText2 = current_alert.alert_text_2
controlsState.alertSize = current_alert.alert_size
controlsState.alertStatus = current_alert.alert_status
controlsState.alertBlinkingRate = current_alert.alert_rate
controlsState.alertType = current_alert.alert_type
controlsState.alertSound = current_alert.audible_alert
controlsState.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
controlsState.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
controlsState.enabled = self.enabled
controlsState.active = self.active
controlsState.curvature = curvature
controlsState.desiredCurvature = self.desired_curvature
controlsState.state = self.state
controlsState.engageable = not self.contains_event_type(ET.NO_ENTRY)
controlsState.longControlState = self.LoC.long_control_state
controlsState.vPid = float(self.LoC.v_pid)
controlsState.vCruise = float(self.v_cruise_helper.v_cruise_kph)
controlsState.vCruiseCluster = float(self.v_cruise_helper.v_cruise_cluster_kph)
controlsState.upAccelCmd = float(self.LoC.pid.p)
controlsState.uiAccelCmd = float(self.LoC.pid.i)
controlsState.ufAccelCmd = float(self.LoC.pid.f)
controlsState.cumLagMs = -self.rk.remaining * 1000.
controlsState.startMonoTime = int(start_time * 1e9)
controlsState.forceDecel = bool(force_decel)
controlsState.experimentalMode = self.experimental_mode
controlsState.personality = self.personality
lat_tuning = self.CP.lateralTuning.which()
if self.joystick_mode:
controlsState.lateralControlState.debugState = lac_log
elif self.CP.steerControlType == car.CarParams.SteerControlType.angle:
controlsState.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
controlsState.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
controlsState.lateralControlState.torqueState = lac_log
self.pm.send('controlsState', dat)
# onroadEvents - logged every second or on change
if (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev) or (self.frogpilot_events.names != self.frogpilot_events_prev):
ce_send = messaging.new_message('onroadEvents', len(self.events))
ce_send.valid = True
ce_send.onroadEvents = self.events.to_msg()
self.pm.send('onroadEvents', ce_send)
fpce_send = messaging.new_message('frogpilotOnroadEvents', len(self.frogpilot_events))
fpce_send.valid = True
fpce_send.frogpilotOnroadEvents = self.frogpilot_events.to_msg()
self.pm.send('frogpilotOnroadEvents', fpce_send)
self.events_prev = self.events.names.copy()
self.frogpilot_events_prev = self.frogpilot_events.names.copy()
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
# frogpilotControlsState
frogpilot_dat = messaging.new_message('frogpilotControlsState')
frogpilot_dat.valid = CS.canValid
frogpilotControlsState = frogpilot_dat.frogpilotControlsState
frogpilot_alerts = self.frogpilot_events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer, self.frogpilot_toggles])
self.frogpilot_AM.add_many(self.sm.frame, frogpilot_alerts)
current_frogpilot_alert = self.frogpilot_AM.process_alerts(self.sm.frame, clear_event_types)
if current_frogpilot_alert:
frogpilotControlsState.alertText1 = current_frogpilot_alert.alert_text_1
frogpilotControlsState.alertText2 = current_frogpilot_alert.alert_text_2
frogpilotControlsState.alertSize = current_frogpilot_alert.alert_size
frogpilotControlsState.alertStatus = current_frogpilot_alert.alert_status
frogpilotControlsState.alertBlinkingRate = current_frogpilot_alert.alert_rate
frogpilotControlsState.alertType = current_frogpilot_alert.alert_type
frogpilotControlsState.alertSound = current_frogpilot_alert.audible_alert
self.pm.send('frogpilotControlsState', frogpilot_dat)
def step(self):
start_time = time.monotonic()
# Sample data from sockets and get a carState
CS = self.data_sample()
cloudlog.timestamp("Data sampled")
self.update_events(CS)
cloudlog.timestamp("Events updated")
if not self.CP.passive and self.initialized:
# Update control state
self.state_transition(CS)
# Compute actuators (runs PID loops and lateral MPC)
CC, lac_log = self.state_control(CS)
# Publish data
self.publish_logs(CS, start_time, CC, lac_log)
self.CS_prev = CS
def read_personality_param(self):
try:
return int(self.params.get('LongitudinalPersonality'))
except (ValueError, TypeError):
return log.LongitudinalPersonality.standard
def params_thread(self, evt):
while not evt.is_set():
self.is_metric = self.params.get_bool("IsMetric")
if not (self.frogpilot_toggles.conditional_experimental_mode or self.frogpilot_toggles.slc_fallback_experimental_mode):
self.experimental_mode = self.params.get_bool("ExperimentalMode") and self.CP.openpilotLongitudinalControl
self.personality = self.read_personality_param()
if self.CP.notCar:
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
time.sleep(0.1)
def controlsd_thread(self):
e = threading.Event()
t = threading.Thread(target=self.params_thread, args=(e, ))
try:
t.start()
while True:
self.step()
self.rk.monitor_time()
except SystemExit:
e.set()
t.join()
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.controlsd_thread()
if __name__ == "__main__":
main()