openpilot/frogpilot/controls/lib/speed_limit_controller.py
2025-11-01 12:00:00 -07:00

351 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# PFEIFER - SLC - Modified by FrogAi for FrogPilot
import calendar
import json
import math
import numpy as np
import requests
from concurrent.futures import ThreadPoolExecutor
from openpilot.common.conversions import Conversions as CV
from openpilot.common.realtime import DT_MDL
from openpilot.frogpilot.common.frogpilot_utilities import calculate_bearing_offset, calculate_distance_to_point, is_url_pingable
from openpilot.frogpilot.common.frogpilot_variables import params, params_memory
FREE_MAPBOX_REQUESTS = 100_000
class SpeedLimitController:
def __init__(self):
self.calling_mapbox = False
self.override_slc = False
self.denied_target = 0
self.map_speed_limit = 0
self.mapbox_limit = 0
self.next_speed_limit = 0
self.overridden_speed = 0
self.segment_distance = 0
self.speed_limit_changed_timer = 0
self.target = 0
self.unconfirmed_speed_limit = 0
self.previous_source = "None"
self.source = "None"
self.mapbox_requests = json.loads(params.get("MapBoxRequests") or "{}")
self.mapbox_requests.setdefault("total_requests", 0)
self.mapbox_requests.setdefault("max_requests", FREE_MAPBOX_REQUESTS - (28 * 100))
self.mapbox_host = "https://api.mapbox.com"
self.mapbox_token = params.get("MapboxSecretKey", encoding="utf8")
self.previous_target = params.get_float("PreviousSpeedLimit")
self.executor = ThreadPoolExecutor(max_workers=1)
self.session = requests.Session()
self.session.headers.update({"Accept-Language": "en"})
self.session.headers.update({"User-Agent": "frogpilot-mapbox-speed-limit-retriever/1.0 (https://github.com/FrogAi/FrogPilot)"})
@property
def experimental_mode(self):
return self.frogpilot_toggles.slc_fallback_experimental_mode and self.target == 0
@property
def offset(self):
if self.frogpilot_toggles.is_metric:
offset_map = [
(0, 8.1, self.frogpilot_toggles.speed_limit_offset1), # 029 km/h
(8.1, 13.6, self.frogpilot_toggles.speed_limit_offset2), # 3049
(13.6, 16.4, self.frogpilot_toggles.speed_limit_offset3), # 5059
(16.4, 21.9, self.frogpilot_toggles.speed_limit_offset4), # 6079
(21.9, 27.5, self.frogpilot_toggles.speed_limit_offset5), # 8099
(27.5, 33.1, self.frogpilot_toggles.speed_limit_offset6), # 100119
(33.1, 38.9, self.frogpilot_toggles.speed_limit_offset7), # 120140
]
else:
offset_map = [
(0, 11.2, self.frogpilot_toggles.speed_limit_offset1), # 024 mph
(11.2, 15.2, self.frogpilot_toggles.speed_limit_offset2), # 2534
(15.2, 19.6, self.frogpilot_toggles.speed_limit_offset3), # 3544
(19.6, 24.1, self.frogpilot_toggles.speed_limit_offset4), # 4554
(24.1, 28.6, self.frogpilot_toggles.speed_limit_offset5), # 5564
(28.6, 33.1, self.frogpilot_toggles.speed_limit_offset6), # 6574
(33.1, 44.2, self.frogpilot_toggles.speed_limit_offset7), # 7599
]
return next((offset for low, high, offset in offset_map if low < self.target < high), 0)
def get_mapbox_speed_limit(self, gps_position, now, time_validated, v_ego, sm):
if not gps_position or not self.mapbox_token or (sm["carState"].steeringAngleDeg - sm["liveParameters"].angleOffsetDeg) >= 45:
self.mapbox_limit = 0
self.segment_distance = 0
return
if v_ego < 1:
return
if self.segment_distance > 0:
self.segment_distance -= v_ego * DT_MDL
return
if self.calling_mapbox:
self.segment_distance = v_ego
return
def make_request():
try:
self.calling_mapbox = True
successful = False
if not is_url_pingable(self.mapbox_host):
self.segment_distance = 1000
return None
if time_validated:
current_month = now.month
if current_month != self.mapbox_requests.get("month"):
self.mapbox_requests.update({
"month": current_month,
"total_requests": 0,
"max_requests": FREE_MAPBOX_REQUESTS - calendar.monthrange(now.year, current_month)[1] * 100,
})
self.mapbox_requests["total_requests"] += 1
params.put_nonblocking("MapBoxRequests", json.dumps(self.mapbox_requests))
current_bearing = gps_position.get("bearing")
current_latitude = gps_position.get("latitude")
current_longitude = gps_position.get("longitude")
future_latitude, future_longitude = calculate_bearing_offset(current_latitude, current_longitude, current_bearing, v_ego)
url = (
f"{self.mapbox_host}/matching/v5/mapbox/driving/"
f"{current_longitude},{current_latitude};"
f"{future_longitude},{future_latitude}.json"
)
mapbox_params = {
"access_token": self.mapbox_token,
"annotations": "maxspeed,distance",
"geometries": "polyline6",
"overview": "full",
"steps": "false",
"radiuses": "10;10",
"tidy": "true",
}
response = self.session.get(url, params=mapbox_params, timeout=10)
response.raise_for_status()
successful = True
return response.json()
except Exception as exception:
print(f"Unexpected error in Mapbox request: {exception}")
finally:
self.calling_mapbox = False
if not successful:
self.mapbox_limit = 0
self.segment_distance = v_ego
return None
def complete_request(future):
try:
data = future.result()
if data:
matchings = data.get("matchings") or []
if not matchings:
self.mapbox_limit = 0
self.segment_distance = v_ego
return
legs = (matchings[0] or {}).get("legs") or []
if not legs:
self.mapbox_limit = 0
self.segment_distance = v_ego
return
annotation = legs[0].get("annotation") or {}
distances = annotation.get("distance") or [v_ego]
segment_distance = distances[0]
speed_data = annotation.get("maxspeed", [])
speed_limit_kph = 0
if speed_data:
first_segment_speed = speed_data[0]
speed_limit_kph = (first_segment_speed.get("speed") if first_segment_speed.get("speed") != "none" else 0) or 0
if speed_limit_kph > 0:
self.mapbox_limit = speed_limit_kph * CV.KPH_TO_MS
self.segment_distance = segment_distance
return
self.mapbox_limit = 0
self.segment_distance = v_ego
except Exception as exception:
print(f"Mapbox Callback Error: {exception}")
self.mapbox_limit = 0
self.segment_distance = v_ego
future = self.executor.submit(make_request)
future.add_done_callback(complete_request)
def handle_limit_change(self, desired_source, desired_target, sm):
self.speed_limit_changed_timer += DT_MDL
speed_limit_accepted = (sm["frogpilotCarState"].accelPressed and sm["carControl"].longActive) or params_memory.get_bool("SpeedLimitAccepted")
speed_limit_denied = sm["frogpilotCarState"].decelPressed or (self.speed_limit_changed_timer >= 30)
if speed_limit_accepted:
self.overridden_speed = 0
self.source = desired_source
self.target = desired_target
params_memory.remove("SpeedLimitAccepted")
elif speed_limit_denied:
self.denied_target = desired_target
self.previous_source = desired_source
self.previous_target = desired_target
elif desired_target < self.target and not self.frogpilot_toggles.speed_limit_confirmation_lower:
self.source = desired_source
self.target = desired_target
elif desired_target > self.target and not self.frogpilot_toggles.speed_limit_confirmation_higher:
self.source = desired_source
self.target = desired_target
else:
self.source = "None"
self.unconfirmed_speed_limit = desired_target
if self.target != self.previous_target and self.target > 0 and not speed_limit_denied:
self.denied_target = 0
self.previous_source = self.source
self.previous_target = self.target
params.put_float_nonblocking("PreviousSpeedLimit", self.target)
def update_limits(self, dashboard_speed_limit, gps_position, navigation_speed_limit, now, time_validated, v_cruise, v_ego, sm):
self.update_map_speed_limit(gps_position, v_ego)
limits = {
"Dashboard": dashboard_speed_limit,
"Map Data": self.map_speed_limit,
"Navigation": navigation_speed_limit
}
filtered_limits = {source: limit for source, limit in limits.items() if limit >= 1}
if self.frogpilot_toggles.speed_limit_priority_highest:
desired_source = max(filtered_limits, key=filtered_limits.get, default="None")
desired_target = filtered_limits.get(desired_source, 0)
elif self.frogpilot_toggles.speed_limit_priority_lowest:
desired_source = min(filtered_limits, key=filtered_limits.get, default="None")
desired_target = filtered_limits.get(desired_source, 0)
elif filtered_limits:
for priority in [
self.frogpilot_toggles.speed_limit_priority1,
self.frogpilot_toggles.speed_limit_priority2,
self.frogpilot_toggles.speed_limit_priority3
]:
if priority in filtered_limits:
desired_source = priority
desired_target = filtered_limits[desired_source]
break
else:
desired_source = "None"
desired_target = 0
else:
desired_source = "None"
desired_target = 0
if desired_target == 0 or self.target == 0:
if self.mapbox_requests["total_requests"] < self.mapbox_requests["max_requests"] and self.frogpilot_toggles.slc_mapbox_filler:
self.get_mapbox_speed_limit(gps_position, now, time_validated, v_ego, sm)
if self.mapbox_limit >= 1:
desired_source = "Mapbox"
desired_target = self.mapbox_limit
if desired_target == 0 or self.target == 0:
if self.denied_target != self.previous_target > 0 and self.frogpilot_toggles.slc_fallback_previous_speed_limit:
desired_source = self.previous_source
desired_target = self.previous_target
self.target = desired_target
elif sm["controlsState"].enabled and self.frogpilot_toggles.slc_fallback_set_speed:
desired_source = "None"
desired_target = v_cruise
else:
self.mapbox_limit = 0
self.segment_distance = 0
if abs(desired_target - self.previous_target) >= 1:
self.handle_limit_change(desired_source, desired_target, sm)
elif desired_source != self.source and abs(desired_target - self.target) < 1:
self.source = desired_source
else:
self.speed_limit_changed_timer = 0
self.unconfirmed_speed_limit = 0
def update_map_speed_limit(self, gps_position, v_ego):
if not gps_position:
return
self.map_speed_limit = params_memory.get_float("MapSpeedLimit")
next_map_speed_limit = json.loads(params_memory.get("NextMapSpeedLimit") or "{}")
self.next_speed_limit = next_map_speed_limit.get("speedlimit", 0)
if self.next_speed_limit:
current_latitude = gps_position.get("latitude")
current_longitude = gps_position.get("longitude")
next_latitude = next_map_speed_limit.get("latitude")
next_longitude = next_map_speed_limit.get("longitude")
distance_to_upcoming = calculate_distance_to_point(current_latitude * CV.DEG_TO_RAD, current_longitude * CV.DEG_TO_RAD, next_latitude * CV.DEG_TO_RAD, next_longitude * CV.DEG_TO_RAD)
if self.map_speed_limit < self.next_speed_limit:
max_lookahead = self.frogpilot_toggles.map_speed_lookahead_higher * v_ego
elif self.map_speed_limit > self.next_speed_limit:
max_lookahead = self.frogpilot_toggles.map_speed_lookahead_lower * v_ego
else:
max_lookahead = 0
if distance_to_upcoming < max_lookahead:
self.map_speed_limit = self.next_speed_limit
def update_override(self, v_cruise, v_cruise_diff, v_ego, v_ego_diff, sm):
self.override_slc = self.overridden_speed > self.target + self.offset > 0
self.override_slc |= sm["carState"].gasPressed and v_ego > self.target + self.offset > 0
self.override_slc &= sm["controlsState"].enabled
if self.override_slc:
if self.frogpilot_toggles.speed_limit_controller_override_manual:
if sm["carState"].gasPressed:
self.overridden_speed = max(v_ego + v_ego_diff, self.overridden_speed)
self.overridden_speed = float(np.clip(self.overridden_speed, self.target + self.offset, v_cruise + v_cruise_diff))
elif self.frogpilot_toggles.speed_limit_controller_override_set_speed:
self.overridden_speed = v_cruise + v_cruise_diff
self.source = "None"
else:
self.overridden_speed = 0