"""
User configuration manager for scientific-surfing.
Handles user preferences with import, export, and edit operations.
"""

import os
import shutil
import subprocess
import sys
from pathlib import Path

import yaml

from scientific_surfing.models import Config
from scientific_surfing.storage import StorageManager


class CoreConfigManager:
    """Manages user configuration with import, export, and edit operations.

    The configuration lives in ``core-config.yaml`` inside the storage
    manager's config directory and is handled as a plain ``dict`` throughout
    this class (that is what ``load_config`` returns and ``save_config``
    accepts).
    """

    def __init__(self):
        self.storage = StorageManager()
        self.config_file = self.storage.config_dir / "core-config.yaml"
        self.default_config_path = Path(__file__).parent / "templates" / "default-core-config.yaml"

    def _ensure_config_exists(self) -> bool:
        """Ensure config.yaml exists, create from default if not.

        Returns:
            True if the config file exists (or was just created from the
            bundled template), False if the template is missing.
        """
        if self.config_file.exists():
            return True

        if not self.default_config_path.exists():
            print("❌ Default config template not found")
            return False

        self.storage.config_dir.mkdir(parents=True, exist_ok=True)
        shutil.copy2(self.default_config_path, self.config_file)
        print(f"✅ Created default config at: {self.config_file}")
        return True

    def load_config(self) -> dict:
        """Load configuration from YAML file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing,
            unreadable, not valid YAML, or does not contain a mapping at the
            top level. Errors are reported as a warning, never raised.
        """
        if not self.config_file.exists():
            return {}

        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if isinstance(data, dict):
                return data
            return {}
        except (yaml.YAMLError, IOError) as e:
            print(f"Warning: Failed to load config: {e}")
            return {}

    def save_config(self, config: dict) -> bool:
        """Save configuration to YAML file.

        Args:
            config: Mapping to serialise into the config file.

        Returns:
            True on success, False if the file could not be written.
        """
        try:
            # Mirror _ensure_config_exists: the directory may not exist yet.
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(self.config_file, 'w', encoding='utf-8') as f:
                yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
            return True
        except (yaml.YAMLError, IOError, ValueError) as e:
            print(f"Error: Failed to save config: {e}")
            return False

    def import_config(self, source_path: str) -> bool:
        """Import configuration from a YAML file.

        Args:
            source_path: Path of the YAML file to import.

        Returns:
            True if the file was read, validated and saved; False otherwise.
        """
        source = Path(source_path)
        if not source.exists():
            print(f"❌ Source file not found: {source_path}")
            return False

        try:
            with open(source, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)

            if not isinstance(data, dict):
                print("❌ Invalid YAML format")
                return False

            # Validate with Pydantic model; construction raises on bad input.
            Config(**data)

            # Persist the plain mapping, not the model instance: yaml.dump on
            # an arbitrary object emits python-specific tags that the
            # yaml.safe_load call in load_config refuses to read back.
            if not self.save_config(data):
                return False

            print(f"✅ Imported configuration from: {source_path}")
            return True

        except yaml.YAMLError as e:
            print(f"❌ Invalid YAML: {e}")
            return False
        except Exception as e:
            print(f"❌ Failed to import: {e}")
            return False

    def export_config(self, destination_path: str) -> bool:
        """Export current configuration to a YAML file.

        Args:
            destination_path: Path of the file to write; parent directories
                are created as needed.

        Returns:
            True on success, False if anything went wrong.
        """
        destination = Path(destination_path)

        try:
            # load_config already returns a plain dict, ready for yaml.dump.
            config = self.load_config()

            # Ensure destination directory exists
            destination.parent.mkdir(parents=True, exist_ok=True)

            # Export as YAML
            with open(destination, 'w', encoding='utf-8') as f:
                yaml.dump(config, f, default_flow_style=False, allow_unicode=True)

            print(f"✅ Exported configuration to: {destination_path}")
            return True

        except Exception as e:
            print(f"❌ Failed to export: {e}")
            return False

    def edit_config(self) -> bool:
        """Edit configuration using system default editor.

        The current file is backed up before the editor is launched. After
        the editor exits, the file is parsed again; if it is not valid YAML
        or its top-level value is not a mapping, the backup is restored.

        Returns:
            True if the edited file is valid, False otherwise.
        """
        if not self._ensure_config_exists():
            return False

        # Get system editor
        editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
        if not editor:
            # Try common editors
            for cmd in ['code', 'subl', 'atom', 'vim', 'nano', 'notepad']:
                if shutil.which(cmd):
                    editor = cmd
                    break

        if not editor:
            print("❌ No editor found. Please set EDITOR or VISUAL environment variable")
            return False

        try:
            # Backup current config
            backup_path = self.config_file.with_suffix('.yaml.backup')
            if self.config_file.exists():
                shutil.copy2(self.config_file, backup_path)

            # Open editor
            subprocess.run([editor, str(self.config_file)], check=True)

            # Validate edited config. This parses the file directly instead
            # of going through load_config, which swallows parse errors and
            # would therefore never trigger the restore below.
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    edited = yaml.safe_load(f)
                # An empty file (None) is accepted, matching load_config's
                # treatment of it as an empty configuration.
                if edited is not None and not isinstance(edited, dict):
                    raise ValueError("top-level YAML value must be a mapping")
            except Exception as e:
                # Restore backup if validation fails
                if backup_path.exists():
                    shutil.copy2(backup_path, self.config_file)
                print(f"❌ Invalid configuration: {e}")
                print("🔄 Restored previous configuration")
                return False

            print("✅ Configuration edited successfully")
            return True

        except subprocess.CalledProcessError:
            print("❌ Editor command failed")
            return False
        except Exception as e:
            print(f"❌ Failed to edit configuration: {e}")
            return False

    def reset_config(self) -> bool:
        """Reset configuration to default values.

        Returns:
            True if the bundled template was copied over the config file,
            False if the template is missing.
        """
        if not self.default_config_path.exists():
            print("❌ Default config template not found")
            return False

        # Mirror _ensure_config_exists: the directory may not exist yet.
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(self.default_config_path, self.config_file)
        print("✅ Configuration reset to default values")
        return True

    def show_config(self) -> None:
        """Display current configuration.

        Values are looked up by key in the loaded dict; keys that are absent
        print as ``None``.
        """
        config = self.load_config()
        print("⚙️ Current Configuration:")
        print(f" Auto-refresh: {config.get('auto_refresh')}")
        print(f" Refresh interval: {config.get('refresh_interval_hours')} hours")
        print(f" User-Agent: {config.get('default_user_agent')}")
        print(f" Timeout: {config.get('timeout_seconds')} seconds")

    def get_config(self) -> dict:
        """Get current configuration as returned by ``load_config``."""
        return self.load_config()

    def update_config(self, **kwargs) -> bool:
        """Update specific configuration values.

        Args:
            **kwargs: Key/value pairs to overwrite. Every key must already be
                present in the loaded configuration.

        Returns:
            True if all keys were known and the file was saved; False if an
            unknown key was given (nothing is written) or saving failed.
        """
        config = self.load_config()

        for key, value in kwargs.items():
            # The configuration is a dict, so membership (not hasattr) is the
            # right test for "known key".
            if key not in config:
                print(f"⚠️ Unknown configuration key: {key}")
                return False
            config[key] = value

        return self.save_config(config)

    def _execute_hook(self, hook_path: Path, config_file_path: Path) -> bool:
        """Execute a hook script with the generated config file path.

        Args:
            hook_path: Script to run.
            config_file_path: Passed to the script as its only argument.

        Returns:
            True if the script exited with status 0, False if it is missing,
            failed, timed out (30 seconds) or could not be started.
        """
        if not hook_path.exists():
            return False

        try:
            # Determine the interpreter based on file extension and platform
            suffix = hook_path.suffix.lower()
            if suffix == '.py':
                cmd = [sys.executable, str(hook_path), str(config_file_path)]
            elif suffix == '.js':
                cmd = ['node', str(hook_path), str(config_file_path)]
            elif suffix == '.nu':
                cmd = ['nu', str(hook_path), str(config_file_path)]
            else:
                # Anything else is executed directly (shebang scripts on
                # Unix-like systems, batch files etc. on Windows).
                cmd = [str(hook_path), str(config_file_path)]
                if os.name != 'nt':
                    # Make sure the script is executable
                    os.chmod(hook_path, 0o755)

            print(f"🔧 Executing hook: {hook_path.name}")
            result = subprocess.run(
                cmd,
                cwd=hook_path.parent,
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                print(f"✅ Hook executed successfully: {hook_path.name}")
                if result.stdout.strip():
                    print(f" Output: {result.stdout.strip()}")
                return True

            print(f"❌ Hook failed: {hook_path.name}")
            if result.stderr.strip():
                print(f" Error: {result.stderr.strip()}")
            return False

        except subprocess.TimeoutExpired:
            print(f"⏰ Hook timed out: {hook_path.name}")
            return False
        except Exception as e:
            print(f"❌ Failed to execute hook {hook_path.name}: {e}")
            return False

    def _execute_hooks(self, config_file_path: Path) -> None:
        """Execute all hooks in the hooks directory after config generation.

        Hooks are the ``core_config_generated.*`` files inside the ``hooks``
        sub-directory of the config directory. A failing hook does not stop
        the remaining ones.
        """
        hooks_dir = self.storage.config_dir / "hooks"
        if not hooks_dir.exists():
            return

        # Look for core_config_generated.* files
        hook_pattern = "core_config_generated.*"
        hook_files = list(hooks_dir.glob(hook_pattern))

        if not hook_files:
            return

        print(f"🔧 Found {len(hook_files)} hook(s) to execute")

        # Sort hooks for consistent execution order
        hook_files.sort()

        for hook_file in hook_files:
            self._execute_hook(hook_file, config_file_path)

    def apply(self) -> bool:
        """Apply active subscription to generate final config file.

        The active subscription's YAML is merged with the user configuration
        (user values win, lists are concatenated), missing essential keys and
        a basic DNS section are filled in, the result is written to
        ``generated_config.yaml`` in the config directory, and the hooks are
        run.

        Returns:
            True if the file was generated, False otherwise.
        """
        # Imported here rather than at module level, as in the original code.
        from scientific_surfing.subscription_manager import SubscriptionManager

        # Load current configuration
        config = self.load_config()

        # Load subscriptions to get active subscription
        subscription_manager = SubscriptionManager()
        active_subscription = subscription_manager.subscriptions_data.get_active_subscription()

        if not active_subscription:
            print("❌ No active subscription found")
            return False

        if not active_subscription.file_path or not Path(active_subscription.file_path).exists():
            print("❌ Active subscription file not found. Please refresh the subscription first.")
            return False

        try:
            # Load the subscription content
            with open(active_subscription.file_path, 'r', encoding='utf-8') as f:
                subscription_content = f.read()

            # Parse subscription YAML
            subscription_data = yaml.safe_load(subscription_content)
            if not isinstance(subscription_data, dict):
                subscription_data = {}

            # Create final config by merging subscription with user config
            final_config = deep_merge(subscription_data, config)

            # A configured external-ui path is resolved against the config
            # directory.
            external_ui = final_config.get("external-ui")
            if external_ui:
                final_config["external-ui"] = os.path.join(self.storage.config_dir, external_ui)

            # Define essential defaults that should be present in any Clash config
            essential_defaults = {
                'port': 7890,
                'socks-port': 7891,
                'mixed-port': 7890,
                'allow-lan': False,
                'mode': 'rule',
                'log-level': 'info',
                'external-controller': '127.0.0.1:9090',
                'ipv6': True,
            }

            # Add essential keys that neither the subscription nor the user
            # config provided
            for key, default_value in essential_defaults.items():
                if key not in final_config:
                    final_config[key] = default_value

            # Ensure basic DNS configuration exists if not provided by subscription
            if 'dns' not in final_config:
                final_config['dns'] = {
                    'enable': True,
                    'listen': '0.0.0.0:53',
                    'enhanced-mode': 'fake-ip',
                    'fake-ip-range': '198.18.0.1/16',
                    'nameserver': [
                        'https://doh.pub/dns-query',
                        'https://dns.alidns.com/dns-query'
                    ],
                    'fallback': [
                        'https://1.1.1.1/dns-query',
                        'https://8.8.8.8/dns-query'
                    ]
                }

            # Generate final config file
            generated_path = self.storage.config_dir / "generated_config.yaml"

            with open(generated_path, 'w', encoding='utf-8') as f:
                yaml.dump(final_config, f, default_flow_style=False, allow_unicode=True)

            print(f"✅ Generated final configuration: {generated_path}")
            print(f" Active subscription: {active_subscription.name}")
            print(f" Source file: {active_subscription.file_path}")

            # Execute hooks after successful config generation
            self._execute_hooks(generated_path)

            return True

        except yaml.YAMLError as e:
            print(f"❌ Invalid YAML in subscription: {e}")
            return False
        except Exception as e:
            print(f"❌ Failed to apply configuration: {e}")
            return False


def deep_merge(dict1: dict, dict2: dict) -> dict:
    """Recursively merge ``dict2`` into ``dict1`` and return ``dict1``.

    ``dict1`` is modified in place. For a key present in both:
    nested dicts are merged recursively, lists from ``dict2`` are appended
    to the list in ``dict1``, and any other value in ``dict2`` replaces the
    one in ``dict1``. Values taken from ``dict2`` are stored by reference,
    not copied.
    """
    for k, v in dict2.items():
        if k in dict1 and isinstance(dict1[k], dict) and isinstance(v, dict):
            dict1[k] = deep_merge(dict1[k], v)
        elif k in dict1 and isinstance(dict1[k], list) and isinstance(v, list):
            # Example: extend lists. Adjust logic for other list merging needs.
            dict1[k].extend(v)
        else:
            dict1[k] = v
    return dict1