Compare commits

...

11 Commits

31 changed files with 792 additions and 2294 deletions

View File

@ -1,15 +0,0 @@
{
"permissions": {
"allow": [
"Bash(poetry init:*)",
"Bash(mkdir:*)",
"Bash(python:*)",
"Bash(poetry run python:*)",
"Bash(del test_upgrade.py)",
"Bash(git add .)",
"Bash(git commit -m \"Initial commit with Python .gitignore\")"
],
"deny": [],
"ask": []
}
}

View File

@ -1,8 +0,0 @@
# Code style
- Use type hinting everywhere
- Use pydantic based models instead of dict
- Adopt Inversion of Control pattern whenever possible, use constructor injection for class, extract pure function if it has to depend on some global variable
# Workflow
- Be sure to typecheck when youre done making a series of code changes
- Be sure to update README.md after making code changes

View File

@ -13,9 +13,11 @@ A Python package for surfing internet scientifically.
### 1. Clone into local ### 1. Clone into local
```bash ```bash
git clone https://github.com/klesh/scientific-surfing.git git clone ssh://git@gitea.epss.net.cn:2223/klesh/ss.git
cd scientific-surfing cd ss
poetry install python -m venv .venv
./.venv/Scripts/activate
pip install -r requirements.txt
``` ```
### 2. Add the root directory to system PATH ### 2. Add the root directory to system PATH
@ -25,77 +27,77 @@ poetry install
### Subscription Management ### Subscription Management
```bash ```bash
# add a subscription # add a subscription
python -m scientific_surfing subscription add <name> <clash-rss-subscription-url> python -m ss subscription add <name> <clash-rss-subscription-url>
# refresh a subscription # refresh a subscription
python -m scientific_surfing subscription refresh <name> python -m ss subscription refresh <name>
# delete a subscription # delete a subscription
python -m scientific_surfing subscription rm <name> python -m ss subscription rm <name>
# rename a subscription # rename a subscription
python -m scientific_surfing subscription rename <name> <new-name> python -m ss subscription rename <name> <new-name>
# update subscription URL # update subscription URL
python -m scientific_surfing subscription set-url <name> <new-url> python -m ss subscription set-url <name> <new-url>
# activate a subscription # activate a subscription
python -m scientific_surfing subscription activate <name> python -m ss subscription activate <name>
# list all subscriptions # list all subscriptions
python -m scientific_surfing subscription list python -m ss subscription list
# show storage information # show storage information
python -m scientific_surfing subscription storage python -m ss subscription storage
``` ```
### Hook Management ### Hook Management
```bash ```bash
# initialize hooks directory with template scripts # initialize hooks directory with template scripts
python -m scientific_surfing hook init python -m ss hook init
# show hooks directory location and list all scripts # show hooks directory location and list all scripts
python -m scientific_surfing hook list python -m ss hook list
# edit a hook script with system editor # edit a hook script with system editor
python -m scientific_surfing hook edit <script-name> python -m ss hook edit <script-name>
# remove a hook script # remove a hook script
python -m scientific_surfing hook rm <script-name> python -m ss hook rm <script-name>
``` ```
### Core Configuration Management ### Core Configuration Management
```bash ```bash
# import configuration from file # import configuration from file
python -m scientific_surfing core config import <file-path> python -m ss core config import <file-path>
# export configuration to file # export configuration to file
python -m scientific_surfing core config export <file-path> python -m ss core config export <file-path>
# edit configuration with system editor # edit configuration with system editor
python -m scientific_surfing core config edit python -m ss core config edit
# reset configuration to default values # reset configuration to default values
python -m scientific_surfing core config reset python -m ss core config reset
# show current configuration # show current configuration
python -m scientific_surfing core config show python -m ss core config show
# apply active subscription to generate final config # apply active subscription to generate final config
python -m scientific_surfing core config apply python -m ss core config apply
``` ```
### Core Management ### Core Management
```bash ```bash
# update scientific-surfing core components # update scientific-surfing core components
python -m scientific_surfing core update [--version <version>] [--force] python -m ss core update [--version <version>] [--force]
``` ```
### Service Management ### Service Management
Linux / macOS Linux / macOS
```nushell ```nushell
sudo env SF_CONFIG_DIR=(readlink -f ~/basicfiles/cli/scientific_surfing) .venv/bin/python -m scientific_surfing core service install sudo env SF_CONFIG_DIR=$HOME/basicfiles/cli/ss python3 -m ss core service install
``` ```
## Development ## Development
@ -109,4 +111,4 @@ poetry run pytest
## License ## License
MIT License - see LICENSE file for details. MIT License - see LICENSE file for details.

View File

