diff --git a/README.md b/README.md index dd6a361..d8fb2af 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,14 @@ The application supports multiple sources like myrient and 1fichier. These sourc --- +## 🧰 Command-line usage (CLI) + +RGSX also offers a headless command-line interface to list platforms/games and download ROMs: + +- English guide: see `https://github.com/RetroGameSets/RGSX/blob/main/README_CLI.md` + +--- + ## ✨ Features - **Game downloads** : Support for ZIP files and handling of unsupported extensions based on EmulationStation's `es_systems.cfg` (and custom `es_systems_*.cfg` on Batocera). RGSX reads allowed extensions per system from these configs and will automatically extract archives when a system doesn't support them. diff --git a/README_CLI.md b/README_CLI.md new file mode 100644 index 0000000..6d3ddc1 --- /dev/null +++ b/README_CLI.md @@ -0,0 +1,159 @@ +# RGSX CLI — Guide d’utilisation + +Ce guide couvre toutes les commandes disponibles du CLI et fournit des exemples prêts à copier (Windows PowerShell). + +## Prérequis +- Python installé et accessible (le projet utilise un mode headless; aucune fenêtre ne s’ouvrira). +- Exécuter depuis le dossier contenant `rgsx_cli.py`. + +## Syntaxe générale +Les options globales peuvent être placées avant ou après la sous-commande. + +- Forme 1: + ```powershell + python rgsx_cli.py [--verbose] [--force-update|-force-update] [options] + ``` +- Forme 2: + ```powershell + python rgsx_cli.py [options] [--verbose] [--force-update|-force-update] + ``` + +- `--verbose` active les logs détaillés (DEBUG) sur la sortie standard d’erreur. +- `--force-update` (ou `-force-update`) purge les données locales et force le re-téléchargement du pack de données (systems_list, games/*.json, images). + +Lorsque les données sources sont manquantes, le CLI télécharge et extrait automatiquement le pack (avec une barre de progression). + +## Commandes + +### 1) platforms — lister les plateformes +- Options: + - `--json`: sortie JSON (objets `{ name, folder }`). + +Exemples: +```powershell +python rgsx_cli.py platforms +python rgsx_cli.py platforms --json +python rgsx_cli.py --verbose platforms +python rgsx_cli.py platforms --verbose +``` + +Sortie texte: une ligne par plateforme, au format `NomDossier`. + +### 2) games — lister les jeux d’une plateforme +- Options: + - `--platform ` (ex: `n64` ou "Nintendo 64"). + - `--search `: filtre par sous-chaîne dans le nom du jeu. + +Exemples: +```powershell +python rgsx_cli.py games --platform n64 +python rgsx_cli.py games --platform "Nintendo 64" --search zelda +python rgsx_cli.py games --platform n64 --verbose +``` + +Remarques: +- La plateforme est résolue par nom affiché (platform_name) ou par dossier (folder), sans tenir compte de la casse. + +### 3) download — télécharger un jeu +- Options: + - `--platform ` + - `--game ""` + - `--force`: ignorer l’avertissement si l’extension du fichier n’est pas répertoriée comme supportée pour la plateforme. + +Exemples: +```powershell +# Titre exact +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" + +# Correspondance partielle +# Si aucun titre exact n’est trouvé, le CLI n’autosélectionne plus. Il affiche des correspondances possibles. +python rgsx_cli.py download --platform n64 --game "Ocarina of Time (Beta)" +# ➜ Le CLI proposera une liste de titres potentiels (à relancer ensuite avec le titre exact). + +Mode interactif par défaut: +- Si aucun titre exact n’est trouvé et que vous êtes dans un terminal interactif (TTY), une liste numérotée s’affiche automatiquement pour choisir un match et lancer le téléchargement. + +# Forcer si l’extension semble non supportée (ex: .rar) +python rgsx_cli.py download --platform snes --game "pack_roms.rar" --force + +# Verbose positionné après la sous-commande +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" --verbose +``` + +Pendant le téléchargement, une progression en pourcentage, taille (MB) et vitesse (MB/s) s’affiche. Le résultat final est également écrit dans l’historique. + +Notes: +- Les ROMs sont enregistrées dans le dossier de la plateforme correspondante (ex: `R:\roms\n64`). +- Si le fichier est une archive (zip/rar) et que la plateforme ne supporte pas l’extension, un avertissement est affiché (vous pouvez utiliser `--force`). + +### 4) history — afficher l’historique +- Options: + - `--tail `: n dernières entrées (défaut: 50) + - `--json`: sortie JSON + +Exemples: +```powershell +python rgsx_cli.py history +python rgsx_cli.py history --tail 20 +python rgsx_cli.py history --json +``` + +### 5) clear-history — vider l’historique +Exemple: +```powershell +python rgsx_cli.py clear-history +``` + +### Option globale: --force-update — purge + re-téléchargement des données +- Supprime `systems_list.json`, le dossier `games/` et `images/`, puis télécharge/extrait à nouveau le pack de données. + +Exemples: +```powershell +# Sans sous-commande: purge + re-téléchargement puis sortie +python rgsx_cli.py --force-update + +# Placé après une sous-commande (accepté aussi) +python rgsx_cli.py platforms --force-update +``` + +## Comportements et conseils +- Résolution de plateforme: par nom affiché ou dossier, insensible à la casse. Pour la commande `games` et `download`, une recherche par sous-chaîne est utilisée si la correspondance exacte n’est pas trouvée. +- Logs `--verbose`: principalement utiles lors des téléchargements/extractions; émis en DEBUG. +- Téléchargement de données manquantes: automatique avec progression harmonisée (téléchargement puis extraction). +- Codes de sortie (indicatif): + - `0`: succès + - `1`: échec du téléchargement/erreur générique + - `2`: plateforme introuvable + - `3`: jeu introuvable + - `4`: extension non supportée (sans `--force`) + +## Exemples rapides (copier-coller) +```powershell +# Lister plateformes (texte) +python rgsx_cli.py platforms + +# Lister plateformes (JSON) +python rgsx_cli.py platforms --json + +# Lister jeux N64 avec filtre +python rgsx_cli.py games --platform n64 --search zelda + +# Télécharger un jeu N64 (titre exact) +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" + +# Télécharger un jeu N64 (titre aproximatif) +python rgsx_cli.py download --platform n64 --game "Ocarina of Time" +Resultat (exemple) : +No exact result found for this game: Ocarina of Time +Select a match to download: + 1. Legend of Zelda, The - Ocarina of Time (Europe) (Beta) (2003-02-13) (GameCube).zip + 2. Legend of Zelda, The - Ocarina of Time (Europe) (Beta) (2003-02-21) (GameCube) (Debug).zip + ... + 15. F-Zero X (USA) (Beta) (The Legend of Zelda - Ocarina of Time leftover data).zip + +# Voir l’historique (20 dernières entrées) +python rgsx_cli.py history --tail 20 + +# Purger et recharger les données de listes des systèmes et des jeux +python rgsx_cli.py --force-update +``` diff --git a/README_CLI_EN.md b/README_CLI_EN.md new file mode 100644 index 0000000..b2f5426 --- /dev/null +++ b/README_CLI_EN.md @@ -0,0 +1,152 @@ +# RGSX CLI — Usage Guide + +This guide covers all available CLI commands with copy-ready Windows PowerShell examples. + +## Prerequisites +- Python installed and on PATH (the app runs in headless mode; no window will open). +- Run commands from the folder that contains `rgsx_cli.py`. + +## General syntax +Global options can be placed before or after the subcommand. + +- Form 1: + ```powershell + python rgsx_cli.py [--verbose] [--force-update|-force-update] [options] + ``` +- Form 2: + ```powershell + python rgsx_cli.py [options] [--verbose] [--force-update|-force-update] + ``` + +- `--verbose` enables detailed logs (DEBUG) on stderr. +- `--force-update` (or `-force-update`) purges local data and re-downloads the data pack (systems_list, games/*.json, images). + +When source data is missing, the CLI will automatically download and extract the data pack (with progress). + +## Commands + +### 1) platforms — list platforms +- Options: + - `--json`: JSON output (objects `{ name, folder }`). + +Examples: +```powershell +python rgsx_cli.py platforms +python rgsx_cli.py platforms --json +python rgsx_cli.py --verbose platforms +python rgsx_cli.py platforms --verbose +``` + +Text output: one line per platform, formatted as `NameFolder`. + +### 2) games — list games for a platform +- Options: + - `--platform ` (e.g., `n64` or "Nintendo 64"). + - `--search `: filter by substring in game title. + +Examples: +```powershell +python rgsx_cli.py games --platform n64 +python rgsx_cli.py games --platform "Nintendo 64" --search zelda +python rgsx_cli.py games --platform n64 --verbose +``` + +Notes: +- The platform is resolved by display name (platform_name) or folder, case-insensitively. + +### 3) download — download a game +- Options: + - `--platform ` + - `--game ""` + - `--force`: ignore unsupported-extension warning for the platform. + +Examples: +```powershell +# Exact title +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" + +# Partial match +# If no exact title is found, the CLI no longer auto-selects; it displays suggestions. +python rgsx_cli.py download --platform n64 --game "Ocarina of Time (Beta)" +# ➜ The CLI shows a list of candidates (then run again with the exact title). + +Interactive mode by default: +- If no exact title is found and you are in an interactive terminal (TTY), a numbered list is shown automatically so you can pick and start the download. + +# Force if the file extension seems unsupported (e.g., .rar) +python rgsx_cli.py download --platform snes --game "pack_roms.rar" --force + +# Verbose placed after the subcommand +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" --verbose +``` + +During download, progress %, size (MB) and speed (MB/s) are shown. The final result is also written to history. + +Notes: +- ROMs are saved into the corresponding platform directory (e.g., `R:\roms\n64`). +- If the file is an archive (zip/rar) and the platform doesn’t support that extension, a warning is shown (you can use `--force`). + +### 4) history — show history +- Options: + - `--tail `: last N entries (default: 50) + - `--json`: JSON output + +Examples: +```powershell +python rgsx_cli.py history +python rgsx_cli.py history --tail 20 +python rgsx_cli.py history --json +``` + +### 5) clear-history — clear history +Example: +```powershell +python rgsx_cli.py clear-history +``` + +### Global option: --force-update — purge + re-download data +- Removes `systems_list.json`, the `games/` and `images/` folders, then downloads/extracts the data pack again. + +Examples: +```powershell +# Without subcommand: purge + re-download then exit +python rgsx_cli.py --force-update + +# Placed after a subcommand (also accepted) +python rgsx_cli.py platforms --force-update +``` + +## Behavior and tips +- Platform resolution: by display name or folder, case-insensitive. For `games` and `download`, if no exact match is found a search-like suggestion list is shown. +- `--verbose` logs: most useful during downloads/extraction; printed at DEBUG level. +- Missing data download: automatic, with consistent progress (download then extraction). +- Exit codes (indicative): + - `0`: success + - `1`: download failure/generic error + - `2`: platform not found + - `3`: game not found + - `4`: unsupported extension (without `--force`) + +## Quick examples (copy/paste) +```powershell +# List platforms (text) +python rgsx_cli.py platforms + +# List platforms (JSON) +python rgsx_cli.py platforms --json + +# List N64 games with filter +python rgsx_cli.py games --platform n64 --search zelda + +# Download an N64 game (exact title) +python rgsx_cli.py download --platform n64 --game "Legend of Zelda, The - Ocarina of Time (USA) (Beta).zip" + +# Download with approximate title (suggestions + interactive pick) +python rgsx_cli.py download --platform n64 --game "Ocarina of Time" + +# View last 20 history entries +python rgsx_cli.py history --tail 20 + +# Purge and refresh data pack +python rgsx_cli.py --force-update +``` diff --git a/README_FR.md b/README_FR.md index a0e2252..8df649d 100644 --- a/README_FR.md +++ b/README_FR.md @@ -8,6 +8,14 @@ L'application prend en charge plusieurs sources comme myrient, 1fichier. Ces sou --- +## 🧰 Utilisation en ligne de commande (CLI) + +RGSX propose aussi une interface en ligne de commande (sans interface graphique) pour lister les plateformes/jeux et télécharger des ROMs : + +- Guide FR: voir `https://github.com/RetroGameSets/RGSX/blob/main/README_CLI.md` + +--- + ## ✨ Fonctionnalités - **Téléchargement de jeux** : Prise en charge des fichiers ZIP et gestion des extensions non supportées à partir du fichier `es_systems.cfg` d'EmulationStation (et des `es_systems_*.cfg` personnalisés sur Batocera). RGSX lit les extensions autorisées par système depuis ces configurations et extrait automatiquement les archives si le système ne les supporte pas. diff --git a/ports/RGSX/config.py b/ports/RGSX/config.py index b28829c..237a007 100644 --- a/ports/RGSX/config.py +++ b/ports/RGSX/config.py @@ -1,16 +1,26 @@ -import pygame # type: ignore import os import logging import platform +# Headless mode for CLI: set env RGSX_HEADLESS=1 to avoid pygame and noisy prints +HEADLESS = os.environ.get("RGSX_HEADLESS") == "1" +try: + if not HEADLESS: + import pygame # type: ignore + else: + pygame = None # type: ignore +except Exception: + pygame = None # type: ignore + # Version actuelle de l'application app_version = "2.2.0.3" def get_operating_system(): """Renvoie le nom du système d'exploitation.""" return platform.system() -#log dans la console le système d'exploitation -print(f"Système d'exploitation : {get_operating_system()}") +#log dans la console le système d'exploitation (désactivé en headless) +if not HEADLESS: + print(f"Système d'exploitation : {get_operating_system()}") def get_application_root(): @@ -20,7 +30,8 @@ def get_application_root(): current_file = os.path.abspath(__file__) # Remonter au dossier parent de config.py (par exemple, dossier de l'application) app_root = os.path.dirname(os.path.dirname(current_file)) - print(f"Dossier de l'application : {app_root}") + if not HEADLESS: + print(f"Dossier de l'application : {app_root}") return app_root except NameError: # Si __file__ n'est pas défini (par exemple, exécution dans un REPL) @@ -35,7 +46,8 @@ def get_system_root(): current_path = os.path.abspath(__file__) drive, _ = os.path.splitdrive(current_path) system_root = drive + os.sep - print(f"Dossier racine du système : {system_root}") + if not HEADLESS: + print(f"Dossier racine du système : {system_root}") return system_root elif OPERATING_SYSTEM == "Linux": # tester si c'est batocera : @@ -48,7 +60,8 @@ def get_system_root(): parent_dir = os.path.dirname(current_dir) if os.path.basename(parent_dir) == "userdata": # Vérifier si le parent est userdata system_root = parent_dir - print(f"Dossier racine du système : {system_root}") + if not HEADLESS: + print(f"Dossier racine du système : {system_root}") return system_root current_dir = parent_dir # Si userdata n'est pas trouvé, retourner / @@ -109,25 +122,16 @@ xdvdfs_download_exe = os.path.join(OTA_SERVER_URL, "xdvdfs.exe") xdvdfs_download_linux = os.path.join(OTA_SERVER_URL, "xdvdfs") -# Print des chemins pour debug -print(f"RETROBAT_DATA_FOLDER: {RETROBAT_DATA_FOLDER}") -print(f"ROMS_FOLDER: {ROMS_FOLDER}") -print(f"SAVE_FOLDER: {SAVE_FOLDER}") -print(f"RGSX APP_FOLDER: {APP_FOLDER}") -print(f"RGSX LOGS_FOLDER: {log_dir}") -print(f"RGSX SETTINGS PATH: {RGSX_SETTINGS_PATH}") -print(f"GAMELISTXML: {GAMELISTXML}") -print(f"GAMELISTXML_WINDOWS: {GAMELISTXML_WINDOWS}") -print(f"UPDATE_FOLDER: {UPDATE_FOLDER}") -print(f"LANGUAGES_FOLDER: {LANGUAGES_FOLDER}") -print(f"JSON_EXTENSIONS: {JSON_EXTENSIONS}") -print(f"MUSIC_FOLDER: {MUSIC_FOLDER}") -print(f"IMAGES_FOLDER: {IMAGES_FOLDER}") -print(f"GAMES_FOLDER: {GAMES_FOLDER}") -print(f"SOURCES_FILE: {SOURCES_FILE}") -print(f"CONTROLS_CONFIG_PATH: {CONTROLS_CONFIG_PATH}") -print(f"HISTORY_PATH: {HISTORY_PATH}") - +if not HEADLESS: + # Print des chemins pour debug + print(f"ROMS_FOLDER: {ROMS_FOLDER}") + print(f"SAVE_FOLDER: {SAVE_FOLDER}") + print(f"RGSX LOGS_FOLDER: {log_dir}") + print(f"RGSX SETTINGS PATH: {RGSX_SETTINGS_PATH}") + print(f"JSON_EXTENSIONS: {JSON_EXTENSIONS}") + print(f"IMAGES_FOLDER: {IMAGES_FOLDER}") + print(f"GAMES_FOLDER: {GAMES_FOLDER}") + print(f"SOURCES_FILE: {SOURCES_FILE}") # Constantes pour la répétition automatique dans pause_menu @@ -199,7 +203,7 @@ selected_key = (0, 0) # Position du curseur dans le clavier virtuel redownload_confirm_selection = 0 # Sélection pour la confirmation de redownload popup_message = "" # Message à afficher dans les popups popup_timer = 0 # Temps restant pour le popup en millisecondes (0 = inactif) -last_frame_time = pygame.time.get_ticks() +last_frame_time = pygame.time.get_ticks() if pygame is not None else 0 current_music_name = None music_popup_start_time = 0 selected_games = set() # Indices des jeux sélectionnés pour un téléchargement multiple (menu game) @@ -282,6 +286,8 @@ update_checked = False def validate_resolution(): """Valide la résolution de l'écran par rapport aux capacités de l'écran.""" + if pygame is None: + return SCREEN_WIDTH, SCREEN_HEIGHT display_info = pygame.display.Info() if SCREEN_WIDTH > display_info.current_w or SCREEN_HEIGHT > display_info.current_h: logger.warning(f"Résolution {SCREEN_WIDTH}x{SCREEN_HEIGHT} dépasse les limites de l'écran") diff --git a/ports/RGSX/language.py b/ports/RGSX/language.py index 65675f6..5728591 100644 --- a/ports/RGSX/language.py +++ b/ports/RGSX/language.py @@ -1,8 +1,15 @@ import os import json -import pygame #type: ignore import logging import config +from config import HEADLESS +try: + if not HEADLESS: + import pygame # type: ignore + else: + pygame = None # type: ignore +except Exception: + pygame = None # type: ignore import subprocess from rgsx_settings import load_rgsx_settings, save_rgsx_settings diff --git a/ports/RGSX/network.py b/ports/RGSX/network.py index cf7db4c..a0c4ede 100644 --- a/ports/RGSX/network.py +++ b/ports/RGSX/network.py @@ -3,10 +3,17 @@ import subprocess import os import sys import threading -import pygame # type: ignore import zipfile import asyncio import config +from config import HEADLESS +try: + if not HEADLESS: + import pygame # type: ignore + else: + pygame = None # type: ignore +except Exception: + pygame = None # type: ignore from config import OTA_VERSION_ENDPOINT,APP_FOLDER, UPDATE_FOLDER, OTA_UPDATE_ZIP from utils import sanitize_filename, extract_zip, extract_rar, load_api_key_1fichier, normalize_platform_name from history import save_history @@ -189,7 +196,7 @@ async def check_for_updates(): config.popup_message = config.update_result_message config.popup_timer = 2000 config.update_result_error = False - config.update_result_start_time = pygame.time.get_ticks() + config.update_result_start_time = pygame.time.get_ticks() if pygame is not None else 0 config.needs_redraw = True logger.debug(f"Affichage de la popup de mise à jour réussie, redémarrage imminent") @@ -211,7 +218,7 @@ async def check_for_updates(): config.popup_message = config.update_result_message config.popup_timer = 5000 config.update_result_error = True - config.update_result_start_time = pygame.time.get_ticks() + config.update_result_start_time = pygame.time.get_ticks() if pygame is not None else 0 config.needs_redraw = True return False, _("network_check_update_error").format(str(e)) @@ -534,6 +541,24 @@ async def download_rom(url, platform, game_name, is_zip_non_supported=False, tas logger.error(f"Erreur mise à jour progression: {str(e)}") thread.join() + # Drain any remaining final message to ensure history is saved + try: + task_queue = progress_queues.get(task_id) + if task_queue: + while not task_queue.empty(): + data = task_queue.get() + if isinstance(data[1], bool): + success, message = data[1], data[2] + if isinstance(config.history, list): + for entry in config.history: + if "url" in entry and entry["url"] == url and entry["status"] in ["downloading", "Téléchargement", "Extracting"]: + entry["status"] = "Download_OK" if success else "Erreur" + entry["progress"] = 100 if success else 0 + entry["message"] = message + save_history(config.history) + break + except Exception: + pass # Nettoyer la queue if task_id in progress_queues: del progress_queues[task_id] @@ -801,6 +826,24 @@ async def download_from_1fichier(url, platform, game_name, is_zip_non_supported= logger.debug(f"Fin boucle de progression, attente fin thread pour task_id={task_id}") thread.join() logger.debug(f"Thread terminé, nettoyage queue pour task_id={task_id}") + # Drain any remaining final message to ensure history is saved + try: + task_queue = progress_queues.get(task_id) + if task_queue: + while not task_queue.empty(): + data = task_queue.get() + if isinstance(data[1], bool): + success, message = data[1], data[2] + if isinstance(config.history, list): + for entry in config.history: + if "url" in entry and entry["url"] == url and entry["status"] in ["downloading", "Téléchargement", "Extracting"]: + entry["status"] = "Download_OK" if success else "Erreur" + entry["progress"] = 100 if success else 0 + entry["message"] = message + save_history(config.history) + break + except Exception: + pass # Nettoyer la queue if task_id in progress_queues: del progress_queues[task_id] diff --git a/ports/RGSX/rgsx_cli.py b/ports/RGSX/rgsx_cli.py new file mode 100644 index 0000000..ec84979 --- /dev/null +++ b/ports/RGSX/rgsx_cli.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python3 +import os +# Force headless mode before any project imports +os.environ.setdefault("RGSX_HEADLESS", "1") +import sys +import argparse +import asyncio +import json +import logging +import requests +import time +import zipfile +import shutil +import re + +# IMPORTANT: Avoid importing display/pygame modules for headless mode +import config # paths, settings, SAVE_FOLDER, etc. +import network as network_mod # for progress_queues access +from utils import load_sources, load_games, is_extension_supported, load_extensions_json, sanitize_filename, extract_zip_data +from history import load_history, save_history, add_to_history +from network import download_rom, download_from_1fichier, is_1fichier_url +from rgsx_settings import get_sources_zip_url + +logger = logging.getLogger("rgsx.cli") + + +def setup_logging(verbose: bool): + level = logging.DEBUG if verbose else logging.WARNING + logging.basicConfig(level=level, format='%(levelname)s: %(message)s') + + +def ensure_data_present(verbose: bool = False): + """Ensure systems list and games data exist; if missing, download OTA data ZIP and extract it.""" + # If systems_list exists and games folder has some json files, nothing to do + has_sources = os.path.exists(config.SOURCES_FILE) + has_games = os.path.isdir(config.GAMES_FOLDER) and any( + f.lower().endswith('.json') for f in os.listdir(config.GAMES_FOLDER) + ) + if has_sources and has_games: + return True + + url = get_sources_zip_url(config.OTA_data_ZIP) + if not url: + print("No sources URL configured; cannot auto-download data.", file=sys.stderr) + return False + + zip_path = os.path.join(config.SAVE_FOLDER, "data_download.zip") + os.makedirs(config.SAVE_FOLDER, exist_ok=True) + headers = {"User-Agent": "Mozilla/5.0"} + # Always show progress when we're in the 'missing data' path + show = True or verbose + print("Source data not found, downloading...") + print(f"Downloading data from {url}...") + try: + with requests.get(url, stream=True, headers=headers, timeout=60) as r: + r.raise_for_status() + total = int(r.headers.get('content-length', 0)) + downloaded = 0 + last_t = time.time() + last_d = 0 + last_line = "" + with open(zip_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + if not chunk: + continue + f.write(chunk) + downloaded += len(chunk) + if show and total: + now = time.time() + dt = max(1e-6, now - last_t) + delta = downloaded - last_d + speed = delta / dt / (1024*1024) + pct = int(downloaded/total*100) + mb = downloaded/(1024*1024) + tot = total/(1024*1024) + line = f"Downloading data: {pct:3d}% ({mb:.1f}/{tot:.1f} MB) @ {speed:.1f} MB/s" + if line != last_line: + print("\r" + line, end="", flush=True) + last_line = line + last_t = now + last_d = downloaded + if show: + print() + except Exception as e: + print(f"Failed to download data: {e}", file=sys.stderr) + return False + + # Extract + if show: + print("Extracting data...") + try: + # Custom extraction with progress + total_size = 0 + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + total_size = sum(info.file_size for info in zip_ref.infolist() if not info.is_dir()) + extracted = 0 + chunk = 2048 + last_line = "" + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + for info in zip_ref.infolist(): + if info.is_dir(): + continue + file_path = os.path.join(config.SAVE_FOLDER, info.filename) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with zip_ref.open(info) as src, open(file_path, 'wb') as dst: + remaining = info.file_size + while remaining > 0: + buf = src.read(min(chunk, remaining)) + if not buf: + break + dst.write(buf) + remaining -= len(buf) + extracted += len(buf) + if show and total_size: + pct = int(extracted/total_size*100) + mb = extracted/(1024*1024) + tot = total_size/(1024*1024) + line = f"Extracting data: {pct:3d}% ({mb:.1f}/{tot:.1f} MB)" + if line != last_line: + print("\r" + line, end="", flush=True) + last_line = line + try: + os.chmod(file_path, 0o644) + except Exception: + pass + if show and last_line: + print() + ok, msg = True, "OK" + except Exception as ee: + ok, msg = False, str(ee) + try: + if os.path.exists(zip_path): + os.remove(zip_path) + except Exception: + pass + if not ok: + print(f"Failed to extract data: {msg}", file=sys.stderr) + return False + if show: + print("Data downloaded and extracted.") + return True + + +def cmd_platforms(args): + ensure_data_present(getattr(args, 'verbose', False)) + sources = load_sources() + items = [] + for s in sources: + name = s.get("platform_name") or s.get("name") or s.get("platform") or "" + folder = s.get("folder") or s.get("dossier") or "" + if name: + items.append({"name": name, "folder": folder}) + if getattr(args, 'json', False): + print(json.dumps(items, ensure_ascii=False, indent=2)) + else: + for it in items: + # name TAB folder (folder may be empty for BIOS/virtual) + print(f"{it['name']}\t{it['folder']}") + + +def _resolve_platform(sources, platform_name: str): + # match by display name or key, case-insensitive + pn = platform_name.strip().lower() + for s in sources: + display = (s.get("platform_name") or s.get("name") or "").lower() + key = (s.get("platform") or s.get("folder") or "").lower() + if pn == display or pn == key: + return s + # fallback: substring + for s in sources: + display = (s.get("platform_name") or s.get("name") or "").lower() + if pn in display: + return s + return None + + +def cmd_games(args): + ensure_data_present(getattr(args, 'verbose', False)) + sources = load_sources() + platform = _resolve_platform(sources, args.platform) + if not platform: + print(f"Platform not found: {args.platform}", file=sys.stderr) + sys.exit(2) + platform_id = ( + platform.get('platform_name') + or platform.get('platform') + or platform.get('folder') + or args.platform + ) + games = load_games(platform_id) + if args.search: + q = args.search.lower() + games = [g for g in games if q in (g[0] or '').lower()] + for g in games: + # games items can be (name, url) or (name, url, size) + title = g[0] if isinstance(g, (list, tuple)) and g else str(g) + print(title) + + +def cmd_history(args): + hist = load_history() + if args.json: + print(json.dumps(hist, ensure_ascii=False, indent=2)) + else: + for e in hist[-args.tail:]: + print(f"[{e.get('status')}] {e.get('platform')} - {e.get('game_name')} ({e.get('progress','?')}%) {e.get('message','')}") + + +def cmd_clear_history(args): + save_history([]) + print("History cleared") + + +async def _run_download_with_progress(url: str, platform_name: str, game_name: str, force_extract_zip: bool = False): + """Run download and display live progress in the terminal.""" + task_id = f"cli-{os.getpid()}" + # Start download coroutine + coro = download_from_1fichier(url, platform_name, game_name, force_extract_zip, task_id) if is_1fichier_url(url) else download_rom(url, platform_name, game_name, force_extract_zip, task_id) + task = asyncio.create_task(coro) + + last_line = "" + def print_progress(pct: int, speed_mb_s: float | None, downloaded: int, total: int): + nonlocal last_line + # Build a concise one-line status + total_mb = total / (1024*1024) if total else 0 + dl_mb = downloaded / (1024*1024) + spd = f" @ {speed_mb_s:.1f} MB/s" if speed_mb_s is not None and speed_mb_s > 0 else "" + line = f"Downloading: {pct:3d}% ({dl_mb:.1f}/{total_mb:.1f} MB){spd}" + # Avoid overly chatty output + if line != last_line: + print("\r" + line, end="", flush=True) + last_line = line + + # Poll shared in-memory history for progress (non-intrusive) + while not task.done(): + try: + if isinstance(config.history, list): + for e in config.history: + if e.get('url') == url and e.get('status') in ("downloading", "Téléchargement", "Extracting"): + downloaded = int(e.get('downloaded_size') or 0) + total = int(e.get('total_size') or 0) + speed = e.get('speed') + if total > 0: + pct = int(downloaded/total*100) + else: + pct = 0 + # speed might be None or 0 when unknown + print_progress(pct, float(speed) if isinstance(speed, (int, float)) else None, downloaded, total) + break + except Exception: + pass + await asyncio.sleep(0.2) + + success, message = await task + if last_line: + # End the progress line + print() + if success: + print(message or "Download completed") + return 0 + else: + print(message or "Download failed", file=sys.stderr) + return 1 + + +def cmd_download(args): + ensure_data_present(getattr(args, 'verbose', False)) + sources = load_sources() + platform = _resolve_platform(sources, args.platform) + if not platform: + print(f"Platform not found: {args.platform}", file=sys.stderr) + sys.exit(2) + platform_id = ( + platform.get('platform_name') + or platform.get('platform') + or platform.get('folder') + or args.platform + ) + games = load_games(platform_id) + query_raw = args.game.strip() + + def _strip_ext(name: str) -> str: + try: + base, _ = os.path.splitext(name) + return base + except Exception: + return name + + def _tokens(s: str) -> list[str]: + return re.findall(r"[a-z0-9]+", s.lower()) + + def _game_title(g) -> str | None: + return g[0] if isinstance(g, (list, tuple)) and g else None + + def _game_url(g) -> str | None: + return g[1] if isinstance(g, (list, tuple)) and len(g) > 1 else None + + # 1) Exact match (case-insensitive), with and without extension + match = None + q_lower = query_raw.lower() + q_no_ext = _strip_ext(query_raw).lower() + for g in games: + title = _game_title(g) + if not title: + continue + t_lower = title.strip().lower() + if t_lower == q_lower or _strip_ext(t_lower) == q_no_ext: + match = (title, _game_url(g)) + break + + # Si pas d'exact, ne pas auto-sélectionner; proposer des correspondances possibles + suggestions = [] # (priority, score, title, url) + if not match: + # 2) Sous-chaîne sur le titre (ou sans extension) + for g in games: + title = _game_title(g) + if not title: + continue + t_lower = title.lower() + t_no_ext = _strip_ext(t_lower) + pos_full = t_lower.find(q_lower) + pos_noext = t_no_ext.find(q_no_ext) + if pos_full != -1 or (q_no_ext and pos_noext != -1): + # priorité 0 = sous-chaîne; score = position trouvée (plus petit est mieux) + pos = pos_full if pos_full != -1 else pos_noext + suggestions.append((0, max(0, pos), title, _game_url(g))) + + # 3) Tokens en ordre non-contigu, avec score de proximité + def ordered_gap_score(qt: list[str], tt: list[str]): + pos = [] + start = 0 + for tok in qt: + try: + i = next(i for i in range(start, len(tt)) if tt[i] == tok) + except StopIteration: + return None + pos.append(i) + start = i + 1 + gap = (pos[-1] - pos[0]) - (len(qt) - 1) + return max(0, gap) + + q_tokens = _tokens(query_raw) + if q_tokens: + for g in games: + title = _game_title(g) + if not title: + continue + tt = _tokens(title) + score = ordered_gap_score(q_tokens, tt) + if score is not None: + suggestions.append((1, score, title, _game_url(g))) + + # 4) Tokens présents (ordre libre) + if q_tokens: + for g in games: + title = _game_title(g) + if not title: + continue + t_tokens = set(_tokens(title)) + if all(tok in t_tokens for tok in q_tokens): + suggestions.append((2, len(t_tokens), title, _game_url(g))) + + # Dédupliquer en gardant la meilleure (priorité/score) pour chaque titre + best_by_title = {} + for prio, score, title, url in suggestions: + key = title.lower() + cur = best_by_title.get(key) + if cur is None or (prio, score) < (cur[0], cur[1]): + best_by_title[key] = (prio, score, title, url) + suggestions = sorted(best_by_title.values(), key=lambda x: (x[0], x[1], x[2].lower())) + if not match: + # Afficher les correspondances possibles, et en mode interactif proposer un choix + print(f"No exact result found for this game: {args.game}") + if suggestions: + limit = 20 + shown = suggestions[:limit] + # Mode interactif par défaut si TTY détecté, ou si --interactive explicite + interactive = False + try: + interactive = bool(getattr(args, 'interactive', False) or sys.stdin.isatty()) + except Exception: + interactive = bool(getattr(args, 'interactive', False)) + if interactive: + print("Select a match to download:") + for i, s in enumerate(shown, start=1): + print(f" {i}. {s[2]}") + if len(suggestions) > limit: + print(f" ... and {len(suggestions) - limit} more not shown") + try: + choice = input("Enter number (or press Enter to cancel): ").strip() + except EOFError: + choice = "" + if choice: + try: + idx = int(choice) + if 1 <= idx <= len(shown): + sel = shown[idx-1] + match = (sel[2], sel[3]) + except Exception: + pass + if not match: + print("Here are potential matches (use the exact title with --game):") + for i, s in enumerate(shown, start=1): + print(f" {i}. {s[2]}") + if len(suggestions) > limit: + print(f" ... and {len(suggestions) - limit} more") + print("Tip: list games with: python rgsx_cli.py games --platform \"%s\" --search \"%s\"" % (args.platform, query_raw)) + sys.exit(3) + else: + print("No similar titles found.") + print("Tip: list games with: python rgsx_cli.py games --platform \"%s\" --search \"%s\"" % (args.platform, query_raw)) + sys.exit(3) + + title, url = match + # Determine if we should force ZIP extraction (only when we can safely check extensions) + is_zip_non_supported = False + exts = None + try: + if os.path.exists(config.JSON_EXTENSIONS) and os.path.getsize(config.JSON_EXTENSIONS) > 2: + exts = load_extensions_json() + except Exception: + exts = None + if exts is not None: + # If extension unsupported for this platform, either block or allow with --force + if not is_extension_supported(sanitize_filename(title), platform.get('platform') or '', exts): + import os as _os + ext = _os.path.splitext(title)[1].lower() + is_zip_non_supported = ext in ('.zip', '.rar') + if not args.force and not is_zip_non_supported: + print("Unsupported extension for this platform. Use --force to override.", file=sys.stderr) + sys.exit(4) + + # Add entry to history and run + hist = load_history() + hist.append({ + "platform": platform.get('platform_name') or platform.get('platform') or args.platform, + "game_name": title, + "status": "downloading", + "url": url, + "progress": 0, + "message": "Téléchargement en cours", + "timestamp": None, + }) + save_history(hist) + # Important: share the same list object with network module so it can update history in place + try: + config.history = hist + except Exception: + pass + + # Run download with live progress + exit_code = asyncio.run(_run_download_with_progress(url, platform_id, title, is_zip_non_supported)) + if exit_code != 0: + sys.exit(exit_code) + + +def build_parser(): + p = argparse.ArgumentParser(prog="rgsx-cli", description="RGSX headless CLI") + p.add_argument("--verbose", action="store_true", help="Verbose logging") + p.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sub = p.add_subparsers(dest="cmd") + + sp = sub.add_parser("platforms", help="List available platforms") + sp.add_argument("--json", action="store_true", help="Output JSON with name and folder") + # Also accept global flags after the subcommand + sp.add_argument("--verbose", action="store_true", help="Verbose logging") + sp.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sp.set_defaults(func=cmd_platforms) + + sg = sub.add_parser("games", help="List games for a platform") + sg.add_argument("--platform", required=True, help="Platform name or key") + sg.add_argument("--search", help="Filter by name contains") + # Also accept global flags after the subcommand + sg.add_argument("--verbose", action="store_true", help="Verbose logging") + sg.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sg.set_defaults(func=cmd_games) + + sd = sub.add_parser("download", help="Download a game by title") + sd.add_argument("--platform", required=True) + sd.add_argument("--game", required=True) + sd.add_argument("--force", action="store_true", help="Override unsupported extension warning") + sd.add_argument("--interactive", "-i", action="store_true", help="Prompt to choose from matches when no exact title is found") + # Also accept global flags after the subcommand + sd.add_argument("--verbose", action="store_true", help="Verbose logging") + sd.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sd.set_defaults(func=cmd_download) + + sh = sub.add_parser("history", help="Show recent history") + sh.add_argument("--tail", type=int, default=50, help="Last N entries") + sh.add_argument("--json", action="store_true") + # Also accept global flags after the subcommand + sh.add_argument("--verbose", action="store_true", help="Verbose logging") + sh.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sh.set_defaults(func=cmd_history) + + sc = sub.add_parser("clear-history", help="Clear history") + # Also accept global flags after the subcommand + sc.add_argument("--verbose", action="store_true", help="Verbose logging") + sc.add_argument("--force-update", "-force-update", action="store_true", help="Purge data (games/images/systems_list) and redownload") + sc.set_defaults(func=cmd_clear_history) + + return p + + +def main(argv=None): + argv = argv or sys.argv[1:] + # Force headless mode for CLI + os.environ.setdefault("RGSX_HEADLESS", "1") + parser = build_parser() + args = parser.parse_args(argv) + setup_logging(args.verbose) + + # Ensure SAVE_FOLDER exists (for history/download outputs, etc.) + try: + os.makedirs(config.SAVE_FOLDER, exist_ok=True) + except Exception: + pass + + # Handle global force-update (can run without a subcommand) + if getattr(args, 'force_update', False): + # Purge + try: + if os.path.exists(config.SOURCES_FILE): + os.remove(config.SOURCES_FILE) + except Exception: + pass + try: + shutil.rmtree(config.GAMES_FOLDER, ignore_errors=True) + except Exception: + pass + try: + shutil.rmtree(config.IMAGES_FOLDER, ignore_errors=True) + except Exception: + pass + # Redownload + ok = ensure_data_present(verbose=True) + if not ok: + sys.exit(1) + # If no subcommand, exit now + if not getattr(args, 'cmd', None): + return + + # If a subcommand is provided, run it + if getattr(args, 'cmd', None): + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/ports/RGSX/utils.py b/ports/RGSX/utils.py index 37dcf42..7592b14 100644 --- a/ports/RGSX/utils.py +++ b/ports/RGSX/utils.py @@ -1,5 +1,4 @@ import shutil -import pygame # type: ignore import re import json import os @@ -7,6 +6,14 @@ import logging import platform import subprocess import config +from config import HEADLESS +try: + if not HEADLESS: + import pygame # type: ignore + else: + pygame = None # type: ignore +except Exception: + pygame = None # type: ignore import glob import threading from rgsx_settings import load_rgsx_settings, save_rgsx_settings