openpilot/frogpilot/assets/download_functions.py
2025-11-01 12:00:00 -07:00

128 lines
4.6 KiB
Python

#!/usr/bin/env python3
import requests
import tempfile
from datetime import datetime
from pathlib import Path
from openpilot.frogpilot.common.frogpilot_utilities import delete_file, is_url_pingable
from openpilot.frogpilot.common.frogpilot_variables import RESOURCES_REPO, params_memory
GITHUB_URL = f"https://raw.githubusercontent.com/{RESOURCES_REPO}"
GITLAB_URL = f"https://gitlab.com/{RESOURCES_REPO}/-/raw"
def check_github_rate_limit(session):
try:
response = session.get("https://api.github.com/rate_limit", timeout=10)
response.raise_for_status()
rate_limit_info = response.json()
remaining = rate_limit_info["rate"]["remaining"]
print(f"GitHub API Requests Remaining: {remaining}")
if remaining > 0:
return True
reset_time = datetime.utcfromtimestamp(rate_limit_info["rate"]["reset"]).strftime("%Y-%m-%d %H:%M:%S")
print("GitHub rate limit reached")
print(f"GitHub Rate Limit Resets At (UTC): {reset_time}")
return False
except requests.exceptions.RequestException as exception:
print(f"Error checking GitHub rate limit: {exception}")
return False
def download_file(cancel_param, destination, progress_param, url, download_param, session, offset_bytes=0, total_bytes=0):
try:
destination.parent.mkdir(parents=True, exist_ok=True)
total_size = get_remote_file_size(url, session)
if total_size == 0:
if not url.endswith(".gif"):
handle_error(None, "Download invalid...", "Download invalid...", download_param, progress_param)
return
with session.get(url, stream=True, timeout=10) as response:
response.raise_for_status()
with tempfile.NamedTemporaryFile(delete=False, dir=destination.parent) as temp_file:
temp_file_path = Path(temp_file.name)
downloaded_size = 0
for chunk in response.iter_content(chunk_size=16384):
if params_memory.get_bool(cancel_param):
temp_file_path.unlink(missing_ok=True)
handle_error(None, "Download cancelled...", "Download cancelled...", download_param, progress_param)
return
if chunk:
temp_file.write(chunk)
downloaded_size += len(chunk)
if total_bytes:
overall_progress = (offset_bytes + downloaded_size) / total_bytes * 100
else:
overall_progress = downloaded_size / total_size * 100
if overall_progress != 100:
params_memory.put(progress_param, f"{overall_progress:.0f}%")
else:
params_memory.put(progress_param, "Verifying authenticity...")
temp_file_path.rename(destination)
except Exception as exception:
handle_request_error(exception, destination, download_param, progress_param)
def get_remote_file_size(url, session):
try:
response = session.head(url, headers={"Accept-Encoding": "identity"}, timeout=10)
response.raise_for_status()
return int(response.headers.get("Content-Length", 0))
except Exception as exception:
handle_request_error(exception, None, None, None)
return 0
def get_repository_url(session):
if is_url_pingable("https://github.com"):
if check_github_rate_limit(session):
return GITHUB_URL
if is_url_pingable("https://gitlab.com"):
return GITLAB_URL
return None
def handle_error(destination, error_message, error, download_param, progress_param):
if destination:
delete_file(destination)
if progress_param and "404" not in error_message:
print(f"Error occurred: {error}")
params_memory.put(progress_param, error_message)
params_memory.remove(download_param)
def handle_request_error(error, destination, download_param, progress_param):
error_map = {
requests.exceptions.ConnectionError: "Connection dropped",
requests.exceptions.HTTPError: lambda error: f"Server error ({error.response.status_code})" if error and getattr(error, "response", None) else "Server error",
requests.exceptions.RequestException: "Network request error. Check connection",
requests.exceptions.Timeout: "Download timed out",
}
error_message = error_map.get(type(error), "Unexpected error")
handle_error(destination, f"Failed: {error_message}", error, download_param, progress_param)
def verify_download(file_path, url, session):
remote_file_size = get_remote_file_size(url, session)
if remote_file_size == 0:
print(f"Error fetching remote size for {file_path}")
return False
if not file_path.is_file():
print(f"File not found: {file_path}")
return False
if remote_file_size != file_path.stat().st_size:
print(f"File size mismatch for {file_path}")
return False
return True