143 lines
4.4 KiB
Python
143 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
import time
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.realtime import config_realtime_process, Ratekeeper, DT_DMON
|
|
from cereal import log
|
|
|
|
# Short alias for the onroad event-name enum; used below when filling in alert events.
EventName = log.OnroadEvent.EventName
|
|
|
|
class SimpleDriverMonitoring:
  """Timer-based driver monitoring that escalates alerts while no reset condition is seen.

  ``awareness`` starts at 1.0 and is lowered by ``step_change`` on every
  ``update_events`` call made while engaged and without a reset condition.
  With the default timings, and assuming one call every ``DT_DMON`` seconds,
  the three warning stages begin after roughly 45 s, 60 s and 75 s.
  No face or pose data is used; those message fields are filled with fixed values.
  """

  def __init__(self, first_warning_time=45.0, second_warning_time=60.0, third_warning_time=75.0):
    """Set up timings, state and persisted settings.

    Args:
      first_warning_time: Seconds without a reset before the first warning.
      second_warning_time: Seconds without a reset before the second warning.
      third_warning_time: Seconds without a reset before the final warning.

    Raises:
      ValueError: If the times are not positive and non-decreasing.
    """
    if not 0.0 < first_warning_time <= second_warning_time <= third_warning_time:
      raise ValueError(
        "warning times must satisfy 0 < first <= second <= third, got "
        f"{first_warning_time}, {second_warning_time}, {third_warning_time}"
      )

    # Timing configuration (in seconds)
    self.FIRST_WARNING_TIME = first_warning_time
    self.SECOND_WARNING_TIME = second_warning_time
    self.THIRD_WARNING_TIME = third_warning_time

    # State variables
    self.awareness = 1.0  # 1.0 = fully aware, 0.0 = final warning reached
    self.current_events = []
    self.hands_on_steering = False  # not read anywhere in this class

    # Warning thresholds as a fraction of the time to the final warning
    # (0.6 and 0.8 with the default 45 s / 60 s / 75 s timings).
    self.threshold_prompt = self.FIRST_WARNING_TIME / self.THIRD_WARNING_TIME
    self.threshold_critical = self.SECOND_WARNING_TIME / self.THIRD_WARNING_TIME

    # Awareness lost per update step, so that awareness reaches 0 after
    # THIRD_WARNING_TIME seconds when stepped once every DT_DMON seconds.
    self.step_change = DT_DMON / self.THIRD_WARNING_TIME

    params = Params()
    self.is_rhd = params.get_bool("dp_dev_is_rhd")
    self.monitoring_disabled = params.get_bool("dp_dev_monitoring_disabled")

  def _has_elapsed(self, threshold):
    """Return True once awareness has dropped by at least ``threshold`` from 1.0."""
    return self.awareness <= (1.0 - threshold)

  def update_events(self, reset_condition, op_engaged):
    """Advance the awareness timer by one step and rebuild ``current_events``.

    Args:
      reset_condition: Truthy when the caller saw something that should restore
        full awareness (the caller decides what counts).
      op_engaged: Truthy while the system is engaged; awareness only decays then.
    """
    self.current_events = []

    # Monitoring switched off: no events, and awareness is left untouched.
    if self.monitoring_disabled:
      return

    # Both "not engaged" and an explicit reset condition restore full awareness.
    if not op_engaged:
      self.awareness = 1.0
      return

    if reset_condition:
      self.awareness = 1.0
      return

    # Engaged and nothing reset the timer: decay, clamped at zero.
    self.awareness = max(self.awareness - self.step_change, 0.0)

    # Pick the single most severe alert that applies.
    if self.awareness <= 0.0:
      # Third (final) warning, reached after THIRD_WARNING_TIME
      self.current_events.append(EventName.driverUnresponsive)
    elif self._has_elapsed(self.threshold_critical):
      # Second warning, reached after SECOND_WARNING_TIME
      self.current_events.append(EventName.promptDriverUnresponsive)
    elif self._has_elapsed(self.threshold_prompt):
      # First warning, reached after FIRST_WARNING_TIME
      self.current_events.append(EventName.preDriverUnresponsive)

  def get_state_packet(self, valid=True):
    """Build a ``driverMonitoringState`` message from the current state.

    Args:
      valid: Forwarded as the ``valid`` flag of the new message.

    Returns:
      The message created by ``messaging.new_message`` with its
      ``driverMonitoringState`` fields populated.
    """
    dat = messaging.new_message('driverMonitoringState', valid=valid)

    events = []
    for event_name in self.current_events:
      event = log.OnroadEvent.new_message()
      event.name = event_name
      events.append(event)

    dat.driverMonitoringState = {
      "events": events,
      "faceDetected": False,  # Not using face detection
      "isDistracted": self._has_elapsed(self.threshold_prompt),
      "distractedType": 0,  # Not using distraction types
      # Deliberately pinned to 1.0 instead of self.awareness; the original
      # author's note says this is "so no decel" (consumer not visible here).
      "awarenessStatus": 1.0,
      "posePitchOffset": 0.0,
      "posePitchValidCount": 0,
      "poseYawOffset": 0.0,
      "poseYawValidCount": 0,
      "stepChange": self.step_change,
      "awarenessActive": self.awareness,
      "awarenessPassive": self.awareness,
      "isLowStd": True,
      "hiStdCount": 0,
      "isActiveMode": True,
      "isRHD": self.is_rhd,
    }
    return dat
|
|
|
|
def dmonitoringd_thread():
  """Run the driver-monitoring loop forever, publishing ``driverMonitoringState``.

  Each iteration refreshes ``carState`` and ``selfdriveState``, steps the
  ``SimpleDriverMonitoring`` timer, publishes the resulting state packet and
  then waits on the rate keeper. The loop has no exit condition.
  """
  # Configure realtime settings for this process
  config_realtime_process([0, 1, 2, 3], 5)

  # Messaging endpoints and the monitoring state machine
  publisher = messaging.PubMaster(['driverMonitoringState'])
  subscriber = messaging.SubMaster(['carState', 'selfdriveState'])
  monitor = SimpleDriverMonitoring()

  # Pace the loop at 20 Hz.
  # NOTE(review): the monitor's step size is derived from DT_DMON, so this rate
  # is expected to equal 1 / DT_DMON - confirm.
  rate_keeper = Ratekeeper(20, None)

  while True:
    subscriber.update()
    car_state = subscriber['carState']

    # Anything that restores full awareness: standstill, steering/gas/brake
    # input, or either blinker.
    driver_input_or_stopped = (
      car_state.standstill
      or car_state.steeringPressed
      or car_state.gasPressed
      or car_state.brakePressed
      or car_state.leftBlinker
      or car_state.rightBlinker
    )

    monitor.update_events(
      reset_condition=driver_input_or_stopped,
      op_engaged=subscriber['selfdriveState'].enabled,
    )

    # Publish the current monitoring state
    state_msg = monitor.get_state_packet()
    publisher.send('driverMonitoringState', state_msg)

    # Hold the loop rate
    rate_keeper.keep_time()
|
|
|
|
def main():
  """Process entry point: hand control to the driver-monitoring loop."""
  dmonitoringd_thread()
|
|
|
|
# Start the daemon only when this file is executed as a script, not on import.
if __name__ == "__main__":
  main()
|