From 8993b8fb8805c821735745ff7a086f055b44ecc4 Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Thu, 5 Mar 2026 23:06:45 +0800 Subject: [PATCH] feat: support v2ray subscription format --- ss/subscription_manager.py | 225 +++++++++++++++++++++++++++++++++++-- 1 file changed, 216 insertions(+), 9 deletions(-) diff --git a/ss/subscription_manager.py b/ss/subscription_manager.py index 70a1c88..3df40dd 100644 --- a/ss/subscription_manager.py +++ b/ss/subscription_manager.py @@ -4,6 +4,9 @@ Handles subscription operations with persistent storage. """ from datetime import datetime +import base64 +import urllib.parse +import yaml import requests from .storage import StorageManager @@ -32,6 +35,213 @@ class SubscriptionManager: else: print("❌ Failed to save subscription") + + def _parse_ss(self, uri: str) -> dict: + """Parse ss:// URI to Clash proxy config""" + if not uri.startswith('ss://'): + return None + + # Check if there is a tag (fragment) + name = "ss" + if '#' in uri: + uri, name_part = uri.split('#', 1) + name = urllib.parse.unquote(name_part) + + content = uri[5:] + + # Check if userinfo is base64 encoded + if '@' in content: + userinfo, hostport = content.split('@', 1) + + # Try to decode userinfo first as it might be base64(method:password) + method = password = None + try: + # handle padding + padding = len(userinfo) % 4 + if padding: + u_temp = userinfo + '=' * (4 - padding) + else: + u_temp = userinfo + + decoded_userinfo = base64.urlsafe_b64decode(u_temp).decode('utf-8') + if ':' in decoded_userinfo: + method, password = decoded_userinfo.split(':', 1) + except Exception: + pass + + if not method and ':' in userinfo: + method, password = userinfo.split(':', 1) + + if not method or not password: + return None + + if ':' not in hostport: + return None + try: + server, port = hostport.split(':') + port = int(port) + except ValueError: + return None + else: + # entire body is likely base64 encoded + try: + padding = len(content) % 4 + if padding: + content += '=' * (4 - padding) + decoded = base64.urlsafe_b64decode(content).decode('utf-8') + + if '@' not in decoded: + return None + + # recursive call with decoded URI (prepend ss://) + # but usually decoded is method:password@host:port + method_pass, host_port = decoded.split('@', 1) + if ':' not in method_pass: + return None + method, password = method_pass.split(':', 1) + + if ':' not in host_port: + return None + server, port = host_port.split(':', 1) + port = int(port) + except Exception: + return None + + return { + 'name': name, + 'type': 'ss', + 'server': server, + 'port': port, + 'cipher': method, + 'password': password + } + + def _parse_trojan(self, uri: str) -> dict: + """Parse trojan:// URI to Clash proxy config""" + if not uri.startswith('trojan://'): + return None + + parsed = urllib.parse.urlparse(uri) + query = urllib.parse.parse_qs(parsed.query) + + name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'trojan' + + config = { + 'name': name, + 'type': 'trojan', + 'server': parsed.hostname, + 'port': parsed.port, + 'password': parsed.username, + 'udp': True + } + + if 'sni' in query: + config['sni'] = query['sni'][0] + elif 'peer' in query: + config['sni'] = query['peer'][0] + + if 'allowInsecure' in query: + config['skip-cert-verify'] = query['allowInsecure'][0] == '1' + + return config + + def _parse_vless(self, uri: str) -> dict: + """Parse vless:// URI to Clash proxy config""" + if not uri.startswith('vless://'): + return None + + parsed = urllib.parse.urlparse(uri) + query = urllib.parse.parse_qs(parsed.query) + + name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'vless' + + config = { + 'name': name, + 'type': 'vless', + 'server': parsed.hostname, + 'port': parsed.port, + 'uuid': parsed.username, + 'udp': True, + 'tls': False, + 'network': 'tcp' + } + + if 'security' in query: + sec = query['security'][0] + if sec == 'tls': + config['tls'] = True + elif sec == 'reality': + config['tls'] = True + config['reality-opts'] = {} + if 'pbk' in query: + config['reality-opts']['public-key'] = query['pbk'][0] + if 'sid' in query: + config['reality-opts']['short-id'] = query['sid'][0] + if 'sni' in query: + config['servername'] = query['sni'][0] + + if 'flow' in query: + config['flow'] = query['flow'][0] + + if 'type' in query: + config['network'] = query['type'][0] + elif 'headerType' in query: + # Sometimes type is omitted? + pass + + if 'sni' in query: + config['servername'] = query['sni'][0] + + if 'fp' in query: + config['client-fingerprint'] = query['fp'][0] + + return config + + def _convert_content(self, content: str) -> str: + """Convert subscription content to Clash YAML format if needed.""" + # Check if content is already valid YAML + try: + yaml_content = yaml.safe_load(content) + if isinstance(yaml_content, dict) and 'proxies' in yaml_content: + return content + except yaml.YAMLError: + pass + + lines = [] + # Try treating as Base64 encoded list first + if 'ss://' not in content and 'trojan://' not in content and 'vless://' not in content: + try: + decoded = base64.b64decode(content).decode('utf-8') + lines = decoded.splitlines() + except Exception: + pass + + # Fallback to reading lines directly + if not lines: + lines = content.splitlines() + + proxies = [] + for line in lines: + line = line.strip() + if not line: + continue + + proxy = None + if line.startswith('ss://'): + proxy = self._parse_ss(line) + elif line.startswith('trojan://'): + proxy = self._parse_trojan(line) + elif line.startswith('vless://'): + proxy = self._parse_vless(line) + + if proxy: + proxies.append(proxy) + + if proxies: + return yaml.dump({'proxies': proxies}, allow_unicode=True, sort_keys=False) + + return content + def refresh_subscription(self, name: str) -> None: """Refresh a subscription by downloading from URL.""" if name not in self.subscriptions_data.subscriptions: @@ -45,12 +255,6 @@ class SubscriptionManager: try: # Download the subscription content - headers = { - # 'User-Agent': self.config.default_user_agent - } - # timeout = self.config.timeout_seconds - - # response = requests.get(url, headers=headers, timeout=timeout) response = requests.get(url) response.raise_for_status() @@ -75,15 +279,18 @@ class SubscriptionManager: file_path.rename(backup_path) print(f" 🔄 Backed up existing file to: {backup_name}") + # Check if response.text is valid yaml, or use convert_sub.py to transform + content = self._convert_content(response.text) + # Save the new downloaded content with open(file_path, 'w', encoding='utf-8') as f: - f.write(response.text) + f.write(content) # Update subscription metadata subscription.last_refresh = datetime.now() - subscription.file_size = len(response.text) + subscription.file_size = len(content) subscription.status_code = response.status_code - subscription.content_hash = hash(response.text) + subscription.content_hash = hash(content) subscription.last_error = None if self.storage.save_subscriptions(self.subscriptions_data):