openpilot/selfdrive/ui/tests/test_feedbackd.py
Vehicle Researcher c5d5c5d1f3 openpilot v0.10.1 release
date: 2025-10-24T00:30:59
master commit: 405631baf9685e171a0dd19547cb763f1b163d18
2025-10-24 00:31:03 -07:00

54 lines
2.0 KiB
Python

import pytest
import cereal.messaging as messaging
from cereal import car
from openpilot.common.params import Params
from openpilot.system.manager.process_config import managed_processes
@pytest.mark.skip("tmp disabled")
class TestFeedbackd:
def setup_method(self):
self.pm = messaging.PubMaster(['carState', 'rawAudioData'])
self.sm = messaging.SubMaster(['audioFeedback'])
def _send_lkas_button(self, pressed: bool):
msg = messaging.new_message('carState')
msg.carState.canValid = True
msg.carState.buttonEvents = [{'type': car.CarState.ButtonEvent.Type.lkas, 'pressed': pressed}]
self.pm.send('carState', msg)
def _send_audio_data(self, count: int = 5):
for _ in range(count):
audio_msg = messaging.new_message('rawAudioData')
audio_msg.rawAudioData.data = bytes(1600) # 800 samples of int16
audio_msg.rawAudioData.sampleRate = 16000
self.pm.send('rawAudioData', audio_msg)
self.sm.update(timeout=100)
@pytest.mark.parametrize("record_feedback", [False, True])
def test_audio_feedback(self, record_feedback):
Params().put_bool("RecordAudioFeedback", record_feedback)
managed_processes["feedbackd"].start()
assert self.pm.wait_for_readers_to_update('carState', timeout=5)
assert self.pm.wait_for_readers_to_update('rawAudioData', timeout=5)
self._send_lkas_button(pressed=True)
self._send_audio_data()
self._send_lkas_button(pressed=False)
self._send_audio_data()
if record_feedback:
assert self.sm.updated['audioFeedback'], "audioFeedback should be published when enabled"
else:
assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published when disabled"
self._send_lkas_button(pressed=True)
self._send_audio_data()
self._send_lkas_button(pressed=False)
self._send_audio_data()
assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published after second press"
managed_processes["feedbackd"].stop()