openpilot/selfdrive/monitoring/dpmonitoringd.py
2025-11-11 22:44:56 +08:00

143 lines
4.4 KiB
Python

#!/usr/bin/env python3
import time
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Ratekeeper, DT_DMON
from cereal import log
EventName = log.OnroadEvent.EventName
class SimpleDriverMonitoring:
def __init__(self):
# Timing configuration (in seconds)
self.FIRST_WARNING_TIME = 45.0
self.SECOND_WARNING_TIME = 60.0
self.THIRD_WARNING_TIME = 75.0
# State variables
self.awareness = 1.0 # Full awareness
self.current_events = []
# self.last_interaction_time = 0
self.hands_on_steering = False
# Warning thresholds (normalized to 0-1 scale)
self.threshold_prompt = self.FIRST_WARNING_TIME / self.THIRD_WARNING_TIME # ~0.643 for first warning
self.threshold_critical = self.SECOND_WARNING_TIME / self.THIRD_WARNING_TIME # ~0.857 for second warning
# Step change (how much awareness decreases per step)
self.step_change = DT_DMON / self.THIRD_WARNING_TIME
params = Params()
self.is_rhd = params.get_bool("dp_dev_is_rhd")
self.monitoring_disabled = params.get_bool("dp_dev_monitoring_disabled")
def update_events(self, reset_condition, op_engaged):
self.current_events = []
if self.monitoring_disabled:
return
# If not engaged, reset awareness and return
if not op_engaged:
self.awareness = 1.0
return
# Reset awareness on any reset condition (standstill, any input)
if reset_condition:
self.awareness = 1.0
return
# Only decrease awareness if we're not detecting hands on steering
self.awareness = max(self.awareness - self.step_change, 0.0)
# Determine alert level based on awareness
if self.awareness <= 0.0:
# Third warning (red alert) at 70 seconds
self.current_events.append(EventName.driverUnresponsive)
elif self.awareness <= (1.0 - self.threshold_critical):
# Second warning (orange alert) at 60 seconds
self.current_events.append(EventName.promptDriverUnresponsive)
elif self.awareness <= (1.0 - self.threshold_prompt):
# First warning (green alert) at 45 seconds
self.current_events.append(EventName.preDriverUnresponsive)
def get_state_packet(self, valid=True):
# Create driver monitoring state message
dat = messaging.new_message('driverMonitoringState', valid=valid)
events = []
for event_name in self.current_events:
event = log.OnroadEvent.new_message()
event.name = event_name
events.append(event)
dat.driverMonitoringState = {
"events": events,
"faceDetected": False, # Not using face detection
"isDistracted": self.awareness <= (1.0 - self.threshold_prompt),
"distractedType": 0, # Not using distraction types
"awarenessStatus": 1.0, #self.awareness, (always 1.0 so no decel)
"posePitchOffset": 0.0,
"posePitchValidCount": 0,
"poseYawOffset": 0.0,
"poseYawValidCount": 0,
"stepChange": self.step_change,
"awarenessActive": self.awareness,
"awarenessPassive": self.awareness,
"isLowStd": True,
"hiStdCount": 0,
"isActiveMode": True,
"isRHD": self.is_rhd,
}
return dat
def dmonitoringd_thread():
# Configure process priority
config_realtime_process([0, 1, 2, 3], 5)
# Initialize parameters and messaging
pm = messaging.PubMaster(['driverMonitoringState'])
sm = messaging.SubMaster(['carState', 'selfdriveState'])
# Initialize driver monitoring system
DM = SimpleDriverMonitoring()
# Create ratekeeper for 20Hz operation
rk = Ratekeeper(20, None)
# Main loop running at 20Hz
while True:
sm.update()
# Check if steering is touched (only monitoring steering for hands-on)
# Reset conditions: not engaged, standstill, or any input
reset_condition = (
sm['carState'].standstill or
sm['carState'].steeringPressed or
sm['carState'].gasPressed or
sm['carState'].brakePressed or
sm['carState'].leftBlinker or
sm['carState'].rightBlinker
)
# Process driver monitoring - monitoring only steering for hands-on
# but resetting on any input
DM.update_events(
reset_condition=reset_condition,
op_engaged=sm['selfdriveState'].enabled
)
# Publish driver monitoring state
dat = DM.get_state_packet()
pm.send('driverMonitoringState', dat)
# Maintain 20Hz
rk.keep_time()
def main():
dmonitoringd_thread()
if __name__ == '__main__':
main()