From 9466fb73ef7bdc25aef3a504ae794ef4b3d8a458 Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Thu, 5 Mar 2026 23:33:29 +0800 Subject: [PATCH] refactor: extract methods --- ss/cli.py | 222 ++++++++++++++++++++----------------- ss/core_manager.py | 99 +++++++++-------- ss/corecfg_manager.py | 50 +++------ ss/hook_manager.py | 12 +- ss/models.py | 19 ++-- ss/subscription_manager.py | 9 +- ss/utils.py | 49 ++++++++ 7 files changed, 258 insertions(+), 202 deletions(-) create mode 100644 ss/utils.py diff --git a/ss/cli.py b/ss/cli.py index d43f9a3..de71d8c 100644 --- a/ss/cli.py +++ b/ss/cli.py @@ -138,6 +138,124 @@ def create_parser() -> argparse.ArgumentParser: return parser +def handle_subscription_command(args, subscription_manager: SubscriptionManager, parser: argparse.ArgumentParser) -> None: + """Handle subscription related commands.""" + if not hasattr(args, 'subcommand') or not args.subcommand: + parser.parse_args(['subscription', '--help']) + return + + if args.subcommand == 'add': + subscription_manager.add_subscription(args.name, args.url) + elif args.subcommand == 'refresh': + subscription_manager.refresh_subscription(args.name) + elif args.subcommand == 'rm': + subscription_manager.delete_subscription(args.name) + elif args.subcommand == 'rename': + subscription_manager.rename_subscription(args.name, args.new_name) + elif args.subcommand == 'set-url': + subscription_manager.set_subscription_url(args.name, args.url) + elif args.subcommand == 'activate': + subscription_manager.activate_subscription(args.name) + elif args.subcommand == 'list': + subscription_manager.list_subscriptions() + elif args.subcommand == 'storage': + subscription_manager.show_storage_info() + else: + parser.parse_args(['subscription', '--help']) + + +def handle_core_command(args, core_manager: CoreManager, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None: + """Handle core/config/service related commands.""" + if not hasattr(args, 'core_command') or not args.core_command: + parser.parse_args(['core', '--help']) + return + + if args.core_command == 'update': + core_manager.update(version=args.version, force=args.force) + elif args.core_command == 'config': + handle_config_command(args, core_config_manager, parser) + elif args.core_command == 'service': + handle_service_command(args, core_manager, parser) + else: + parser.parse_args(['core', '--help']) + + +def handle_config_command(args, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None: + """Handle configuration commands.""" + if not hasattr(args, 'config_command') or not args.config_command: + parser.parse_args(['core', 'config', '--help']) + return + + if args.config_command == 'import': + core_config_manager.import_config(args.source) + elif args.config_command == 'export': + core_config_manager.export_config(args.destination) + elif args.config_command == 'edit': + core_config_manager.edit_config() + elif args.config_command == 'reset': + core_config_manager.reset_config() + elif args.config_command == 'show': + core_config_manager.show_config() + elif args.config_command == 'apply': + core_config_manager.apply() + else: + parser.parse_args(['core', 'config', '--help']) + + +def handle_service_command(args, core_manager: CoreManager, parser: argparse.ArgumentParser) -> None: + """Handle service commands.""" + if not hasattr(args, 'service_command') or not args.service_command: + parser.parse_args(['core', 'service', '--help']) + return + + if args.service_command == 'install': + success = core_manager.install_service( + service_name=args.name, + description=args.description + ) + if not success: + sys.exit(1) + elif args.service_command == 'uninstall': + success = core_manager.uninstall_service(service_name=args.name) + if not success: + sys.exit(1) + elif args.service_command == 'start': + success = core_manager.start_service(service_name=args.name) + if not success: + sys.exit(1) + elif args.service_command == 'stop': + success = core_manager.stop_service(service_name=args.name) + if not success: + sys.exit(1) + elif args.service_command == 'restart': + success = core_manager.restart_service(service_name=args.name) + if not success: + sys.exit(1) + elif args.service_command == 'status': + status = core_manager.get_service_status(service_name=args.name) + print(f"Service '{args.name}' status: {status}") + else: + parser.parse_args(['core', 'service', '--help']) + + +def handle_hook_command(args, hook_manager: HookManager, parser: argparse.ArgumentParser) -> None: + """Handle hook commands.""" + if not hasattr(args, 'hook_command') or not args.hook_command: + parser.parse_args(['hook', '--help']) + return + + if args.hook_command == 'init': + hook_manager.init() + elif args.hook_command == 'list': + hook_manager.list_hooks() + elif args.hook_command == 'edit': + hook_manager.edit(args.script) + elif args.hook_command == 'rm': + hook_manager.rm(args.script) + else: + parser.parse_args(['hook', '--help']) + + def main() -> None: """Main CLI entry point.""" parser = create_parser() @@ -155,109 +273,11 @@ def main() -> None: try: if args.command == 'subscription': - if not hasattr(args, 'subcommand') or not args.subcommand: - parser.parse_args(['subscription', '--help']) - return - - - if args.subcommand == 'add': - subscription_manager.add_subscription(args.name, args.url) - elif args.subcommand == 'refresh': - subscription_manager.refresh_subscription(args.name) - elif args.subcommand == 'rm': - subscription_manager.delete_subscription(args.name) - elif args.subcommand == 'rename': - subscription_manager.rename_subscription(args.name, args.new_name) - elif args.subcommand == 'set-url': - subscription_manager.set_subscription_url(args.name, args.url) - elif args.subcommand == 'activate': - subscription_manager.activate_subscription(args.name) - elif args.subcommand == 'list': - subscription_manager.list_subscriptions() - elif args.subcommand == 'storage': - subscription_manager.show_storage_info() - else: - parser.parse_args(['subscription', '--help']) - + handle_subscription_command(args, subscription_manager, parser) elif args.command == 'core': - if not hasattr(args, 'core_command') or not args.core_command: - parser.parse_args(['core', '--help']) - return - - if args.core_command == 'update': - core_manager.update(version=args.version, force=args.force) - elif args.core_command == 'config': - if not hasattr(args, 'config_command') or not args.config_command: - parser.parse_args(['core', 'config', '--help']) - return - - if args.config_command == 'import': - core_config_manager.import_config(args.source) - elif args.config_command == 'export': - core_config_manager.export_config(args.destination) - elif args.config_command == 'edit': - core_config_manager.edit_config() - elif args.config_command == 'reset': - core_config_manager.reset_config() - elif args.config_command == 'show': - core_config_manager.show_config() - elif args.config_command == 'apply': - core_config_manager.apply() - else: - parser.parse_args(['core', 'config', '--help']) - elif args.core_command == 'service': - if not hasattr(args, 'service_command') or not args.service_command: - parser.parse_args(['core', 'service', '--help']) - return - - if args.service_command == 'install': - success = core_manager.install_service( - service_name=args.name, - description=args.description - ) - if not success: - sys.exit(1) - elif args.service_command == 'uninstall': - success = core_manager.uninstall_service(service_name=args.name) - if not success: - sys.exit(1) - elif args.service_command == 'start': - success = core_manager.start_service(service_name=args.name) - if not success: - sys.exit(1) - elif args.service_command == 'stop': - success = core_manager.stop_service(service_name=args.name) - if not success: - sys.exit(1) - elif args.service_command == 'restart': - success = core_manager.restart_service(service_name=args.name) - if not success: - sys.exit(1) - elif args.service_command == 'status': - status = core_manager.get_service_status(service_name=args.name) - print(f"Service '{args.name}' status: {status}") - else: - parser.parse_args(['core', 'service', '--help']) - else: - parser.parse_args(['core', '--help']) - + handle_core_command(args, core_manager, core_config_manager, parser) elif args.command == 'hook': - if not hasattr(args, 'hook_command') or not args.hook_command: - parser.parse_args(['hook', '--help']) - return - - - if args.hook_command == 'init': - hook_manager.init() - elif args.hook_command == 'list': - hook_manager.list() - elif args.hook_command == 'edit': - hook_manager.edit(args.script) - elif args.hook_command == 'rm': - hook_manager.rm(args.script) - else: - parser.parse_args(['hook', '--help']) - + handle_hook_command(args, hook_manager, parser) else: parser.print_help() except KeyboardInterrupt: diff --git a/ss/core_manager.py b/ss/core_manager.py index bc2d8ec..87ebdff 100644 --- a/ss/core_manager.py +++ b/ss/core_manager.py @@ -25,6 +25,58 @@ class CoreManager: self.storage = core_config_manager.storage self.core_config_manager = core_config_manager + def _get_platform_info(self) -> tuple[Optional[str], Optional[str], Optional[str]]: + """ + Get platform, normalized architecture, and binary name base. + Returns: + tuple: (system, normalized_arch, binary_name_base) or (None, None, None) if unsupported. + """ + system = platform.system().lower() + machine = platform.machine().lower() + + # Map platform to mihomo binary naming (base name without extension) + platform_map = { + 'windows': { + 'amd64': 'mihomo-windows-amd64', + '386': 'mihomo-windows-386', + 'arm64': 'mihomo-windows-arm64', + 'arm': 'mihomo-windows-arm32v7' + }, + 'linux': { + 'amd64': 'mihomo-linux-amd64', + '386': 'mihomo-linux-386', + 'arm64': 'mihomo-linux-arm64', + 'arm': 'mihomo-linux-armv7' + }, + 'darwin': { + 'amd64': 'mihomo-darwin-amd64', + 'arm64': 'mihomo-darwin-arm64' + } + } + + # Normalize architecture names + arch_map = { + 'x86_64': 'amd64', + 'amd64': 'amd64', + 'i386': '386', + 'i686': '386', + 'arm64': 'arm64', + 'aarch64': 'arm64', + 'armv7l': 'arm', + 'arm': 'arm' + } + + if system not in platform_map: + print(f"❌ Unsupported operating system: {system}") + return None, None, None + + normalized_arch = arch_map.get(machine, machine) + if normalized_arch not in platform_map[system]: + print(f"❌ Unsupported architecture: {machine} ({normalized_arch})") + return None, None, None + + return system, normalized_arch, platform_map[system][normalized_arch] + def update(self, version: Optional[str] = None, force: bool = False) -> bool: """ Download and update mihomo binary from GitHub releases. @@ -37,53 +89,10 @@ class CoreManager: bool: True if update successful, False otherwise. """ try: - # Determine current OS and architecture - system = platform.system().lower() - machine = platform.machine().lower() - - # Map platform to mihomo binary naming (base name without extension) - platform_map = { - 'windows': { - 'amd64': 'mihomo-windows-amd64', - '386': 'mihomo-windows-386', - 'arm64': 'mihomo-windows-arm64', - 'arm': 'mihomo-windows-arm32v7' - }, - 'linux': { - 'amd64': 'mihomo-linux-amd64', - '386': 'mihomo-linux-386', - 'arm64': 'mihomo-linux-arm64', - 'arm': 'mihomo-linux-armv7' - }, - 'darwin': { - 'amd64': 'mihomo-darwin-amd64', - 'arm64': 'mihomo-darwin-arm64' - } - } - - # Normalize architecture names - arch_map = { - 'x86_64': 'amd64', - 'amd64': 'amd64', - 'i386': '386', - 'i686': '386', - 'arm64': 'arm64', - 'aarch64': 'arm64', - 'armv7l': 'arm', - 'arm': 'arm' - } - - if system not in platform_map: - print(f"❌ Unsupported operating system: {system}") + system, normalized_arch, binary_name = self._get_platform_info() + if not system: return False - normalized_arch = arch_map.get(machine, machine) - if normalized_arch not in platform_map[system]: - print(f"❌ Unsupported architecture: {machine} ({normalized_arch})") - return False - - binary_name = platform_map[system][normalized_arch] - # Setup directories binary_dir = self.core_config_manager.storage.config_dir / "bin" binary_dir.mkdir(parents=True, exist_ok=True) diff --git a/ss/corecfg_manager.py b/ss/corecfg_manager.py index 0190769..d8fdfa0 100644 --- a/ss/corecfg_manager.py +++ b/ss/corecfg_manager.py @@ -13,6 +13,7 @@ import yaml from .models import Config from .subscription_manager import SubscriptionManager +from .utils import open_file_in_editor class CoreConfigManager: @@ -120,46 +121,25 @@ class CoreConfigManager: if not self._ensure_config_exists(): return False - # Get system editor - editor = os.environ.get('EDITOR') or os.environ.get('VISUAL') - if not editor: - # Try common editors - for cmd in ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']: - if shutil.which(cmd): - editor = cmd - break + # Backup current config + backup_path = self.config_file.with_suffix('.yaml.backup') + if self.config_file.exists(): + shutil.copy2(self.config_file, backup_path) - if not editor: - print("❌ No editor found. Please set EDITOR or VISUAL environment variable") + # Open editor + if not open_file_in_editor(self.config_file): return False + # Validate edited config try: - # Backup current config - backup_path = self.config_file.with_suffix('.yaml.backup') - if self.config_file.exists(): - shutil.copy2(self.config_file, backup_path) - - # Open editor - subprocess.run([editor, str(self.config_file)], check=True) - - # Validate edited config - try: - config = self.load_config() - print("✅ Configuration edited successfully") - return True - except Exception as e: - # Restore backup if validation fails - if backup_path.exists(): - shutil.copy2(backup_path, self.config_file) - print(f"❌ Invalid configuration: {e}") - print("🔄 Restored previous configuration") - return False - - except subprocess.CalledProcessError: - print("❌ Editor command failed") - return False + self.load_config() + print("✅ Configuration edited successfully") + return True except Exception as e: - print(f"❌ Failed to edit configuration: {e}") + print(f"❌ Configuration invalid: {e}") + print("Restoring backup...") + if backup_path.exists(): + shutil.copy2(backup_path, self.config_file) return False def reset_config(self) -> bool: diff --git a/ss/hook_manager.py b/ss/hook_manager.py index da7c734..21c98e5 100644 --- a/ss/hook_manager.py +++ b/ss/hook_manager.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import List from .storage import StorageManager +from .utils import open_file_in_editor class HookManager: @@ -46,7 +47,7 @@ class HookManager: print(f"\nInitialized hooks directory with {copied_count} new scripts.") print(f"Location: {self.hooks_dir}") - def list(self) -> None: + def list_hooks(self) -> None: """Display hooks directory location and list all hook scripts.""" print(f"Hooks directory: {self.hooks_dir}") @@ -82,14 +83,7 @@ class HookManager: print(f"Available scripts: {', '.join(available)}") return - editor = os.environ.get('EDITOR', 'notepad' if os.name == 'nt' else 'nano') - - try: - subprocess.run([editor, str(script_path)], check=True) - except subprocess.CalledProcessError as e: - print(f"Failed to open editor: {e}") - except FileNotFoundError: - print(f"Editor '{editor}' not found. Please set EDITOR environment variable.") + open_file_in_editor(script_path) def rm(self, script_name: str) -> None: """Remove a hook script. diff --git a/ss/models.py b/ss/models.py index 0cb14a6..cf55ee4 100644 --- a/ss/models.py +++ b/ss/models.py @@ -4,28 +4,27 @@ Pydantic models for scientific-surfing data structures. from datetime import datetime import os +from enum import Enum from typing import Dict, List, Optional from pydantic import BaseModel, Field, validator +class SubscriptionStatus(str, Enum): + ACTIVE = "active" + INACTIVE = "inactive" + class Subscription(BaseModel): """Model for a single subscription.""" name: str = Field(..., description="Name of the subscription") url: str = Field(..., description="Clash RSS subscription URL") - status: str = Field(default="inactive", description="Status: active or inactive") + status: SubscriptionStatus = Field(default=SubscriptionStatus.INACTIVE, description="Status: active or inactive") last_refresh: Optional[datetime] = Field(default=None, description="Last refresh timestamp") file_size: Optional[int] = Field(default=None, description="Size of downloaded file in bytes") status_code: Optional[int] = Field(default=None, description="HTTP status code of last refresh") content_hash: Optional[int] = Field(default=None, description="Hash of downloaded content") last_error: Optional[str] = Field(default=None, description="Last error message if any") - @validator('status') - def validate_status(cls, v): - if v not in ['active', 'inactive']: - raise ValueError('Status must be either "active" or "inactive"') - return v - class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None @@ -66,7 +65,7 @@ class SubscriptionsData(BaseModel): def get_active_subscription(self) -> Optional[Subscription]: """Get the currently active subscription.""" for subscription in self.subscriptions.values(): - if subscription.status == 'active': + if subscription.status == SubscriptionStatus.ACTIVE: return subscription return None @@ -76,7 +75,7 @@ class SubscriptionsData(BaseModel): return False for sub_name, subscription in self.subscriptions.items(): - subscription.status = 'active' if sub_name == name else 'inactive' + subscription.status = SubscriptionStatus.ACTIVE if sub_name == name else SubscriptionStatus.INACTIVE return True def add_subscription(self, name: str, url: str) -> Subscription: @@ -85,7 +84,7 @@ class SubscriptionsData(BaseModel): # If this is the first subscription, set it as active if not self.subscriptions: - subscription.status = 'active' + subscription.status = SubscriptionStatus.ACTIVE self.subscriptions[name] = subscription return subscription diff --git a/ss/subscription_manager.py b/ss/subscription_manager.py index 3df40dd..6330bcc 100644 --- a/ss/subscription_manager.py +++ b/ss/subscription_manager.py @@ -9,6 +9,7 @@ import urllib.parse import yaml import requests from .storage import StorageManager +from .models import SubscriptionStatus class SubscriptionManager: @@ -358,11 +359,15 @@ class SubscriptionManager: print("📋 Subscriptions:") for name, subscription in self.subscriptions_data.subscriptions.items(): - active_marker = "✅" if subscription.status == 'active' else " " + is_active = subscription.status == SubscriptionStatus.ACTIVE + active_marker = "✅" if is_active else " " + last_refresh_str = "" if subscription.last_refresh: last_refresh_str = f" (last: {subscription.last_refresh.strftime('%Y-%m-%d %H:%M:%S')})" - print(f" {active_marker} {name}: {subscription.url} ({subscription.status}){last_refresh_str}") + + status_str = "active" if is_active else "inactive" + print(f" {active_marker} {name}: {subscription.url} ({status_str}){last_refresh_str}") def show_storage_info(self) -> None: """Show storage information.""" diff --git a/ss/utils.py b/ss/utils.py new file mode 100644 index 0000000..eb9400d --- /dev/null +++ b/ss/utils.py @@ -0,0 +1,49 @@ +import os +import sys +import shutil +import subprocess +from pathlib import Path +from typing import Optional + +def get_editor_command() -> Optional[str]: + """ + Get the command for the system's default editor. + Prioritizes EDITOR/VISUAL env vars, then falls back to common editors. + """ + editor = os.environ.get('EDITOR') or os.environ.get('VISUAL') + if editor: + return editor + + # Try common editors + common_editors = ['code', 'subl', 'atom', 'vim', 'nano', 'notepad'] + if os.name == 'nt': + common_editors.insert(0, 'notepad') + + for cmd in common_editors: + if shutil.which(cmd): + return cmd + + return None + +def open_file_in_editor(file_path: Path) -> bool: + """ + Open a file in the system default editor. + Returns True if successful, False otherwise. + """ + editor = get_editor_command() + if not editor: + print("❌ No editor found. Please set EDITOR or VISUAL environment variable") + return False + + try: + subprocess.run([editor, str(file_path)], check=True) + return True + except subprocess.CalledProcessError as e: + print(f"❌ Failed to open editor: {e}") + return False + except FileNotFoundError: + print(f"❌ Editor '{editor}' not found.") + return False + except Exception as e: + print(f"❌ Error opening editor: {e}") + return False