@ -1,96 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "4dbde0c5",
"metadata": {},
"outputs": [],
"source": [
"import yaml\n",
"\n",
"\n",
"with open(r'C:\\Users\\Klesh\\basicfiles\\cli\\scientific_surfing\\generated_config.yaml', 'r', encoding=\"utf-8\") as f:\n",
" config = yaml.safe_load(f)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "16e45ae8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'cipher': 'rc4-md5', 'name': 'taiwan06', 'obfs': 'plain', 'obfs-param': '2c9120876.douyin.com', 'password': 'di15PV', 'port': 6506, 'protocol': 'auth_aes128_md5', 'protocol-param': '120876:VCgmuD', 'server': 'cdn02.0821.meituan88.com', 'type': 'ssr', 'udp': True}\n"
]
}
],
"source": [
"server = next(filter(lambda p: \"台湾06\" in p[\"name\"], config[\"proxies\"]))\n",
"server[\"name\"] = \"taiwan06\"\n",
"print(server)\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "cc472edc",
"metadata": {},
"outputs": [],
"source": [
"config2 = config.copy()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "3db89abe",
"metadata": {},
"outputs": [],
"source": [
"config2[\"proxies\"] = [server]\n",
"config2[\"proxy-groups\"] = {\n",
" \"name\": \"defaultgroup\",\n",
" \"type\": \"select\",\n",
" \"proxies\": [server[\"name\"]],\n",
"}\n",
"config2[\"rules\"] = config[\"rules\"][:17]"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "2630b0fc",
"metadata": {},
"outputs": [],
"source": [
"with open(r'C:\\Users\\Klesh\\basicfiles\\cli\\scientific_surfing\\simple.yaml', 'w', encoding=\"utf-8\") as f:\n",
" yaml.dump(config2, f, default_flow_style=False, allow_unicode=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "scientific-surfing-4fYWmyKm-py3.12",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

1600
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,28 +1,3 @@
[tool.poetry]
name = "scientific-surfing"
version = "0.1.0"
description = "A Python package for surfing the internet scientifically"
authors = ["Scientific Surfing Team <team@scientific-surfing.com>"]
readme = "README.md"
packages = [{include = "scientific_surfing"}]
[tool.poetry.dependencies]
python = "^3.10"
requests = "^2.25.0"
PyYAML = "^6.0.0"
pydantic = "^2.0.0"
ipykernel = "^7.0.1"
[tool.poetry.group.dev.dependencies]
pytest = "^6.0.0"
pytest-cov = "^2.0.0"
black = "^21.0.0"
flake8 = "^3.8.0"
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black] [tool.black]
line-length = 88 line-length = 88
target-version = ['py38'] target-version = ['py38']
@ -31,4 +6,4 @@ target-version = ['py38']
testpaths = ["tests"] testpaths = ["tests"]
python_files = ["test_*.py"] python_files = ["test_*.py"]
python_classes = ["Test*"] python_classes = ["Test*"]
python_functions = ["test_*"] python_functions = ["test_*"]

4
requirements-dev.txt Normal file
View File

@ -0,0 +1,4 @@
pytest>=6.0.0
pytest-cov>=2.0.0
black>=21.0.0
flake8>=3.8.0

1
requirements-win32.txt Normal file
View File

@ -0,0 +1 @@
pywin32

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
requests>=2.25.0
PyYAML>=6.0.0
pydantic>=2.0.0
ipykernel>=6.31.0

View File

@ -1,8 +0,0 @@
"""
Entry point for python -m scientific_surfing
"""
from scientific_surfing.cli import main
if __name__ == '__main__':
main()

View File

@ -1,168 +0,0 @@
"""
Subscription management module for scientific-surfing.
Handles subscription operations with persistent storage.
"""
from datetime import datetime
import requests
from scientific_surfing.storage import StorageManager
class SubscriptionManager:
"""Manages clash RSS subscriptions with persistent storage."""
storage: StorageManager = None
def __init__(self, storage: StorageManager):
self.storage = storage
self.subscriptions_data = self.storage.load_subscriptions()
self.config = self.storage.load_config()
# Create subscriptions directory for storing downloaded files
self.subscriptions_dir = self.storage.config_dir / "subscriptions"
self.subscriptions_dir.mkdir(exist_ok=True)
def add_subscription(self, name: str, url: str) -> None:
"""Add a new subscription."""
subscription = self.subscriptions_data.add_subscription(name, url)
if self.storage.save_subscriptions(self.subscriptions_data):
self.refresh_subscription(subscription.name)
self.activate_subscription(subscription.name)
print(f"✅ Added subscription: {name} -> {url}")
else:
print("❌ Failed to save subscription")
def refresh_subscription(self, name: str) -> None:
"""Refresh a subscription by downloading from URL."""
if name not in self.subscriptions_data.subscriptions:
print(f"❌ Subscription '{name}' not found")
return
subscription = self.subscriptions_data.subscriptions[name]
url = subscription.url
print(f"🔄 Refreshing subscription: {name}")
try:
# Download the subscription content
headers = {
# 'User-Agent': self.config.default_user_agent
}
# timeout = self.config.timeout_seconds
# response = requests.get(url, headers=headers, timeout=timeout)
response = requests.get(url)
response.raise_for_status()
# File path without timestamp
file_path = self.subscriptions_dir / f"{name}.yml"
# Handle existing file by renaming with creation date
if file_path.exists():
# Get creation time of existing file
stat = file_path.stat()
try:
# Try st_birthtime first (macOS/Unix)
creation_time = datetime.fromtimestamp(stat.st_birthtime)
except AttributeError:
# Fallback to st_ctime (Windows)
creation_time = datetime.fromtimestamp(stat.st_ctime)
backup_name = f"{name}_{creation_time.strftime('%Y%m%d_%H%M%S')}.yml"
backup_path = self.subscriptions_dir / backup_name
# Rename existing file
file_path.rename(backup_path)
print(f" 🔄 Backed up existing file to: {backup_name}")
# Save the new downloaded content
with open(file_path, 'w', encoding='utf-8') as f:
f.write(response.text)
# Update subscription metadata
subscription.last_refresh = datetime.now()
subscription.file_size = len(response.text)
subscription.status_code = response.status_code
subscription.content_hash = hash(response.text)
subscription.last_error = None
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Subscription '{name}' refreshed successfully")
print(f" 📁 Saved to: {file_path}")
print(f" 📊 Size: {len(response.text)} bytes")
else:
print("❌ Failed to save subscription metadata")
except requests.exceptions.RequestException as e:
print(f"❌ Failed to download subscription: {e}")
subscription.last_error = str(e)
self.storage.save_subscriptions(self.subscriptions_data)
except IOError as e:
print(f"❌ Failed to save file: {e}")
subscription.last_error = str(e)
self.storage.save_subscriptions(self.subscriptions_data)
def delete_subscription(self, name: str) -> None:
"""Delete a subscription."""
if self.subscriptions_data.remove_subscription(name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"🗑️ Deleted subscription: {name}")
else:
print("❌ Failed to delete subscription")
else:
print(f"❌ Subscription '{name}' not found")
def rename_subscription(self, old_name: str, new_name: str) -> None:
"""Rename a subscription."""
if self.subscriptions_data.rename_subscription(old_name, new_name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Renamed subscription: {old_name} -> {new_name}")
else:
print("❌ Failed to rename subscription")
else:
print(f"❌ Failed to rename subscription: '{old_name}' not found or '{new_name}' already exists")
def set_subscription_url(self, name: str, url: str) -> None:
"""Update the URL for a subscription."""
if self.subscriptions_data.set_subscription_url(name, url):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Updated URL for subscription '{name}': {url}")
else:
print("❌ Failed to save subscription")
else:
print(f"❌ Subscription '{name}' not found")
def activate_subscription(self, name: str) -> None:
"""Activate a subscription."""
if self.subscriptions_data.set_active(name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Activated subscription: {name}")
else:
print("❌ Failed to activate subscription")
else:
print(f"❌ Subscription '{name}' not found")
def list_subscriptions(self) -> None:
"""List all subscriptions."""
if not self.subscriptions_data.subscriptions:
print("No subscriptions found")
return
print("📋 Subscriptions:")
for name, subscription in self.subscriptions_data.subscriptions.items():
active_marker = "" if subscription.status == 'active' else " "
last_refresh_str = ""
if subscription.last_refresh:
last_refresh_str = f" (last: {subscription.last_refresh.strftime('%Y-%m-%d %H:%M:%S')})"
print(f" {active_marker} {name}: {subscription.url} ({subscription.status}){last_refresh_str}")
def show_storage_info(self) -> None:
"""Show storage information."""
info = self.storage.get_storage_info()
print("📁 Storage Information:")
print(f" Platform: {info['platform']}")
print(f" Config Directory: {info['config_dir']}")
print(f" Config File: {info['config_file']}")
print(f" Subscriptions File: {info['subscriptions_file']}")
print(f" Directory Exists: {info['exists']}")

8
ss/__main__.py Normal file
View File

@ -0,0 +1,8 @@
"""
Entry point for python -m ss
"""
from .cli import main
if __name__ == '__main__':
main()

View File

@ -4,11 +4,11 @@ Command-line interface for scientific-surfing package.
import argparse import argparse
import sys import sys
from scientific_surfing.storage import StorageManager from .storage import StorageManager
from scientific_surfing.subscription_manager import SubscriptionManager from .subscription_manager import SubscriptionManager
from scientific_surfing.corecfg_manager import CoreConfigManager from .corecfg_manager import CoreConfigManager
from scientific_surfing.core_manager import CoreManager from .core_manager import CoreManager
from scientific_surfing.hook_manager import HookManager from .hook_manager import HookManager
def create_parser() -> argparse.ArgumentParser: def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser.""" """Create the argument parser."""
@ -65,7 +65,7 @@ def create_parser() -> argparse.ArgumentParser:
update_parser.add_argument('--force', action='store_true', help='Force update even if binary already exists') update_parser.add_argument('--force', action='store_true', help='Force update even if binary already exists')
# Config commands # Config commands
config_parser = core_subparsers.add_parser('config', help='Manage core configuration') config_parser = subparsers.add_parser('config', help='Manage core configuration')
config_subparsers = config_parser.add_subparsers(dest='config_command', help='Configuration operations') config_subparsers = config_parser.add_subparsers(dest='config_command', help='Configuration operations')
# Import config # Import config
@ -89,7 +89,7 @@ def create_parser() -> argparse.ArgumentParser:
apply_parser = config_subparsers.add_parser('apply', help='Apply active subscription to generate final config') apply_parser = config_subparsers.add_parser('apply', help='Apply active subscription to generate final config')
# Service management commands # Service management commands
service_parser = core_subparsers.add_parser('service', help='Manage mihomo as a system service') service_parser = subparsers.add_parser('service', help='Manage mihomo as a system service')
service_subparsers = service_parser.add_subparsers(dest='service_command', help='Service operations') service_subparsers = service_parser.add_subparsers(dest='service_command', help='Service operations')
# Install service command # Install service command
@ -113,6 +113,10 @@ def create_parser() -> argparse.ArgumentParser:
restart_service_parser = service_subparsers.add_parser('restart', help='Restart mihomo system service') restart_service_parser = service_subparsers.add_parser('restart', help='Restart mihomo system service')
restart_service_parser.add_argument('--name', default='mihomo', help='Service name (default: mihomo)') restart_service_parser.add_argument('--name', default='mihomo', help='Service name (default: mihomo)')
# Reload service command
reload_service_parser = service_subparsers.add_parser('reload', help='Reload mihomo service configuration (via API)')
reload_service_parser.add_argument('--name', default='mihomo', help='Service name (default: mihomo)')
# Status service command # Status service command
status_service_parser = service_subparsers.add_parser('status', help='Check mihomo system service status') status_service_parser = service_subparsers.add_parser('status', help='Check mihomo system service status')
status_service_parser.add_argument('--name', default='mihomo', help='Service name (default: mihomo)') status_service_parser.add_argument('--name', default='mihomo', help='Service name (default: mihomo)')
@ -138,6 +142,127 @@ def create_parser() -> argparse.ArgumentParser:
return parser return parser
def handle_subscription_command(args, subscription_manager: SubscriptionManager, core_config_manager: CoreConfigManager, core_manager: CoreManager, parser: argparse.ArgumentParser) -> None:
"""Handle subscription related commands."""
if not hasattr(args, 'subcommand') or not args.subcommand:
parser.parse_args(['subscription', '--help'])
return
if args.subcommand == 'add':
subscription_manager.add_subscription(args.name, args.url)
elif args.subcommand == 'refresh':
subscription_manager.refresh_subscription(args.name)
elif args.subcommand == 'rm':
subscription_manager.delete_subscription(args.name)
elif args.subcommand == 'rename':
subscription_manager.rename_subscription(args.name, args.new_name)
elif args.subcommand == 'set-url':
subscription_manager.set_subscription_url(args.name, args.url)
elif args.subcommand == 'activate':
subscription_manager.activate_subscription(args.name)
if core_config_manager.apply():
# Reload service if config applied successfully
core_manager.reload_service()
elif args.subcommand == 'list':
subscription_manager.list_subscriptions()
elif args.subcommand == 'storage':
subscription_manager.show_storage_info()
else:
parser.parse_args(['subscription', '--help'])
def handle_core_command(args, core_manager: CoreManager, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None:
"""Handle core/config/service related commands."""
if not hasattr(args, 'core_command') or not args.core_command:
parser.parse_args(['core', '--help'])
return
if args.core_command == 'update':
core_manager.update(version=args.version, force=args.force)
else:
parser.parse_args(['core', '--help'])
def handle_config_command(args, core_config_manager: CoreConfigManager, parser: argparse.ArgumentParser) -> None:
"""Handle configuration commands."""
if not hasattr(args, 'config_command') or not args.config_command:
parser.parse_args(['config', '--help'])
return
if args.config_command == 'import':
core_config_manager.import_config(args.source)
elif args.config_command == 'export':
core_config_manager.export_config(args.destination)
elif args.config_command == 'edit':
core_config_manager.edit_config()
elif args.config_command == 'reset':
core_config_manager.reset_config()
elif args.config_command == 'show':
core_config_manager.show_config()
elif args.config_command == 'apply':
core_config_manager.apply()
else:
parser.parse_args(['config', '--help'])
def handle_service_command(args, core_manager: CoreManager, parser: argparse.ArgumentParser) -> None:
"""Handle service commands."""
if not hasattr(args, 'service_command') or not args.service_command:
parser.parse_args(['service', '--help'])
return
if args.service_command == 'install':
success = core_manager.install_service(
service_name=args.name,
description=args.description
)
if not success:
sys.exit(1)
elif args.service_command == 'uninstall':
success = core_manager.uninstall_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'start':
success = core_manager.start_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'stop':
success = core_manager.stop_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'restart':
success = core_manager.restart_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'reload':
success = core_manager.reload_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'status':
status = core_manager.get_service_status(service_name=args.name)
print(f"Service '{args.name}' status: {status}")
else:
parser.parse_args(['service', '--help'])
def handle_hook_command(args, hook_manager: HookManager, parser: argparse.ArgumentParser) -> None:
"""Handle hook commands."""
if not hasattr(args, 'hook_command') or not args.hook_command:
parser.parse_args(['hook', '--help'])
return
if args.hook_command == 'init':
hook_manager.init()
elif args.hook_command == 'list':
hook_manager.list_hooks()
elif args.hook_command == 'edit':
hook_manager.edit(args.script)
elif args.hook_command == 'rm':
hook_manager.rm(args.script)
else:
parser.parse_args(['hook', '--help'])
def main() -> None: def main() -> None:
"""Main CLI entry point.""" """Main CLI entry point."""
parser = create_parser() parser = create_parser()
@ -155,109 +280,15 @@ def main() -> None:
try: try:
if args.command == 'subscription': if args.command == 'subscription':
if not hasattr(args, 'subcommand') or not args.subcommand: handle_subscription_command(args, subscription_manager, core_config_manager, core_manager, parser)
parser.parse_args(['subscription', '--help'])
return
if args.subcommand == 'add':
subscription_manager.add_subscription(args.name, args.url)
elif args.subcommand == 'refresh':
subscription_manager.refresh_subscription(args.name)
elif args.subcommand == 'rm':
subscription_manager.delete_subscription(args.name)
elif args.subcommand == 'rename':
subscription_manager.rename_subscription(args.name, args.new_name)
elif args.subcommand == 'set-url':
subscription_manager.set_subscription_url(args.name, args.url)
elif args.subcommand == 'activate':
subscription_manager.activate_subscription(args.name)
elif args.subcommand == 'list':
subscription_manager.list_subscriptions()
elif args.subcommand == 'storage':
subscription_manager.show_storage_info()
else:
parser.parse_args(['subscription', '--help'])
elif args.command == 'core': elif args.command == 'core':
if not hasattr(args, 'core_command') or not args.core_command: handle_core_command(args, core_manager, core_config_manager, parser)
parser.parse_args(['core', '--help']) elif args.command == 'config':
return handle_config_command(args, core_config_manager, parser)
elif args.command == 'service':
if args.core_command == 'update': handle_service_command(args, core_manager, parser)
core_manager.update(version=args.version, force=args.force)
elif args.core_command == 'config':
if not hasattr(args, 'config_command') or not args.config_command:
parser.parse_args(['core', 'config', '--help'])
return
if args.config_command == 'import':
core_config_manager.import_config(args.source)
elif args.config_command == 'export':
core_config_manager.export_config(args.destination)
elif args.config_command == 'edit':
core_config_manager.edit_config()
elif args.config_command == 'reset':
core_config_manager.reset_config()
elif args.config_command == 'show':
core_config_manager.show_config()
elif args.config_command == 'apply':
core_config_manager.apply()
else:
parser.parse_args(['core', 'config', '--help'])
elif args.core_command == 'service':
if not hasattr(args, 'service_command') or not args.service_command:
parser.parse_args(['core', 'service', '--help'])
return
if args.service_command == 'install':
success = core_manager.install_service(
service_name=args.name,
description=args.description
)
if not success:
sys.exit(1)
elif args.service_command == 'uninstall':
success = core_manager.uninstall_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'start':
success = core_manager.start_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'stop':
success = core_manager.stop_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'restart':
success = core_manager.restart_service(service_name=args.name)
if not success:
sys.exit(1)
elif args.service_command == 'status':
status = core_manager.get_service_status(service_name=args.name)
print(f"Service '{args.name}' status: {status}")
else:
parser.parse_args(['core', 'service', '--help'])
else:
parser.parse_args(['core', '--help'])
elif args.command == 'hook': elif args.command == 'hook':
if not hasattr(args, 'hook_command') or not args.hook_command: handle_hook_command(args, hook_manager, parser)
parser.parse_args(['hook', '--help'])
return
if args.hook_command == 'init':
hook_manager.init()
elif args.hook_command == 'list':
hook_manager.list()
elif args.hook_command == 'edit':
hook_manager.edit(args.script)
elif args.hook_command == 'rm':
hook_manager.rm(args.script)
else:
parser.parse_args(['hook', '--help'])
else: else:
parser.print_help() parser.print_help()
except KeyboardInterrupt: except KeyboardInterrupt:
@ -269,4 +300,4 @@ def main() -> None:
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -14,8 +14,8 @@ from typing import Optional, Dict, Any
import requests import requests
from pathlib import Path from pathlib import Path
from scientific_surfing.corecfg_manager import CoreConfigManager from .corecfg_manager import CoreConfigManager
from scientific_surfing.service_manager import ServiceManager from .service_manager import ServiceManager
class CoreManager: class CoreManager:
@ -25,6 +25,58 @@ class CoreManager:
self.storage = core_config_manager.storage self.storage = core_config_manager.storage
self.core_config_manager = core_config_manager self.core_config_manager = core_config_manager
def _get_platform_info(self) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""
Get platform, normalized architecture, and binary name base.
Returns:
tuple: (system, normalized_arch, binary_name_base) or (None, None, None) if unsupported.
"""
system = platform.system().lower()
machine = platform.machine().lower()
# Map platform to mihomo binary naming (base name without extension)
platform_map = {
'windows': {
'amd64': 'mihomo-windows-amd64',
'386': 'mihomo-windows-386',
'arm64': 'mihomo-windows-arm64',
'arm': 'mihomo-windows-arm32v7'
},
'linux': {
'amd64': 'mihomo-linux-amd64',
'386': 'mihomo-linux-386',
'arm64': 'mihomo-linux-arm64',
'arm': 'mihomo-linux-armv7'
},
'darwin': {
'amd64': 'mihomo-darwin-amd64',
'arm64': 'mihomo-darwin-arm64'
}
}
# Normalize architecture names
arch_map = {
'x86_64': 'amd64',
'amd64': 'amd64',
'i386': '386',
'i686': '386',
'arm64': 'arm64',
'aarch64': 'arm64',
'armv7l': 'arm',
'arm': 'arm'
}
if system not in platform_map:
print(f"❌ Unsupported operating system: {system}")
return None, None, None
normalized_arch = arch_map.get(machine, machine)
if normalized_arch not in platform_map[system]:
print(f"❌ Unsupported architecture: {machine} ({normalized_arch})")
return None, None, None
return system, normalized_arch, platform_map[system][normalized_arch]
def update(self, version: Optional[str] = None, force: bool = False) -> bool: def update(self, version: Optional[str] = None, force: bool = False) -> bool:
""" """
Download and update mihomo binary from GitHub releases. Download and update mihomo binary from GitHub releases.
@ -37,53 +89,10 @@ class CoreManager:
bool: True if update successful, False otherwise. bool: True if update successful, False otherwise.
""" """
try: try:
# Determine current OS and architecture system, normalized_arch, binary_name = self._get_platform_info()
system = platform.system().lower() if not system:
machine = platform.machine().lower()
# Map platform to mihomo binary naming (base name without extension)
platform_map = {
'windows': {
'amd64': 'mihomo-windows-amd64',
'386': 'mihomo-windows-386',
'arm64': 'mihomo-windows-arm64',
'arm': 'mihomo-windows-arm32v7'
},
'linux': {
'amd64': 'mihomo-linux-amd64',
'386': 'mihomo-linux-386',
'arm64': 'mihomo-linux-arm64',
'arm': 'mihomo-linux-armv7'
},
'darwin': {
'amd64': 'mihomo-darwin-amd64',
'arm64': 'mihomo-darwin-arm64'
}
}
# Normalize architecture names
arch_map = {
'x86_64': 'amd64',
'amd64': 'amd64',
'i386': '386',
'i686': '386',
'arm64': 'arm64',
'aarch64': 'arm64',
'armv7l': 'arm',
'arm': 'arm'
}
if system not in platform_map:
print(f"❌ Unsupported operating system: {system}")
return False return False
normalized_arch = arch_map.get(machine, machine)
if normalized_arch not in platform_map[system]:
print(f"❌ Unsupported architecture: {machine} ({normalized_arch})")
return False
binary_name = platform_map[system][normalized_arch]
# Setup directories # Setup directories
binary_dir = self.core_config_manager.storage.config_dir / "bin" binary_dir = self.core_config_manager.storage.config_dir / "bin"
binary_dir.mkdir(parents=True, exist_ok=True) binary_dir.mkdir(parents=True, exist_ok=True)
@ -424,6 +433,73 @@ class CoreManager:
except Exception as e: except Exception as e:
return f"Error checking service status: {e}" return f"Error checking service status: {e}"
def reload_service(self, service_name: str = "mihomo") -> bool:
"""
Reload mihomo configuration via external controller API without restarting service.
Args:
service_name: Name of the service (default: "mihomo")
Returns:
bool: True if reload successful, False otherwise.
"""
config_path = self.core_config_manager.storage.config_dir / "generated_config.yaml"
if not config_path.exists():
print(f"❌ Configuration file not found: {config_path}")
return False
try:
# Parse generated config to find external-controller
import yaml
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
external_controller = config.get('external-controller', '127.0.0.1:9090')
secret = config.get('secret', '')
# Format API URL
if not external_controller.startswith('http'):
base_url = f"http://{external_controller}"
else:
base_url = external_controller
# Prepare request
url = f"{base_url}/configs?force=true"
headers = {
'Content-Type': 'application/json'
}
if secret:
headers['Authorization'] = f"Bearer {secret}"
payload = {
"path": str(config_path.absolute()),
"payload": "" # Empty payload suggests reload from path
}
# Send reload request
print(f"🔄 Reloading configuration via API: {url}")
# Short timeout as local controller should respond quickly
response = requests.put(url, json=payload, headers=headers, timeout=2)
if response.status_code == 204:
print(f"✅ Configuration reloaded successfully")
return True
else:
print(f"❌ Failed to reload configuration: HTTP {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ Failed to connect to external controller: {e}")
print(" Is the service running?")
return False
except ImportError:
print("❌ PyYAML is required to parse configuration.")
return False
except Exception as e:
print(f"❌ Error reloading service: {e}")
return False
def deep_merge(dict1, dict2): def deep_merge(dict1, dict2):
for k, v in dict2.items(): for k, v in dict2.items():
@ -433,4 +509,4 @@ def deep_merge(dict1, dict2):
dict1[k].extend(v) # Example: extend lists. Adjust logic for other list merging needs. dict1[k].extend(v) # Example: extend lists. Adjust logic for other list merging needs.
else: else:
dict1[k] = v dict1[k] = v
return dict1 return dict1

View File

@ -11,8 +11,9 @@ from pathlib import Path
import yaml import yaml
from scientific_surfing.models import Config from .models import Config
from scientific_surfing.subscription_manager import SubscriptionManager from .subscription_manager import SubscriptionManager
from .utils import open_file_in_editor
class CoreConfigManager: class CoreConfigManager:
@ -120,46 +121,25 @@ class CoreConfigManager:
if not self._ensure_config_exists(): if not self._ensure_config_exists():
return False return False
# Get system editor # Backup current config
editor = os.environ.get('EDITOR') or os.environ.get('VISUAL') backup_path = self.config_file.with_suffix('.yaml.backup')
if not editor: if self.config_file.exists():
# Try common editors shutil.copy2(self.config_file, backup_path)
for cmd in ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']:
if shutil.which(cmd):
editor = cmd
break
if not editor: # Open editor
print("❌ No editor found. Please set EDITOR or VISUAL environment variable") if not open_file_in_editor(self.config_file):
return False return False
# Validate edited config
try: try:
# Backup current config self.load_config()
backup_path = self.config_file.with_suffix('.yaml.backup') print("✅ Configuration edited successfully")
if self.config_file.exists(): return True
shutil.copy2(self.config_file, backup_path)
# Open editor
subprocess.run([editor, str(self.config_file)], check=True)
# Validate edited config
try:
config = self.load_config()
print("✅ Configuration edited successfully")
return True
except Exception as e:
# Restore backup if validation fails
if backup_path.exists():
shutil.copy2(backup_path, self.config_file)
print(f"❌ Invalid configuration: {e}")
print("🔄 Restored previous configuration")
return False
except subprocess.CalledProcessError:
print("❌ Editor command failed")
return False
except Exception as e: except Exception as e:
print(f"Failed to edit configuration: {e}") print(f"Configuration invalid: {e}")
print("Restoring backup...")
if backup_path.exists():
shutil.copy2(backup_path, self.config_file)
return False return False
def reset_config(self) -> bool: def reset_config(self) -> bool:
@ -217,7 +197,7 @@ class CoreConfigManager:
# On Windows, try to execute directly (batch files, etc.) # On Windows, try to execute directly (batch files, etc.)
cmd = [str(hook_path), str(config_file_path)] cmd = [str(hook_path), str(config_file_path)]
print(f"🔧 Executing hook: {hook_path.name}") print(f"🔧 Executing hook: {hook_path}")
env = os.environ.copy() env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8' env['PYTHONIOENCODING'] = 'utf-8'
result = subprocess.run( result = subprocess.run(
@ -227,8 +207,9 @@ class CoreConfigManager:
text=True, text=True,
timeout=30, timeout=30,
encoding="utf-8", encoding="utf-8",
shell=True, # shell=True,
env=env, env=env,
# stdout=subprocess.PIPE, stderr=subprocess.STDOUT
) )
if result.returncode == 0: if result.returncode == 0:
@ -272,7 +253,7 @@ class CoreConfigManager:
def apply(self) -> bool: def apply(self) -> bool:
"""Apply active subscription to generate final config file.""" """Apply active subscription to generate final config file."""
from scientific_surfing.subscription_manager import SubscriptionManager from ss.subscription_manager import SubscriptionManager
# Load current configuration # Load current configuration
config = self.load_config() config = self.load_config()
@ -369,4 +350,4 @@ def deep_merge(dict1, dict2):
dict1[k].extend(v) # Example: extend lists. Adjust logic for other list merging needs. dict1[k].extend(v) # Example: extend lists. Adjust logic for other list merging needs.
else: else:
dict1[k] = v dict1[k] = v
return dict1 return dict1

View File

@ -6,7 +6,8 @@ import subprocess
from pathlib import Path from pathlib import Path
from typing import List from typing import List
from scientific_surfing.storage import StorageManager from .storage import StorageManager
from .utils import open_file_in_editor
class HookManager: class HookManager:
@ -46,7 +47,7 @@ class HookManager:
print(f"\nInitialized hooks directory with {copied_count} new scripts.") print(f"\nInitialized hooks directory with {copied_count} new scripts.")
print(f"Location: {self.hooks_dir}") print(f"Location: {self.hooks_dir}")
def list(self) -> None: def list_hooks(self) -> None:
"""Display hooks directory location and list all hook scripts.""" """Display hooks directory location and list all hook scripts."""
print(f"Hooks directory: {self.hooks_dir}") print(f"Hooks directory: {self.hooks_dir}")
@ -82,14 +83,7 @@ class HookManager:
print(f"Available scripts: {', '.join(available)}") print(f"Available scripts: {', '.join(available)}")
return return
editor = os.environ.get('EDITOR', 'notepad' if os.name == 'nt' else 'nano') open_file_in_editor(script_path)
try:
subprocess.run([editor, str(script_path)], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to open editor: {e}")
except FileNotFoundError:
print(f"Editor '{editor}' not found. Please set EDITOR environment variable.")
def rm(self, script_name: str) -> None: def rm(self, script_name: str) -> None:
"""Remove a hook script. """Remove a hook script.

View File

@ -4,28 +4,27 @@ Pydantic models for scientific-surfing data structures.
from datetime import datetime from datetime import datetime
import os import os
from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, Field, validator from pydantic import BaseModel, Field, validator
class SubscriptionStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
class Subscription(BaseModel): class Subscription(BaseModel):
"""Model for a single subscription.""" """Model for a single subscription."""
name: str = Field(..., description="Name of the subscription") name: str = Field(..., description="Name of the subscription")
url: str = Field(..., description="Clash RSS subscription URL") url: str = Field(..., description="Clash RSS subscription URL")
status: str = Field(default="inactive", description="Status: active or inactive") status: SubscriptionStatus = Field(default=SubscriptionStatus.INACTIVE, description="Status: active or inactive")
last_refresh: Optional[datetime] = Field(default=None, description="Last refresh timestamp") last_refresh: Optional[datetime] = Field(default=None, description="Last refresh timestamp")
file_size: Optional[int] = Field(default=None, description="Size of downloaded file in bytes") file_size: Optional[int] = Field(default=None, description="Size of downloaded file in bytes")
status_code: Optional[int] = Field(default=None, description="HTTP status code of last refresh") status_code: Optional[int] = Field(default=None, description="HTTP status code of last refresh")
content_hash: Optional[int] = Field(default=None, description="Hash of downloaded content") content_hash: Optional[int] = Field(default=None, description="Hash of downloaded content")
last_error: Optional[str] = Field(default=None, description="Last error message if any") last_error: Optional[str] = Field(default=None, description="Last error message if any")
@validator('status')
def validate_status(cls, v):
if v not in ['active', 'inactive']:
raise ValueError('Status must be either "active" or "inactive"')
return v
class Config: class Config:
json_encoders = { json_encoders = {
datetime: lambda v: v.isoformat() if v else None datetime: lambda v: v.isoformat() if v else None
@ -66,7 +65,7 @@ class SubscriptionsData(BaseModel):
def get_active_subscription(self) -> Optional[Subscription]: def get_active_subscription(self) -> Optional[Subscription]:
"""Get the currently active subscription.""" """Get the currently active subscription."""
for subscription in self.subscriptions.values(): for subscription in self.subscriptions.values():
if subscription.status == 'active': if subscription.status == SubscriptionStatus.ACTIVE:
return subscription return subscription
return None return None
@ -76,7 +75,7 @@ class SubscriptionsData(BaseModel):
return False return False
for sub_name, subscription in self.subscriptions.items(): for sub_name, subscription in self.subscriptions.items():
subscription.status = 'active' if sub_name == name else 'inactive' subscription.status = SubscriptionStatus.ACTIVE if sub_name == name else SubscriptionStatus.INACTIVE
return True return True
def add_subscription(self, name: str, url: str) -> Subscription: def add_subscription(self, name: str, url: str) -> Subscription:
@ -85,7 +84,7 @@ class SubscriptionsData(BaseModel):
# If this is the first subscription, set it as active # If this is the first subscription, set it as active
if not self.subscriptions: if not self.subscriptions:
subscription.status = 'active' subscription.status = SubscriptionStatus.ACTIVE
self.subscriptions[name] = subscription self.subscriptions[name] = subscription
return subscription return subscription

View File

@ -16,7 +16,7 @@ from typing import Optional, Protocol
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from scientific_surfing.storage import StorageManager from .storage import StorageManager
class ServiceConfig(BaseModel): class ServiceConfig(BaseModel):
@ -166,7 +166,7 @@ class WindowsServiceManager(ServiceManagerProtocol):
# Create permanent config file in a stable location # Create permanent config file in a stable location
config_dir = self.config_dir config_dir = self.config_dir
config_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True)
config_file = config_dir / f"{config.name}_config.json" config_file = Path.home() / f"{config.name}_config.json"
with open(config_file, 'w') as f: with open(config_file, 'w') as f:
json.dump(asdict(windows_service_config), f, indent=2) json.dump(asdict(windows_service_config), f, indent=2)
@ -216,8 +216,6 @@ class WindowsServiceManager(ServiceManagerProtocol):
def uninstall(self, name: str) -> None: def uninstall(self, name: str) -> None:
"""Uninstall a Windows service.""" """Uninstall a Windows service."""
import json
from pathlib import Path
try: try:
# Stop the service first # Stop the service first
@ -231,7 +229,7 @@ class WindowsServiceManager(ServiceManagerProtocol):
self._run_as_admin(["sc", "delete", name], f"uninstall service '{name}'") self._run_as_admin(["sc", "delete", name], f"uninstall service '{name}'")
# Clean up configuration file # Clean up configuration file
config_dir = Path.home() / ".scientific_surfing" / "service_configs" config_dir = self.config_dir
config_file = config_dir / f"{name}_config.json" config_file = config_dir / f"{name}_config.json"
try: try:
config_file.unlink(missing_ok=True) config_file.unlink(missing_ok=True)
@ -627,4 +625,4 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
print(f"Error: {e}") print(f"Error: {e}")
sys.exit(1) sys.exit(1)

View File

@ -9,7 +9,7 @@ import yaml
from pathlib import Path from pathlib import Path
from typing import Dict from typing import Dict
from scientific_surfing.models import SubscriptionsData from .models import SubscriptionsData
class StorageManager: class StorageManager:
@ -27,27 +27,7 @@ class StorageManager:
if config_dir: if config_dir:
return Path(config_dir) return Path(config_dir)
system = platform.system().lower() return Path.home() / "basicfiles" / "cli" / "ss"
if system == "windows":
# Windows: %APPDATA%/scientific_surfing
app_data = os.environ.get("APPDATA")
if app_data:
return Path(app_data) / "scientific_surfing"
else:
return Path.home() / "AppData" / "Roaming" / "scientific_surfing"
elif system == "darwin":
# macOS: ~/Library/Application Support/scientific_surfing
return Path.home() / "Library" / "Application Support" / "scientific_surfing"
else:
# Linux and other Unix-like systems: ~/.config/scientific_surfing
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
if xdg_config_home:
return Path(xdg_config_home) / "scientific_surfing"
else:
return Path.home() / ".config" / "scientific_surfing"
def _ensure_config_dir(self) -> None: def _ensure_config_dir(self) -> None:
"""Ensure the configuration directory exists.""" """Ensure the configuration directory exists."""
@ -73,7 +53,11 @@ class StorageManager:
try: try:
with open(self.subscriptions_file, 'w', encoding='utf-8') as f: with open(self.subscriptions_file, 'w', encoding='utf-8') as f:
# Convert Pydantic model to dict for YAML serialization # Convert Pydantic model to dict for YAML serialization
data = subscriptions.dict() if hasattr(subscriptions, 'model_dump'):
data = subscriptions.model_dump(mode='json')
else:
# Fallback for Pydantic v1
data = subscriptions.dict()
yaml.dump(data, f, default_flow_style=False, allow_unicode=True) yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
return True return True
except (yaml.YAMLError, IOError, ValueError) as e: except (yaml.YAMLError, IOError, ValueError) as e:
@ -115,4 +99,4 @@ class StorageManager:
'subscriptions_file': str(self.subscriptions_file), 'subscriptions_file': str(self.subscriptions_file),
'platform': platform.system(), 'platform': platform.system(),
'exists': str(self.config_dir.exists()) 'exists': str(self.config_dir.exists())
} }

380
ss/subscription_manager.py Normal file
View File

@ -0,0 +1,380 @@
"""
Subscription management module for scientific-surfing.
Handles subscription operations with persistent storage.
"""
from datetime import datetime
import base64
import urllib.parse
import yaml
import requests
from .storage import StorageManager
from .models import SubscriptionStatus
class SubscriptionManager:
"""Manages clash RSS subscriptions with persistent storage."""
storage: StorageManager = None
def __init__(self, storage: StorageManager):
self.storage = storage
self.subscriptions_data = self.storage.load_subscriptions()
self.config = self.storage.load_config()
# Create subscriptions directory for storing downloaded files
self.subscriptions_dir = self.storage.config_dir / "subscriptions"
self.subscriptions_dir.mkdir(exist_ok=True)
def add_subscription(self, name: str, url: str) -> None:
"""Add a new subscription."""
subscription = self.subscriptions_data.add_subscription(name, url)
if self.storage.save_subscriptions(self.subscriptions_data):
self.refresh_subscription(subscription.name)
self.activate_subscription(subscription.name)
print(f"✅ Added subscription: {name} -> {url}")
else:
print("❌ Failed to save subscription")
def _parse_ss(self, uri: str) -> dict:
"""Parse ss:// URI to Clash proxy config"""
if not uri.startswith('ss://'):
return None
# Check if there is a tag (fragment)
name = "ss"
if '#' in uri:
uri, name_part = uri.split('#', 1)
name = urllib.parse.unquote(name_part)
content = uri[5:]
# Check if userinfo is base64 encoded
if '@' in content:
userinfo, hostport = content.split('@', 1)
# Try to decode userinfo first as it might be base64(method:password)
method = password = None
try:
# handle padding
padding = len(userinfo) % 4
if padding:
u_temp = userinfo + '=' * (4 - padding)
else:
u_temp = userinfo
decoded_userinfo = base64.urlsafe_b64decode(u_temp).decode('utf-8')
if ':' in decoded_userinfo:
method, password = decoded_userinfo.split(':', 1)
except Exception:
pass
if not method and ':' in userinfo:
method, password = userinfo.split(':', 1)
if not method or not password:
return None
if ':' not in hostport:
return None
try:
server, port = hostport.split(':')
port = int(port)
except ValueError:
return None
else:
# entire body is likely base64 encoded
try:
padding = len(content) % 4
if padding:
content += '=' * (4 - padding)
decoded = base64.urlsafe_b64decode(content).decode('utf-8')
if '@' not in decoded:
return None
# recursive call with decoded URI (prepend ss://)
# but usually decoded is method:password@host:port
method_pass, host_port = decoded.split('@', 1)
if ':' not in method_pass:
return None
method, password = method_pass.split(':', 1)
if ':' not in host_port:
return None
server, port = host_port.split(':', 1)
port = int(port)
except Exception:
return None
return {
'name': name,
'type': 'ss',
'server': server,
'port': port,
'cipher': method,
'password': password
}
def _parse_trojan(self, uri: str) -> dict:
"""Parse trojan:// URI to Clash proxy config"""
if not uri.startswith('trojan://'):
return None
parsed = urllib.parse.urlparse(uri)
query = urllib.parse.parse_qs(parsed.query)
name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'trojan'
config = {
'name': name,
'type': 'trojan',
'server': parsed.hostname,
'port': parsed.port,
'password': parsed.username,
'udp': True
}
if 'sni' in query:
config['sni'] = query['sni'][0]
elif 'peer' in query:
config['sni'] = query['peer'][0]
if 'allowInsecure' in query:
config['skip-cert-verify'] = query['allowInsecure'][0] == '1'
return config
def _parse_vless(self, uri: str) -> dict:
"""Parse vless:// URI to Clash proxy config"""
if not uri.startswith('vless://'):
return None
parsed = urllib.parse.urlparse(uri)
query = urllib.parse.parse_qs(parsed.query)
name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'vless'
config = {
'name': name,
'type': 'vless',
'server': parsed.hostname,
'port': parsed.port,
'uuid': parsed.username,
'udp': True,
'tls': False,
'network': 'tcp'
}
if 'security' in query:
sec = query['security'][0]
if sec == 'tls':
config['tls'] = True
elif sec == 'reality':
config['tls'] = True
config['reality-opts'] = {}
if 'pbk' in query:
config['reality-opts']['public-key'] = query['pbk'][0]
if 'sid' in query:
config['reality-opts']['short-id'] = query['sid'][0]
if 'sni' in query:
config['servername'] = query['sni'][0]
if 'flow' in query:
config['flow'] = query['flow'][0]
if 'type' in query:
config['network'] = query['type'][0]
elif 'headerType' in query:
# Sometimes type is omitted?
pass
if 'sni' in query:
config['servername'] = query['sni'][0]
if 'fp' in query:
config['client-fingerprint'] = query['fp'][0]
return config
def _convert_content(self, content: str) -> str:
"""Convert subscription content to Clash YAML format if needed."""
# Check if content is already valid YAML
try:
yaml_content = yaml.safe_load(content)
if isinstance(yaml_content, dict) and 'proxies' in yaml_content:
return content
except yaml.YAMLError:
pass
lines = []
# Try treating as Base64 encoded list first
if 'ss://' not in content and 'trojan://' not in content and 'vless://' not in content:
try:
decoded = base64.b64decode(content).decode('utf-8')
lines = decoded.splitlines()
except Exception:
pass
# Fallback to reading lines directly
if not lines:
lines = content.splitlines()
proxies = []
for line in lines:
line = line.strip()
if not line:
continue
proxy = None
if line.startswith('ss://'):
proxy = self._parse_ss(line)
elif line.startswith('trojan://'):
proxy = self._parse_trojan(line)
elif line.startswith('vless://'):
proxy = self._parse_vless(line)
if proxy:
proxies.append(proxy)
if proxies:
return yaml.dump({'proxies': proxies}, allow_unicode=True, sort_keys=False)
return content
def refresh_subscription(self, name: str) -> None:
"""Refresh a subscription by downloading from URL."""
if name not in self.subscriptions_data.subscriptions:
print(f"❌ Subscription '{name}' not found")
return
subscription = self.subscriptions_data.subscriptions[name]
url = subscription.url
print(f"🔄 Refreshing subscription: {name}")
try:
# Download the subscription content
response = requests.get(url)
response.raise_for_status()
# File path without timestamp
file_path = self.subscriptions_dir / f"{name}.yml"
# Handle existing file by renaming with creation date
if file_path.exists():
# Get creation time of existing file
stat = file_path.stat()
try:
# Try st_birthtime first (macOS/Unix)
creation_time = datetime.fromtimestamp(stat.st_birthtime)
except AttributeError:
# Fallback to st_ctime (Windows)
creation_time = datetime.fromtimestamp(stat.st_ctime)
backup_name = f"{name}_{creation_time.strftime('%Y%m%d_%H%M%S')}.yml"
backup_path = self.subscriptions_dir / backup_name
# Rename existing file
file_path.rename(backup_path)
print(f" 🔄 Backed up existing file to: {backup_name}")
# Check if response.text is valid yaml, or use convert_sub.py to transform
content = self._convert_content(response.text)
# Save the new downloaded content
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
# Update subscription metadata
subscription.last_refresh = datetime.now()
subscription.file_size = len(content)
subscription.status_code = response.status_code
subscription.content_hash = hash(content)
subscription.last_error = None
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Subscription '{name}' refreshed successfully")
print(f" 📁 Saved to: {file_path}")
print(f" 📊 Size: {len(response.text)} bytes")
else:
print("❌ Failed to save subscription metadata")
except requests.exceptions.RequestException as e:
print(f"❌ Failed to download subscription: {e}")
subscription.last_error = str(e)
self.storage.save_subscriptions(self.subscriptions_data)
except IOError as e:
print(f"❌ Failed to save file: {e}")
subscription.last_error = str(e)
self.storage.save_subscriptions(self.subscriptions_data)
def delete_subscription(self, name: str) -> None:
"""Delete a subscription."""
if self.subscriptions_data.remove_subscription(name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"🗑️ Deleted subscription: {name}")
else:
print("❌ Failed to delete subscription")
else:
print(f"❌ Subscription '{name}' not found")
def rename_subscription(self, old_name: str, new_name: str) -> None:
"""Rename a subscription."""
if self.subscriptions_data.rename_subscription(old_name, new_name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Renamed subscription: {old_name} -> {new_name}")
else:
print("❌ Failed to rename subscription")
else:
print(f"❌ Failed to rename subscription: '{old_name}' not found or '{new_name}' already exists")
def set_subscription_url(self, name: str, url: str) -> None:
"""Update the URL for a subscription."""
if self.subscriptions_data.set_subscription_url(name, url):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Updated URL for subscription '{name}': {url}")
else:
print("❌ Failed to save subscription")
else:
print(f"❌ Subscription '{name}' not found")
def activate_subscription(self, name: str) -> None:
"""Activate a subscription."""
if self.subscriptions_data.set_active(name):
if self.storage.save_subscriptions(self.subscriptions_data):
print(f"✅ Activated subscription: {name}")
else:
print("❌ Failed to activate subscription")
else:
print(f"❌ Subscription '{name}' not found")
def list_subscriptions(self) -> None:
"""List all subscriptions."""
if not self.subscriptions_data.subscriptions:
print("No subscriptions found")
return
print("📋 Subscriptions:")
for name, subscription in self.subscriptions_data.subscriptions.items():
is_active = subscription.status == SubscriptionStatus.ACTIVE
active_marker = "" if is_active else " "
last_refresh_str = ""
if subscription.last_refresh:
last_refresh_str = f" (last: {subscription.last_refresh.strftime('%Y-%m-%d %H:%M:%S')})"
status_str = "active" if is_active else "inactive"
print(f" {active_marker} {name}: {subscription.url} ({status_str}){last_refresh_str}")
def show_storage_info(self) -> None:
"""Show storage information."""
info = self.storage.get_storage_info()
print("📁 Storage Information:")
print(f" Platform: {info['platform']}")
print(f" Config Directory: {info['config_dir']}")
print(f" Config File: {info['config_file']}")
print(f" Subscriptions File: {info['subscriptions_file']}")
print(f" Directory Exists: {info['exists']}")

49
ss/utils.py Normal file
View File

@ -0,0 +1,49 @@
import os
import sys
import shutil
import subprocess
from pathlib import Path
from typing import Optional
def get_editor_command() -> Optional[str]:
"""
Get the command for the system's default editor.
Prioritizes EDITOR/VISUAL env vars, then falls back to common editors.
"""
editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
if editor:
return editor
# Try common editors
common_editors = ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']
if os.name == 'nt':
common_editors.insert(0, 'notepad')
for cmd in common_editors:
if shutil.which(cmd):
return cmd
return None
def open_file_in_editor(file_path: Path) -> bool:
"""
Open a file in the system default editor.
Returns True if successful, False otherwise.
"""
editor = get_editor_command()
if not editor:
print("❌ No editor found. Please set EDITOR or VISUAL environment variable")
return False
try:
subprocess.run([editor, str(file_path)], check=True)
return True
except subprocess.CalledProcessError as e:
print(f"❌ Failed to open editor: {e}")
return False
except FileNotFoundError:
print(f"❌ Editor '{editor}' not found.")
return False
except Exception as e:
print(f"❌ Error opening editor: {e}")
return False

View File

@ -85,7 +85,7 @@ class WindowsServiceFramework(win32serviceutil.ServiceFramework):
except Exception as e: except Exception as e:
# Fallback to servicemanager logging # Fallback to servicemanager logging
servicemanager.LogInfoMsg(f"Failed to setup file logging: {e}") servicemanager.LogInfoMsg(f"Failed to setup file logging: {e}")
self.log = servicemanager #self.log = servicemanager
@classmethod @classmethod
def load_service_config(cls): def load_service_config(cls):

View File

@ -1,10 +0,0 @@
{
"executable_path": "test.exe",
"arguments": "--test",
"working_directory": "",
"restart_on_failure": true,
"max_restarts": 5,
"restart_delay": 10,
"log_level": "INFO",
"environment_variables": {}
}

View File

@ -1,89 +0,0 @@
#!/usr/bin/env python3
"""
Test script for the upgrade method in corecfg_manager.py
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from scientific_surfing.corecfg_manager import CoreConfigManager
def test_upgrade():
"""Test the upgrade method functionality."""
manager = CoreConfigManager()
print("Testing upgrade method...")
print("=" * 50)
# Test 1: Check if upgrade method exists
if hasattr(manager, 'upgrade'):
print("[OK] upgrade method exists")
else:
print("[FAIL] upgrade method not found")
return False
# Test 2: Test OS detection (without actually downloading)
import platform
system = platform.system().lower()
machine = platform.machine().lower()
print(f"Detected OS: {system}")
print(f"Detected Architecture: {machine}")
# Test 3: Test platform mapping
platform_map = {
'windows': {
'amd64': 'mihomo-windows-amd64.exe',
'386': 'mihomo-windows-386.exe',
'arm64': 'mihomo-windows-arm64.exe',
'arm': 'mihomo-windows-arm32v7.exe'
},
'linux': {
'amd64': 'mihomo-linux-amd64',
'386': 'mihomo-linux-386',
'arm64': 'mihomo-linux-arm64',
'arm': 'mihomo-linux-armv7'
},
'darwin': {
'amd64': 'mihomo-darwin-amd64',
'arm64': 'mihomo-darwin-arm64'
}
}
arch_map = {
'x86_64': 'amd64',
'amd64': 'amd64',
'i386': '386',
'i686': '386',
'arm64': 'arm64',
'aarch64': 'arm64',
'armv7l': 'arm',
'arm': 'arm'
}
normalized_arch = arch_map.get(machine, machine)
if system in platform_map and normalized_arch in platform_map[system]:
binary_name = platform_map[system][normalized_arch]
print(f"[OK] Would download: {binary_name}")
else:
print(f"[FAIL] Unsupported platform: {system}/{normalized_arch}")
return False
# Test 4: Test directory creation
from scientific_surfing.storage import StorageManager
storage = StorageManager()
binary_dir = storage.config_dir / "bin"
print(f"Binary directory: {binary_dir}")
print("\n[OK] All tests passed! The upgrade method is ready to use.")
print("\nUsage examples:")
print(" manager.upgrade() # Download latest version")
print(" manager.upgrade(version='v1.18.5') # Download specific version")
print(" manager.upgrade(force=True) # Force re-download")
return True
if __name__ == "__main__":
test_upgrade()

View File

@ -1 +0,0 @@
# Test package for scientific-surfing

7
windows.md Normal file
View File

@ -0,0 +1,7 @@
管理员
```powershell
pip install -r requirements.txt -r requirements-win32.txt
python -m ss core service install
```