# scientific-surfing/scientific_surfing/corecfg_manager.py
"""
User configuration manager for scientific-surfing.
Handles user preferences with import, export, edit, reset, and update operations,
and applies the active subscription to generate the final core config, running
any post-generation hooks.
"""
import os
import shutil
import subprocess
import sys
from pathlib import Path
import yaml
from scientific_surfing.models import Config
from scientific_surfing.subscription_manager import SubscriptionManager
class CoreConfigManager:
"""Manages user configuration with import, export, and edit operations."""
def __init__(self, subscription_manager: SubscriptionManager):
self.subscription_manager = subscription_manager
self.storage = subscription_manager.storage
self.config_file = self.storage.config_dir / "core-config.yaml"
self.default_config_path = Path(__file__).parent / "templates" / "default-core-config.yaml"
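    # For reference, a minimal core-config.yaml might look like the sketch below.
    # The authoritative schema is scientific_surfing.models.Config; the keys here
    # are inferred from show_config() further down and the values are placeholders.
    #
    #   auto_refresh: true
    #   refresh_interval_hours: 12
    #   default_user_agent: "scientific-surfing"
    #   timeout_seconds: 15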
def _ensure_config_exists(self) -> bool:
"""Ensure config.yaml exists, create from default if not."""
if not self.config_file.exists():
if self.default_config_path.exists():
self.storage.config_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(self.default_config_path, self.config_file)
print(f"✅ Created default config at: {self.config_file}")
return True
else:
print("❌ Default config template not found")
return False
return True
def load_config(self) -> dict:
"""Load configuration from YAML file."""
if not self.config_file.exists():
return {}
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if isinstance(data, dict):
return data
return {}
except (yaml.YAMLError, IOError) as e:
print(f"Warning: Failed to load config: {e}")
return {}
def save_config(self, config: dict) -> bool:
"""Save configuration to YAML file."""
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
                # config is expected to be a plain dict at this point
                yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
return True
except (yaml.YAMLError, IOError, ValueError) as e:
print(f"Error: Failed to save config: {e}")
return False
def import_config(self, source_path: str) -> bool:
"""Import configuration from a YAML file."""
source = Path(source_path)
if not source.exists():
print(f"❌ Source file not found: {source_path}")
return False
try:
with open(source, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
print("❌ Invalid YAML format")
return False
            # Validate with the Pydantic model
            config = Config(**data)
            # Persist the validated settings as a plain dict
            # (Pydantic v1-style .dict(); use .model_dump() on Pydantic v2)
            if not self.save_config(config.dict()):
                return False
print(f"✅ Imported configuration from: {source_path}")
return True
except yaml.YAMLError as e:
print(f"❌ Invalid YAML: {e}")
return False
except Exception as e:
print(f"❌ Failed to import: {e}")
return False
def export_config(self, destination_path: str) -> bool:
"""Export current configuration to a YAML file."""
destination = Path(destination_path)
try:
config = self.load_config()
# Ensure destination directory exists
destination.parent.mkdir(parents=True, exist_ok=True)
            # Export as YAML (load_config already returns a plain dict)
            with open(destination, 'w', encoding='utf-8') as f:
                yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
print(f"✅ Exported configuration to: {destination_path}")
return True
except Exception as e:
print(f"❌ Failed to export: {e}")
return False
def edit_config(self) -> bool:
"""Edit configuration using system default editor."""
if not self._ensure_config_exists():
return False
# Get system editor
editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
if not editor:
# Try common editors
for cmd in ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']:
if shutil.which(cmd):
editor = cmd
break
if not editor:
print("❌ No editor found. Please set EDITOR or VISUAL environment variable")
return False
try:
# Backup current config
backup_path = self.config_file.with_suffix('.yaml.backup')
if self.config_file.exists():
shutil.copy2(self.config_file, backup_path)
# Open editor
subprocess.run([editor, str(self.config_file)], check=True)
            # Validate the edited file by parsing it into the Config model
            try:
                Config(**self.load_config())
                print("✅ Configuration edited successfully")
                return True
except Exception as e:
# Restore backup if validation fails
if backup_path.exists():
shutil.copy2(backup_path, self.config_file)
print(f"❌ Invalid configuration: {e}")
print("🔄 Restored previous configuration")
return False
except subprocess.CalledProcessError:
print("❌ Editor command failed")
return False
except Exception as e:
print(f"❌ Failed to edit configuration: {e}")
return False
def reset_config(self) -> bool:
"""Reset configuration to default values."""
        if self.default_config_path.exists():
            # Make sure the config directory exists before copying the template
            self.storage.config_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(self.default_config_path, self.config_file)
print("✅ Configuration reset to default values")
return True
else:
print("❌ Default config template not found")
return False
def show_config(self) -> None:
"""Display current configuration."""
config = self.load_config()
print("⚙️ Current Configuration:")
print(f" Auto-refresh: {config.auto_refresh}")
print(f" Refresh interval: {config.refresh_interval_hours} hours")
print(f" User-Agent: {config.default_user_agent}")
print(f" Timeout: {config.timeout_seconds} seconds")
def get_config(self) -> Config:
"""Get current configuration."""
return self.load_config()
def update_config(self, **kwargs) -> bool:
"""Update specific configuration values."""
        config = self.load_config()
        # load_config returns a plain dict, so check keys against the Config schema
        valid_keys = set(Config.__fields__)
        for key, value in kwargs.items():
            if key in valid_keys:
                config[key] = value
            else:
                print(f"⚠️ Unknown configuration key: {key}")
                return False
        return self.save_config(config)
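    # Example with a hypothetical value: manager.update_config(timeout_seconds=10)
    # changes a single field in core-config.yaml and persists it immediately.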
def _execute_hook(self, hook_path: Path, config_file_path: Path) -> bool:
"""Execute a hook script with the generated config file path."""
if not hook_path.exists():
return False
try:
# Determine the interpreter based on file extension and platform
if hook_path.suffix.lower() == '.py':
cmd = [sys.executable, str(hook_path), str(config_file_path)]
elif hook_path.suffix.lower() == '.js':
cmd = ['node', str(hook_path), str(config_file_path)]
elif hook_path.suffix.lower() == '.nu':
cmd = ['nu', str(hook_path), str(config_file_path)]
else:
# On Unix-like systems, execute directly
if os.name != 'nt':
cmd = [str(hook_path), str(config_file_path)]
# Make sure the script is executable
os.chmod(hook_path, 0o755)
else:
# On Windows, try to execute directly (batch files, etc.)
cmd = [str(hook_path), str(config_file_path)]
print(f"🔧 Executing hook: {hook_path.name}")
env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'
            # cmd is an argv list, so run it without a shell (shell=True combined
            # with a list would drop every element after the first on POSIX)
            result = subprocess.run(
                cmd,
                cwd=hook_path.parent,
                capture_output=True,
                text=True,
                timeout=30,
                encoding="utf-8",
                env=env,
            )
if result.returncode == 0:
print(f"✅ Hook executed successfully: {hook_path.name}")
if result.stdout.strip():
print(f" Output: {result.stdout.strip()}")
return True
else:
print(f"❌ Hook failed: {hook_path.name}")
if result.stderr.strip():
print(f" Error: {result.stderr.strip()}")
return False
except subprocess.TimeoutExpired:
print(f"⏰ Hook timed out: {hook_path.name}")
return False
except Exception as e:
print(f"❌ Failed to execute hook {hook_path.name}: {e}")
return False
def _execute_hooks(self, config_file_path: Path) -> None:
"""Execute all hooks in the hooks directory after config generation."""
hooks_dir = self.storage.config_dir / "hooks"
if not hooks_dir.exists():
return
# Look for core_config_generated.* files
hook_pattern = "core_config_generated.*"
hook_files = list(hooks_dir.glob(hook_pattern))
if not hook_files:
return
print(f"🔧 Found {len(hook_files)} hook(s) to execute")
# Sort hooks for consistent execution order
hook_files.sort()
for hook_file in hook_files:
self._execute_hook(hook_file, config_file_path)
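    # Hook contract implemented above: every file in <config_dir>/hooks matching
    # "core_config_generated.*" is executed with the generated config path as its
    # only argument, from the hooks directory, with a 30-second timeout.
    # A minimal Python hook could look like this sketch:
    #
    #   # hooks/core_config_generated.py
    #   import sys
    #   generated = sys.argv[1]  # path to generated_config.yaml
    #   print(f"core config written to {generated}")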
def apply(self) -> bool:
"""Apply active subscription to generate final config file."""
# Load current configuration
config = self.load_config()
# Load subscriptions to get active subscription
active_subscription = self.subscription_manager.subscriptions_data.get_active_subscription()
if not active_subscription:
print("❌ No active subscription found")
return False
file_path = active_subscription.get_file_path(self.storage.config_dir)
if not file_path or not Path(file_path).exists():
print("❌ Active subscription file not found. Please refresh the subscription first.")
return False
try:
# Load the subscription content
with open(file_path, 'r', encoding='utf-8') as f:
subscription_content = f.read()
# Parse subscription YAML
subscription_data = yaml.safe_load(subscription_content)
if not isinstance(subscription_data, dict):
subscription_data = {}
# Create final config by merging subscription with user config
final_config = deep_merge(subscription_data, config)
external_ui = final_config.get("external-ui")
if external_ui:
final_config["external-ui"] = os.path.join(self.storage.config_dir, external_ui)
# Define essential defaults that should be present in any Clash config
essential_defaults = {
'port': 7890,
'socks-port': 7891,
'mixed-port': 7890,
'allow-lan': False,
'mode': 'rule',
'log-level': 'info',
'external-controller': '127.0.0.1:9090',
'ipv6': True,
}
            # Fill in any essential keys the merged config is still missing
for key, default_value in essential_defaults.items():
if key not in final_config:
final_config[key] = default_value
# Ensure basic DNS configuration exists if not provided by subscription
if 'dns' not in final_config:
final_config['dns'] = {
'enable': True,
'listen': '0.0.0.0:53',
'enhanced-mode': 'fake-ip',
'fake-ip-range': '198.18.0.1/16',
'nameserver': [
'https://doh.pub/dns-query',
'https://dns.alidns.com/dns-query'
],
'fallback': [
'https://1.1.1.1/dns-query',
'https://8.8.8.8/dns-query'
]
}
# Generate final config file
generated_path = self.storage.config_dir / "generated_config.yaml"
with open(generated_path, 'w', encoding='utf-8') as f:
yaml.dump(final_config, f, default_flow_style=False, allow_unicode=True)
print(f"✅ Generated final configuration: {generated_path}")
print(f" Active subscription: {active_subscription.name}")
# Execute hooks after successful config generation
self._execute_hooks(generated_path)
return True
except yaml.YAMLError as e:
print(f"❌ Invalid YAML in subscription: {e}")
return False
except Exception as e:
print(f"❌ Failed to apply configuration: {e}")
return False
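# Note on merge precedence: apply() calls deep_merge(subscription_data, config),
# so values from the user's core-config.yaml override those from the subscription
# (for example, a user-set log-level wins over the subscription's log-level).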
def deep_merge(dict1, dict2):
    """Recursively merge dict2 into dict1, with dict2 winning on conflicts.

    Nested dicts are merged key by key, lists are concatenated, and any other
    value in dict2 overwrites the one in dict1. dict1 is mutated in place and
    returned.
    """
    for k, v in dict2.items():
        if k in dict1 and isinstance(dict1[k], dict) and isinstance(v, dict):
            dict1[k] = deep_merge(dict1[k], v)
        elif k in dict1 and isinstance(dict1[k], list) and isinstance(v, list):
            # Concatenate lists; adjust here if another list-merge policy is needed
            dict1[k].extend(v)
        else:
            dict1[k] = v
    return dict1
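# Typical usage sketch. The SubscriptionManager construction below is an assumption;
# its real arguments depend on the rest of the package:
#
#   from scientific_surfing.subscription_manager import SubscriptionManager
#   from scientific_surfing.corecfg_manager import CoreConfigManager
#
#   subs = SubscriptionManager()      # hypothetical no-argument construction
#   manager = CoreConfigManager(subs)
#   manager.show_config()             # print current core-config.yaml values
#   manager.apply()                   # write generated_config.yaml and run hooks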