refactor: extract methods

This commit is contained in:
2026-03-05 23:33:29 +08:00
parent 8993b8fb88
commit 9466fb73ef
7 changed files with 258 additions and 202 deletions

View File

@ -138,28 +138,12 @@ def create_parser() -> argparse.ArgumentParser:
return parser return parser
def main() -> None: def handle_subscription_command(args, subscription_manager: SubscriptionManager, parser: argparse.ArgumentParser) -> None:
"""Main CLI entry point.""" """Handle subscription related commands."""
parser = create_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
return
storage = StorageManager()
subscription_manager = SubscriptionManager(storage)
core_config_manager = CoreConfigManager(subscription_manager)
core_manager = CoreManager(core_config_manager)
hook_manager = HookManager(storage)
try:
if args.command == 'subscription':
if not hasattr(args, 'subcommand') or not args.subcommand: if not hasattr(args, 'subcommand') or not args.subcommand:
parser.parse_args(['subscription', '--help']) parser.parse_args(['subscription', '--help'])
return return
if args.subcommand == 'add': if args.subcommand == 'add':
subscription_manager.add_subscription(args.name, args.url) subscription_manager.add_subscription(args.name, args.url)
elif args.subcommand == 'refresh': elif args.subcommand == 'refresh':
@ -179,7 +163,9 @@ def main() -> None:
else: else:
parser.parse_args(['subscription', '--help']) parser.parse_args(['subscription', '--help'])
elif args.command == 'core':
def handle_core_command(args, core_manager: CoreManager, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None:
"""Handle core/config/service related commands."""
if not hasattr(args, 'core_command') or not args.core_command: if not hasattr(args, 'core_command') or not args.core_command:
parser.parse_args(['core', '--help']) parser.parse_args(['core', '--help'])
return return
@ -187,6 +173,15 @@ def main() -> None:
if args.core_command == 'update': if args.core_command == 'update':
core_manager.update(version=args.version, force=args.force) core_manager.update(version=args.version, force=args.force)
elif args.core_command == 'config': elif args.core_command == 'config':
handle_config_command(args, core_config_manager, parser)
elif args.core_command == 'service':
handle_service_command(args, core_manager, parser)
else:
parser.parse_args(['core', '--help'])
def handle_config_command(args, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None:
"""Handle configuration commands."""
if not hasattr(args, 'config_command') or not args.config_command: if not hasattr(args, 'config_command') or not args.config_command:
parser.parse_args(['core', 'config', '--help']) parser.parse_args(['core', 'config', '--help'])
return return
@ -205,7 +200,10 @@ def main() -> None:
core_config_manager.apply() core_config_manager.apply()
else: else:
parser.parse_args(['core', 'config', '--help']) parser.parse_args(['core', 'config', '--help'])
elif args.core_command == 'service':
def handle_service_command(args, core_manager: CoreManager, parser: argparse.ArgumentParser) -> None:
"""Handle service commands."""
if not hasattr(args, 'service_command') or not args.service_command: if not hasattr(args, 'service_command') or not args.service_command:
parser.parse_args(['core', 'service', '--help']) parser.parse_args(['core', 'service', '--help'])
return return
@ -238,19 +236,18 @@ def main() -> None:
print(f"Service '{args.name}' status: {status}") print(f"Service '{args.name}' status: {status}")
else: else:
parser.parse_args(['core', 'service', '--help']) parser.parse_args(['core', 'service', '--help'])
else:
parser.parse_args(['core', '--help'])
elif args.command == 'hook':
def handle_hook_command(args, hook_manager: HookManager, parser: argparse.ArgumentParser) -> None:
"""Handle hook commands."""
if not hasattr(args, 'hook_command') or not args.hook_command: if not hasattr(args, 'hook_command') or not args.hook_command:
parser.parse_args(['hook', '--help']) parser.parse_args(['hook', '--help'])
return return
if args.hook_command == 'init': if args.hook_command == 'init':
hook_manager.init() hook_manager.init()
elif args.hook_command == 'list': elif args.hook_command == 'list':
hook_manager.list() hook_manager.list_hooks()
elif args.hook_command == 'edit': elif args.hook_command == 'edit':
hook_manager.edit(args.script) hook_manager.edit(args.script)
elif args.hook_command == 'rm': elif args.hook_command == 'rm':
@ -258,6 +255,29 @@ def main() -> None:
else: else:
parser.parse_args(['hook', '--help']) parser.parse_args(['hook', '--help'])
def main() -> None:
"""Main CLI entry point."""
parser = create_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
return
storage = StorageManager()
subscription_manager = SubscriptionManager(storage)
core_config_manager = CoreConfigManager(subscription_manager)
core_manager = CoreManager(core_config_manager)
hook_manager = HookManager(storage)
try:
if args.command == 'subscription':
handle_subscription_command(args, subscription_manager, parser)
elif args.command == 'core':
handle_core_command(args, core_manager, core_config_manager, parser)
elif args.command == 'hook':
handle_hook_command(args, hook_manager, parser)
else: else:
parser.print_help() parser.print_help()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -25,19 +25,12 @@ class CoreManager:
self.storage = core_config_manager.storage self.storage = core_config_manager.storage
self.core_config_manager = core_config_manager self.core_config_manager = core_config_manager
def update(self, version: Optional[str] = None, force: bool = False) -> bool: def _get_platform_info(self) -> tuple[Optional[str], Optional[str], Optional[str]]:
""" """
Download and update mihomo binary from GitHub releases. Get platform, normalized architecture, and binary name base.
Args:
version: Specific version to download (e.g., 'v1.18.5'). If None, downloads latest.
force: Force download even if binary already exists.
Returns: Returns:
bool: True if update successful, False otherwise. tuple: (system, normalized_arch, binary_name_base) or (None, None, None) if unsupported.
""" """
try:
# Determine current OS and architecture
system = platform.system().lower() system = platform.system().lower()
machine = platform.machine().lower() machine = platform.machine().lower()
@ -75,14 +68,30 @@ class CoreManager:
if system not in platform_map: if system not in platform_map:
print(f"❌ Unsupported operating system: {system}") print(f"❌ Unsupported operating system: {system}")
return False return None, None, None
normalized_arch = arch_map.get(machine, machine) normalized_arch = arch_map.get(machine, machine)
if normalized_arch not in platform_map[system]: if normalized_arch not in platform_map[system]:
print(f"❌ Unsupported architecture: {machine} ({normalized_arch})") print(f"❌ Unsupported architecture: {machine} ({normalized_arch})")
return False return None, None, None
binary_name = platform_map[system][normalized_arch] return system, normalized_arch, platform_map[system][normalized_arch]
def update(self, version: Optional[str] = None, force: bool = False) -> bool:
"""
Download and update mihomo binary from GitHub releases.
Args:
version: Specific version to download (e.g., 'v1.18.5'). If None, downloads latest.
force: Force download even if binary already exists.
Returns:
bool: True if update successful, False otherwise.
"""
try:
system, normalized_arch, binary_name = self._get_platform_info()
if not system:
return False
# Setup directories # Setup directories
binary_dir = self.core_config_manager.storage.config_dir / "bin" binary_dir = self.core_config_manager.storage.config_dir / "bin"

View File

@ -13,6 +13,7 @@ import yaml
from .models import Config from .models import Config
from .subscription_manager import SubscriptionManager from .subscription_manager import SubscriptionManager
from .utils import open_file_in_editor
class CoreConfigManager: class CoreConfigManager:
@ -120,46 +121,25 @@ class CoreConfigManager:
if not self._ensure_config_exists(): if not self._ensure_config_exists():
return False return False
# Get system editor
editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
if not editor:
# Try common editors
for cmd in ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']:
if shutil.which(cmd):
editor = cmd
break
if not editor:
print("❌ No editor found. Please set EDITOR or VISUAL environment variable")
return False
try:
# Backup current config # Backup current config
backup_path = self.config_file.with_suffix('.yaml.backup') backup_path = self.config_file.with_suffix('.yaml.backup')
if self.config_file.exists(): if self.config_file.exists():
shutil.copy2(self.config_file, backup_path) shutil.copy2(self.config_file, backup_path)
# Open editor # Open editor
subprocess.run([editor, str(self.config_file)], check=True) if not open_file_in_editor(self.config_file):
return False
# Validate edited config # Validate edited config
try: try:
config = self.load_config() self.load_config()
print("✅ Configuration edited successfully") print("✅ Configuration edited successfully")
return True return True
except Exception as e: except Exception as e:
# Restore backup if validation fails print(f"❌ Configuration invalid: {e}")
print("Restoring backup...")
if backup_path.exists(): if backup_path.exists():
shutil.copy2(backup_path, self.config_file) shutil.copy2(backup_path, self.config_file)
print(f"❌ Invalid configuration: {e}")
print("🔄 Restored previous configuration")
return False
except subprocess.CalledProcessError:
print("❌ Editor command failed")
return False
except Exception as e:
print(f"❌ Failed to edit configuration: {e}")
return False return False
def reset_config(self) -> bool: def reset_config(self) -> bool:

View File

@ -7,6 +7,7 @@ from pathlib import Path
from typing import List from typing import List
from .storage import StorageManager from .storage import StorageManager
from .utils import open_file_in_editor
class HookManager: class HookManager:
@ -46,7 +47,7 @@ class HookManager:
print(f"\nInitialized hooks directory with {copied_count} new scripts.") print(f"\nInitialized hooks directory with {copied_count} new scripts.")
print(f"Location: {self.hooks_dir}") print(f"Location: {self.hooks_dir}")
def list(self) -> None: def list_hooks(self) -> None:
"""Display hooks directory location and list all hook scripts.""" """Display hooks directory location and list all hook scripts."""
print(f"Hooks directory: {self.hooks_dir}") print(f"Hooks directory: {self.hooks_dir}")
@ -82,14 +83,7 @@ class HookManager:
print(f"Available scripts: {', '.join(available)}") print(f"Available scripts: {', '.join(available)}")
return return
editor = os.environ.get('EDITOR', 'notepad' if os.name == 'nt' else 'nano') open_file_in_editor(script_path)
try:
subprocess.run([editor, str(script_path)], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to open editor: {e}")
except FileNotFoundError:
print(f"Editor '{editor}' not found. Please set EDITOR environment variable.")
def rm(self, script_name: str) -> None: def rm(self, script_name: str) -> None:
"""Remove a hook script. """Remove a hook script.

View File

@ -4,28 +4,27 @@ Pydantic models for scientific-surfing data structures.
from datetime import datetime from datetime import datetime
import os import os
from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, Field, validator from pydantic import BaseModel, Field, validator
class SubscriptionStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
class Subscription(BaseModel): class Subscription(BaseModel):
"""Model for a single subscription.""" """Model for a single subscription."""
name: str = Field(..., description="Name of the subscription") name: str = Field(..., description="Name of the subscription")
url: str = Field(..., description="Clash RSS subscription URL") url: str = Field(..., description="Clash RSS subscription URL")
status: str = Field(default="inactive", description="Status: active or inactive") status: SubscriptionStatus = Field(default=SubscriptionStatus.INACTIVE, description="Status: active or inactive")
last_refresh: Optional[datetime] = Field(default=None, description="Last refresh timestamp") last_refresh: Optional[datetime] = Field(default=None, description="Last refresh timestamp")
file_size: Optional[int] = Field(default=None, description="Size of downloaded file in bytes") file_size: Optional[int] = Field(default=None, description="Size of downloaded file in bytes")
status_code: Optional[int] = Field(default=None, description="HTTP status code of last refresh") status_code: Optional[int] = Field(default=None, description="HTTP status code of last refresh")
content_hash: Optional[int] = Field(default=None, description="Hash of downloaded content") content_hash: Optional[int] = Field(default=None, description="Hash of downloaded content")
last_error: Optional[str] = Field(default=None, description="Last error message if any") last_error: Optional[str] = Field(default=None, description="Last error message if any")
@validator('status')
def validate_status(cls, v):
if v not in ['active', 'inactive']:
raise ValueError('Status must be either "active" or "inactive"')
return v
class Config: class Config:
json_encoders = { json_encoders = {
datetime: lambda v: v.isoformat() if v else None datetime: lambda v: v.isoformat() if v else None
@ -66,7 +65,7 @@ class SubscriptionsData(BaseModel):
def get_active_subscription(self) -> Optional[Subscription]: def get_active_subscription(self) -> Optional[Subscription]:
"""Get the currently active subscription.""" """Get the currently active subscription."""
for subscription in self.subscriptions.values(): for subscription in self.subscriptions.values():
if subscription.status == 'active': if subscription.status == SubscriptionStatus.ACTIVE:
return subscription return subscription
return None return None
@ -76,7 +75,7 @@ class SubscriptionsData(BaseModel):
return False return False
for sub_name, subscription in self.subscriptions.items(): for sub_name, subscription in self.subscriptions.items():
subscription.status = 'active' if sub_name == name else 'inactive' subscription.status = SubscriptionStatus.ACTIVE if sub_name == name else SubscriptionStatus.INACTIVE
return True return True
def add_subscription(self, name: str, url: str) -> Subscription: def add_subscription(self, name: str, url: str) -> Subscription:
@ -85,7 +84,7 @@ class SubscriptionsData(BaseModel):
# If this is the first subscription, set it as active # If this is the first subscription, set it as active
if not self.subscriptions: if not self.subscriptions:
subscription.status = 'active' subscription.status = SubscriptionStatus.ACTIVE
self.subscriptions[name] = subscription self.subscriptions[name] = subscription
return subscription return subscription

View File

@ -9,6 +9,7 @@ import urllib.parse
import yaml import yaml
import requests import requests
from .storage import StorageManager from .storage import StorageManager
from .models import SubscriptionStatus
class SubscriptionManager: class SubscriptionManager:
@ -358,11 +359,15 @@ class SubscriptionManager:
print("📋 Subscriptions:") print("📋 Subscriptions:")
for name, subscription in self.subscriptions_data.subscriptions.items(): for name, subscription in self.subscriptions_data.subscriptions.items():
active_marker = "" if subscription.status == 'active' else " " is_active = subscription.status == SubscriptionStatus.ACTIVE
active_marker = "" if is_active else " "
last_refresh_str = "" last_refresh_str = ""
if subscription.last_refresh: if subscription.last_refresh:
last_refresh_str = f" (last: {subscription.last_refresh.strftime('%Y-%m-%d %H:%M:%S')})" last_refresh_str = f" (last: {subscription.last_refresh.strftime('%Y-%m-%d %H:%M:%S')})"
print(f" {active_marker} {name}: {subscription.url} ({subscription.status}){last_refresh_str}")
status_str = "active" if is_active else "inactive"
print(f" {active_marker} {name}: {subscription.url} ({status_str}){last_refresh_str}")
def show_storage_info(self) -> None: def show_storage_info(self) -> None:
"""Show storage information.""" """Show storage information."""

49
ss/utils.py Normal file
View File

@ -0,0 +1,49 @@
import os
import sys
import shutil
import subprocess
from pathlib import Path
from typing import Optional
def get_editor_command() -> Optional[str]:
"""
Get the command for the system's default editor.
Prioritizes EDITOR/VISUAL env vars, then falls back to common editors.
"""
editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
if editor:
return editor
# Try common editors
common_editors = ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']
if os.name == 'nt':
common_editors.insert(0, 'notepad')
for cmd in common_editors:
if shutil.which(cmd):
return cmd
return None
def open_file_in_editor(file_path: Path) -> bool:
"""
Open a file in the system default editor.
Returns True if successful, False otherwise.
"""
editor = get_editor_command()
if not editor:
print("❌ No editor found. Please set EDITOR or VISUAL environment variable")
return False
try:
subprocess.run([editor, str(file_path)], check=True)
return True
except subprocess.CalledProcessError as e:
print(f"❌ Failed to open editor: {e}")
return False
except FileNotFoundError:
print(f"❌ Editor '{editor}' not found.")
return False
except Exception as e:
print(f"❌ Error opening editor: {e}")
return False