feat: hook manager

This commit is contained in:
2025-10-16 14:59:06 +08:00
parent 90719b8416
commit a5299f2455
5 changed files with 198 additions and 33 deletions

View File

@ -6,7 +6,8 @@
"Bash(python:*)",
"Bash(poetry run python:*)",
"Bash(del test_upgrade.py)",
"Bash(git add .)"
"Bash(git add .)",
"Bash(git commit -m \"Initial commit with Python .gitignore\")"
],
"deny": [],
"ask": []

View File

@ -4,8 +4,12 @@ Command-line interface for scientific-surfing package.
import argparse
import sys
from scientific_surfing.storage import StorageManager
from scientific_surfing.subscription_manager import SubscriptionManager
from scientific_surfing.corecfg_manager import CoreConfigManager
from scientific_surfing.corecfg_manager import CoreConfigManager
from scientific_surfing.core_manager import CoreManager
from scientific_surfing.hook_manager import HookManager
def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser."""
@ -85,6 +89,24 @@ def create_parser() -> argparse.ArgumentParser:
update_parser.add_argument('--version', help='Specific version to download (e.g., v1.18.5). If not specified, downloads latest')
update_parser.add_argument('--force', action='store_true', help='Force update even if binary already exists')
# Hook commands
hook_parser = subparsers.add_parser('hook', help='Manage hook scripts')
hook_subparsers = hook_parser.add_subparsers(dest='hook_command', help='Hook operations')
# Init hooks command
init_parser = hook_subparsers.add_parser('init', help='Initialize hooks directory with template scripts')
# Show hooks command
list_hooks_parser = hook_subparsers.add_parser('list', help='Show hooks directory location and list scripts')
# Edit hook command
edit_hook_parser = hook_subparsers.add_parser('edit', help='Edit a hook script')
edit_hook_parser.add_argument('script', help='Name of the script to edit')
# Remove hook command
rm_hook_parser = hook_subparsers.add_parser('rm', help='Remove a hook script')
rm_hook_parser.add_argument('script', help='Name of the script to remove')
return parser
@ -97,27 +119,33 @@ def main() -> None:
parser.print_help()
return
storage = StorageManager()
subscription_manager = SubscriptionManager(storage)
core_config_manager = CoreConfigManager(subscription_manager)
core_manager = CoreManager(core_config_manager)
hook_manager = HookManager(storage)
try:
if args.command == 'subscription':
if not hasattr(args, 'subcommand') or not args.subcommand:
parser.parse_args(['subscription', '--help'])
return
manager = SubscriptionManager()
if args.subcommand == 'add':
manager.add_subscription(args.name, args.url)
subscription_manager.add_subscription(args.name, args.url)
elif args.subcommand == 'refresh':
manager.refresh_subscription(args.name)
subscription_manager.refresh_subscription(args.name)
elif args.subcommand == 'rm':
manager.delete_subscription(args.name)
subscription_manager.delete_subscription(args.name)
elif args.subcommand == 'rename':
manager.rename_subscription(args.name, args.new_name)
subscription_manager.rename_subscription(args.name, args.new_name)
elif args.subcommand == 'activate':
manager.activate_subscription(args.name)
subscription_manager.activate_subscription(args.name)
elif args.subcommand == 'list':
manager.list_subscriptions()
subscription_manager.list_subscriptions()
elif args.subcommand == 'storage':
manager.show_storage_info()
subscription_manager.show_storage_info()
else:
parser.parse_args(['subscription', '--help'])
@ -126,10 +154,6 @@ def main() -> None:
parser.parse_args(['core-config', '--help'])
return
from scientific_surfing.corecfg_manager import CoreConfigManager
from scientific_surfing.core_manager import CoreManager
core_config_manager = CoreConfigManager()
core_manager = CoreManager(core_config_manager)
if args.core_config_command == 'import':
core_config_manager.import_config(args.source)
@ -143,8 +167,6 @@ def main() -> None:
core_config_manager.show_config()
elif args.core_config_command == 'apply':
core_config_manager.apply()
elif args.core_config_command == 'upgrade':
core_manager.update(version=args.version, force=args.force)
else:
parser.parse_args(['core-config', '--help'])
@ -153,16 +175,29 @@ def main() -> None:
parser.parse_args(['core', '--help'])
return
from scientific_surfing.corecfg_manager import CoreConfigManager
from scientific_surfing.core_manager import CoreManager
core_config_manager = CoreConfigManager()
core_manager = CoreManager(core_config_manager)
if args.core_command == 'update':
core_manager.update(version=args.version, force=args.force)
else:
parser.parse_args(['core', '--help'])
elif args.command == 'hook':
if not hasattr(args, 'hook_command') or not args.hook_command:
parser.parse_args(['hook', '--help'])
return
if args.hook_command == 'init':
hook_manager.init()
elif args.hook_command == 'list':
hook_manager.list()
elif args.hook_command == 'edit':
hook_manager.edit(args.script)
elif args.hook_command == 'rm':
hook_manager.rm(args.script)
else:
parser.parse_args(['hook', '--help'])
else:
parser.print_help()
except KeyboardInterrupt:
@ -170,6 +205,7 @@ def main() -> None:
sys.exit(1)
except Exception as e:
print(f"❌ Error: {e}")
raise
sys.exit(1)

View File

@ -12,14 +12,15 @@ from pathlib import Path
import yaml
from scientific_surfing.models import Config
from scientific_surfing.storage import StorageManager
from scientific_surfing.subscription_manager import SubscriptionManager
class CoreConfigManager:
"""Manages user configuration with import, export, and edit operations."""
def __init__(self):
self.storage = StorageManager()
def __init__(self, subscription_manager: SubscriptionManager):
self.subscription_manager = subscription_manager
self.storage = subscription_manager.storage
self.config_file = self.storage.config_dir / "core-config.yaml"
self.default_config_path = Path(__file__).parent / "templates" / "default-core-config.yaml"
@ -221,12 +222,17 @@ class CoreConfigManager:
cmd = [str(hook_path), str(config_file_path)]
print(f"🔧 Executing hook: {hook_path.name}")
env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'
result = subprocess.run(
cmd,
cwd=hook_path.parent,
capture_output=True,
text=True,
timeout=30
timeout=30,
encoding="utf-8",
shell=True,
env=env,
)
if result.returncode == 0:
@ -276,8 +282,7 @@ class CoreConfigManager:
config = self.load_config()
# Load subscriptions to get active subscription
subscription_manager = SubscriptionManager()
active_subscription = subscription_manager.subscriptions_data.get_active_subscription()
active_subscription = self.subscription_manager.subscriptions_data.get_active_subscription()
if not active_subscription:
print("❌ No active subscription found")

View File

@ -0,0 +1,128 @@
"""Hook manager for scientific-surfing application."""
import os
import shutil
import subprocess
from pathlib import Path
from typing import List
from scientific_surfing.storage import StorageManager
class HookManager:
"""Manages hook scripts for scientific-surfing."""
def __init__(self, storage: StorageManager) -> None:
"""Initialize hook manager.
Args:
config_dir: Optional configuration directory path.
If not provided, uses default user config directory.
"""
self.storage = storage
self.config_dir = storage.config_dir
self.hooks_dir = self.config_dir / "hooks"
self.template_hooks_dir = Path(__file__).parent / "templates" / "hooks"
def init(self) -> None:
"""Initialize hooks directory by copying template hooks."""
if not self.template_hooks_dir.exists():
print(f"Template hooks directory not found: {self.template_hooks_dir}")
return
self.hooks_dir.mkdir(parents=True, exist_ok=True)
copied_count = 0
for hook_file in self.template_hooks_dir.iterdir():
if hook_file.is_file():
dest_file = self.hooks_dir / hook_file.name
if not dest_file.exists():
shutil.copy2(hook_file, dest_file)
copied_count += 1
print(f"Copied: {hook_file.name}")
else:
print(f"Skipped (already exists): {hook_file.name}")
print(f"\nInitialized hooks directory with {copied_count} new scripts.")
print(f"Location: {self.hooks_dir}")
def list(self) -> None:
"""Display hooks directory location and list all hook scripts."""
print(f"Hooks directory: {self.hooks_dir}")
if not self.hooks_dir.exists():
print("Hooks directory does not exist. Run 'init' to create it.")
return
hook_files = self._get_hook_files()
if not hook_files:
print("No hook scripts found.")
else:
print(f"\nFound {len(hook_files)} hook script(s):")
for hook_file in hook_files:
print(f" - {hook_file}")
def edit(self, script_name: str) -> None:
"""Open a hook script with system default editor.
Args:
script_name: Name of the hook script to edit.
"""
if not self.hooks_dir.exists():
print("Hooks directory does not exist. Run 'init' to create it.")
return
script_path = self.hooks_dir / script_name
if not script_path.exists():
available = self._get_hook_files()
print(f"Script '{script_name}' not found.")
if available:
print(f"Available scripts: {', '.join(available)}")
return
editor = os.environ.get('EDITOR', 'notepad' if os.name == 'nt' else 'nano')
try:
subprocess.run([editor, str(script_path)], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to open editor: {e}")
except FileNotFoundError:
print(f"Editor '{editor}' not found. Please set EDITOR environment variable.")
def rm(self, script_name: str) -> None:
"""Remove a hook script.
Args:
script_name: Name of the hook script to remove.
"""
if not self.hooks_dir.exists():
print("Hooks directory does not exist.")
return
script_path = self.hooks_dir / script_name
if not script_path.exists():
available = self._get_hook_files()
print(f"Script '{script_name}' not found.")
if available:
print(f"Available scripts: {', '.join(available)}")
return
try:
script_path.unlink()
print(f"Removed: {script_name}")
except OSError as e:
print(f"Failed to remove script: {e}")
def _get_hook_files(self) -> List[str]:
"""Get list of hook script files.
Returns:
List of hook script filenames.
"""
if not self.hooks_dir.exists():
return []
return [f.name for f in self.hooks_dir.iterdir() if f.is_file()]

View File

@ -3,21 +3,16 @@ Subscription management module for scientific-surfing.
Handles subscription operations with persistent storage.
"""
import os
import requests
from datetime import datetime
from pathlib import Path
from typing import Optional
from scientific_surfing.storage import StorageManager
from scientific_surfing.models import Subscription, SubscriptionsData, Config
class SubscriptionManager:
"""Manages clash RSS subscriptions with persistent storage."""
storage: StorageManager = None
def __init__(self):
self.storage = StorageManager()
def __init__(self, storage: StorageManager):
self.storage = storage
self.subscriptions_data = self.storage.load_subscriptions()
self.config = self.storage.load_config()