feat: support v2ray subscription format

This commit is contained in:
2026-03-05 23:06:45 +08:00
parent db13c68161
commit 8993b8fb88

View File

@ -4,6 +4,9 @@ Handles subscription operations with persistent storage.
"""
from datetime import datetime
import base64
import urllib.parse
import yaml
import requests
from .storage import StorageManager
@ -32,6 +35,213 @@ class SubscriptionManager:
else:
print("❌ Failed to save subscription")
def _parse_ss(self, uri: str) -> dict:
"""Parse ss:// URI to Clash proxy config"""
if not uri.startswith('ss://'):
return None
# Check if there is a tag (fragment)
name = "ss"
if '#' in uri:
uri, name_part = uri.split('#', 1)
name = urllib.parse.unquote(name_part)
content = uri[5:]
# Check if userinfo is base64 encoded
if '@' in content:
userinfo, hostport = content.split('@', 1)
# Try to decode userinfo first as it might be base64(method:password)
method = password = None
try:
# handle padding
padding = len(userinfo) % 4
if padding:
u_temp = userinfo + '=' * (4 - padding)
else:
u_temp = userinfo
decoded_userinfo = base64.urlsafe_b64decode(u_temp).decode('utf-8')
if ':' in decoded_userinfo:
method, password = decoded_userinfo.split(':', 1)
except Exception:
pass
if not method and ':' in userinfo:
method, password = userinfo.split(':', 1)
if not method or not password:
return None
if ':' not in hostport:
return None
try:
server, port = hostport.split(':')
port = int(port)
except ValueError:
return None
else:
# entire body is likely base64 encoded
try:
padding = len(content) % 4
if padding:
content += '=' * (4 - padding)
decoded = base64.urlsafe_b64decode(content).decode('utf-8')
if '@' not in decoded:
return None
# recursive call with decoded URI (prepend ss://)
# but usually decoded is method:password@host:port
method_pass, host_port = decoded.split('@', 1)
if ':' not in method_pass:
return None
method, password = method_pass.split(':', 1)
if ':' not in host_port:
return None
server, port = host_port.split(':', 1)
port = int(port)
except Exception:
return None
return {
'name': name,
'type': 'ss',
'server': server,
'port': port,
'cipher': method,
'password': password
}
def _parse_trojan(self, uri: str) -> dict:
"""Parse trojan:// URI to Clash proxy config"""
if not uri.startswith('trojan://'):
return None
parsed = urllib.parse.urlparse(uri)
query = urllib.parse.parse_qs(parsed.query)
name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'trojan'
config = {
'name': name,
'type': 'trojan',
'server': parsed.hostname,
'port': parsed.port,
'password': parsed.username,
'udp': True
}
if 'sni' in query:
config['sni'] = query['sni'][0]
elif 'peer' in query:
config['sni'] = query['peer'][0]
if 'allowInsecure' in query:
config['skip-cert-verify'] = query['allowInsecure'][0] == '1'
return config
def _parse_vless(self, uri: str) -> dict:
"""Parse vless:// URI to Clash proxy config"""
if not uri.startswith('vless://'):
return None
parsed = urllib.parse.urlparse(uri)
query = urllib.parse.parse_qs(parsed.query)
name = urllib.parse.unquote(parsed.fragment) if parsed.fragment else 'vless'
config = {
'name': name,
'type': 'vless',
'server': parsed.hostname,
'port': parsed.port,
'uuid': parsed.username,
'udp': True,
'tls': False,
'network': 'tcp'
}
if 'security' in query:
sec = query['security'][0]
if sec == 'tls':
config['tls'] = True
elif sec == 'reality':
config['tls'] = True
config['reality-opts'] = {}
if 'pbk' in query:
config['reality-opts']['public-key'] = query['pbk'][0]
if 'sid' in query:
config['reality-opts']['short-id'] = query['sid'][0]
if 'sni' in query:
config['servername'] = query['sni'][0]
if 'flow' in query:
config['flow'] = query['flow'][0]
if 'type' in query:
config['network'] = query['type'][0]
elif 'headerType' in query:
# Sometimes type is omitted?
pass
if 'sni' in query:
config['servername'] = query['sni'][0]
if 'fp' in query:
config['client-fingerprint'] = query['fp'][0]
return config
def _convert_content(self, content: str) -> str:
"""Convert subscription content to Clash YAML format if needed."""
# Check if content is already valid YAML
try:
yaml_content = yaml.safe_load(content)
if isinstance(yaml_content, dict) and 'proxies' in yaml_content:
return content
except yaml.YAMLError:
pass
lines = []
# Try treating as Base64 encoded list first
if 'ss://' not in content and 'trojan://' not in content and 'vless://' not in content:
try:
decoded = base64.b64decode(content).decode('utf-8')
lines = decoded.splitlines()
except Exception:
pass
# Fallback to reading lines directly
if not lines:
lines = content.splitlines()
proxies = []
for line in lines:
line = line.strip()
if not line:
continue
proxy = None
if line.startswith('ss://'):
proxy = self._parse_ss(line)
elif line.startswith('trojan://'):
proxy = self._parse_trojan(line)
elif line.startswith('vless://'):
proxy = self._parse_vless(line)
if proxy:
proxies.append(proxy)
if proxies:
return yaml.dump({'proxies': proxies}, allow_unicode=True, sort_keys=False)
return content
def refresh_subscription(self, name: str) -> None:
"""Refresh a subscription by downloading from URL."""
if name not in self.subscriptions_data.subscriptions:
@ -45,12 +255,6 @@ class SubscriptionManager:
try:
# Download the subscription content
headers = {
# 'User-Agent': self.config.default_user_agent
}
# timeout = self.config.timeout_seconds
# response = requests.get(url, headers=headers, timeout=timeout)
response = requests.get(url)
response.raise_for_status()
@ -75,15 +279,18 @@ class SubscriptionManager:
file_path.rename(backup_path)
print(f" 🔄 Backed up existing file to: {backup_name}")
# Check if response.text is valid yaml, or use convert_sub.py to transform
content = self._convert_content(response.text)
# Save the new downloaded content
with open(file_path, 'w', encoding='utf-8') as f:
f.write(response.text)
f.write(content)
# Update subscription metadata
subscription.last_refresh = datetime.now()
subscription.file_size = len(response.text)
subscription.file_size = len(content)
subscription.status_code = response.status_code
subscription.content_hash = hash(response.text)
subscription.content_hash = hash(content)
subscription.last_error = None
if self.storage.save_subscriptions(self.subscriptions_data):