From db1c6969a306d9a35f89642c733d0aa66a363fea Mon Sep 17 00:00:00 2001 From: casper Date: Mon, 8 Jun 2026 06:16:54 +0000 Subject: [PATCH] v1 --- addons.py | 1329 +++++++++++++++++++++++++++++++++++++++++++++++++ blocklists.py | 227 +++++++++ browser.py | 983 ++++++++++++++++++++++++++++++++++++ cdp.py | 714 ++++++++++++++++++++++++++ config.py | 536 ++++++++++++++++++++ 5 files changed, 3789 insertions(+) create mode 100644 addons.py create mode 100644 blocklists.py create mode 100644 browser.py create mode 100644 cdp.py create mode 100644 config.py diff --git a/addons.py b/addons.py new file mode 100644 index 0000000..c6e688a --- /dev/null +++ b/addons.py @@ -0,0 +1,1329 @@ + +from __future__ import annotations + +import json +import logging +import re +from pathlib import Path + +from PySide6.QtWebEngineCore import ( + QWebEngineProfile, + QWebEngineScript, +) + + +from config import ( + PLUGIN_BLOCKED_PATTERNS, + PLUGIN_CRASH_LIMIT, + PLUGIN_MAX_SIZE, + PLUGIN_PERMISSIONS, + profile_extension_dir, + profile_plugin_dir, + profile_theme_dir, + profile_vencord_dir, +) +log = logging.getLogger(__name__) + +SCRIPT_WORLD = ( + QWebEngineScript.ScriptWorldId.ApplicationWorld +) + +PLUGIN_FILE_PATTERN = ( + "*.plugin.js" +) + + +def _js_string( + value: str, +) -> str: + return json.dumps( + value, + ensure_ascii=False, + ) + + +class ThemeManager: + def __init__( + self, + profile_name: str, + ): + self.profile_name = profile_name + self.directory = profile_theme_dir( + profile_name + ) + + def discover( + self, + ) -> list[Path]: + return sorted( + self.directory.glob( + "*.css" + ) + ) + + def load_scripts( + self, + ) -> list[QWebEngineScript]: + scripts = [] + + for css_file in self.discover(): + try: + css = css_file.read_text( + encoding="utf-8" + ) + + script = ( + QWebEngineScript() + ) + + script.setName( + f"theme:{css_file.stem}" + ) + + script.setRunsOnSubFrames( + True + ) + + script.setInjectionPoint( + QWebEngineScript.InjectionPoint.DocumentReady + ) + + script.setWorldId( + SCRIPT_WORLD + ) + + script.setSourceCode( + f""" +(() => {{ + const id = "theme-{css_file.stem}"; + if (document.getElementById(id)) return; + + const style = document.createElement("style"); + style.id = id; + style.textContent = {_js_string(css)}; + + const install = () => {{ + if (!document.head) {{ + requestAnimationFrame(install); + return; + }} + + document.head.appendChild(style); + }}; + + install(); +}})(); +""" + ) + + scripts.append( + script + ) + + except Exception: + log.exception( + "theme load failed: %s", + css_file, + ) + + return scripts + + +class ExtensionManager: + def __init__( + self, + profile_name: str, + ): + self.profile_name = profile_name + self.directory = ( + profile_extension_dir( + profile_name + ) + ) + + def discover( + self, + ) -> list[Path]: + if not self.directory.exists(): + return [] + + return sorted( + path + for path in self.directory.iterdir() + if path.is_dir() + ) + + def load_scripts( + self, + ) -> list[QWebEngineScript]: + scripts = [] + + for extension in self.discover(): + manifest_file = ( + extension + / "manifest.json" + ) + + script_file = ( + extension + / "script.js" + ) + + if ( + not manifest_file.exists() + or not script_file.exists() + ): + continue + + try: + manifest = json.loads( + manifest_file.read_text( + encoding="utf-8" + ) + ) + + if not manifest.get( + "enabled", + True, + ): + continue + + source = ( + script_file.read_text( + encoding="utf-8" + ) + ) + + script = ( + QWebEngineScript() + ) + + script.setName( + manifest.get( + "name", + extension.name, + ) + ) + + script.setRunsOnSubFrames( + True + ) + + script.setInjectionPoint( + QWebEngineScript.InjectionPoint.DocumentCreation + ) + + script.setWorldId( + SCRIPT_WORLD + ) + + script.setSourceCode( + source + ) + + scripts.append( + script + ) + + except Exception: + log.exception( + "extension load failed: %s", + extension, + ) + + return scripts + + + +class PluginManager: + def __init__( + self, + profile_name: str, + db=None, + ): + self.profile_name = profile_name + self.db = db + self.directory = profile_plugin_dir( + profile_name + ) + + self.directory.mkdir( + parents=True, + exist_ok=True, + ) + self.quarantine_dir = ( + profile_vencord_dir( + profile_name + ) + / "quarantine" + ) + + self.quarantine_dir.mkdir( + parents=True, + exist_ok=True, + ) + def discover( + self, + ) -> list[Path]: + plugins = [] + + for plugin in sorted( + self.directory.glob( + PLUGIN_FILE_PATTERN + ) + ): + try: + meta = self.metadata( + plugin + ) + + if ( + self.db + and not self.db.plugin_enabled( + self.profile_name, + meta["name"], + True, + ) + ): + continue + + plugins.append( + plugin + ) + + except Exception: + log.exception( + "plugin discovery failed: %s", + plugin, + ) + + return plugins + + + def metadata( + self, + plugin_file: Path, + ) -> dict: + source = plugin_file.read_text( + encoding="utf-8", + errors="ignore", + ) + permissions = set() + + for match in re.findall( + r"@permissions\s+(.+)", + source, + re.IGNORECASE, + ): + permissions.update( + token.strip().lower() + for token in match.split() + if token.strip() + ) + + permissions &= PLUGIN_PERMISSIONS + + metadata["permissions"] = sorted( + permissions + ) + name = plugin_file.name + + if name.endswith( + ".plugin.js" + ): + name = name[:-10] + + metadata = { + "name": name, + "version": "unknown", + "author": "", + "description": "", + "enabled": True, + "path": str( + plugin_file + ), + "size": plugin_file.stat().st_size, + } + + patterns = { + "name": r"@name\s+(.+)", + "version": r"@version\s+(.+)", + "author": r"@author\s+(.+)", + "description": r"@description\s+(.+)", + } + + for key, pattern in ( + patterns.items() + ): + match = re.search( + pattern, + source, + re.IGNORECASE, + ) + + if match: + metadata[key] = ( + match.group(1) + .strip() + ) + + if self.db: + metadata[ + "enabled" + ] = self.db.plugin_enabled( + self.profile_name, + metadata["name"], + True, + ) + + return metadata + + def validate_file( + self, + source: Path, + ) -> tuple[ + bool, + str, + ]: + filename = ( + source.name.lower() + ) + + if not filename.endswith( + ".plugin.js" + ): + return ( + False, + "invalid extension", + ) + + if ( + source.stat().st_size + > PLUGIN_MAX_SIZE + ): + return ( + False, + "plugin exceeds size limit", + ) + + content = ( + source.read_text( + encoding="utf-8", + errors="ignore", + ) + ) + + lowered = ( + content.lower() + ) + + for pattern in ( + PLUGIN_BLOCKED_PATTERNS + ): + if ( + pattern.lower() + in lowered + ): + return ( + False, + f"blocked pattern: {pattern}", + ) + + return ( + True, + "", + ) + + def bootstrap_script( + self, + ) -> QWebEngineScript: + script = ( + QWebEngineScript() + ) + + script.setName( + "plugin-runtime" + ) + + script.setRunsOnSubFrames( + True + ) + + script.setInjectionPoint( + QWebEngineScript.InjectionPoint.DocumentCreation + ) + + script.setWorldId( + SCRIPT_WORLD + ) + + script.setSourceCode( + """ + +(() => { + + if (window.PluginRuntime) + return; + + window.__bridgeReady = + new Promise(resolve => { + + const complete = () => { + + if ( + window.bridge + ) { + resolve( + window.bridge + ); + + return; + } + + setTimeout( + complete, + 25, + ); + }; + + if ( + typeof qt !== "undefined" + && typeof QWebChannel !== "undefined" + ) { + new QWebChannel( + qt.webChannelTransport, + channel => { + + window.bridge = + channel.objects.bridge; + + resolve( + window.bridge + ); + } + ); + + return; + } + + complete(); + }); + + window.PluginRuntime = { + plugins:{}, + failed:{}, + started:{}, + hasPermission( + plugin, + permission + ){ + return !!( + this.permissions[ + plugin + ]?.[permission] + ); + }, + async bridge(){ + return await ( + window.__bridgeReady + ); + }, + + log(name,msg){ + console.log( + `[Plugin:${name}]`, + msg + ); + }, + + async fail( + name, + error + ){ + console.error( + `[Plugin:${name}]`, + error + ); + + this.failed[name] = + String(error); + + try{ + const bridge = + await this.bridge(); + + bridge.pluginCrash( + name, + String(error) + ); + } + catch{} + } + }; + + window.BdApi = { + + Data:{ + + async load( + plugin, + key + ){ + try{ + const bridge = + await PluginRuntime.bridge(); + + const raw = + bridge.pluginLoad( + plugin, + key + ); + + return JSON.parse( + raw + ); + } + catch{ + return null; + } + }, + + async save( + plugin, + key, + value + ){ + try{ + const bridge = + await PluginRuntime.bridge(); + + bridge.pluginSave( + plugin, + key, + JSON.stringify( + value + ) + ); + } + catch{} + } + }, + + Logger:{ + + async log( + ...a + ){ + try{ + const bridge = + await PluginRuntime.bridge(); + + bridge.pluginLog( + "runtime", + "info", + a.join(" ") + ); + } + catch{} + + console.log( + ...a + ); + }, + + async warn( + ...a + ){ + try{ + const bridge = + await PluginRuntime.bridge(); + + bridge.pluginLog( + "runtime", + "warning", + a.join(" ") + ); + } + catch{} + + console.warn( + ...a + ); + }, + + async error( + ...a + ){ + try{ + const bridge = + await PluginRuntime.bridge(); + + bridge.pluginLog( + "runtime", + "error", + a.join(" ") + ); + } + catch{} + + console.error( + ...a + ); + } + }, + + UI:{ + showToast( + message + ){ + console.log( + "[Toast]", + message + ); + } + }, + + DOM:{ + addStyle( + id, + css + ){ + let style = + document.getElementById( + id + ); + + if(!style){ + style = + document.createElement( + "style" + ); + + style.id = + id; + + document.head.appendChild( + style + ); + } + + style.textContent = + css; + }, + + removeStyle( + id + ){ + document + .getElementById( + id + ) + ?.remove(); + } + } + }; + + window.ZeresPluginLibrary = { + Logger: + BdApi.Logger, + + Utilities:{}, + + DOMTools: + BdApi.DOM, + + PluginUpdater:{ + checkForUpdate(){} + }, + + Patcher:{ + before(){}, + after(){}, + instead(){}, + unpatchAll(){} + } + }; +})(); +""" + ) + + return script + + def plugin_script( + self, + plugin_file: Path, + ) -> QWebEngineScript: + meta = self.metadata( + plugin_file + ) + + source = ( + plugin_file.read_text( + encoding="utf-8", + errors="ignore", + ) + ) + + wrapped = f""" +(() => {{ + const module = {{ + exports:null + }}; + + const exports = + module.exports; + + try {{ + {source} + + const PluginClass = + module.exports; + + if( + typeof PluginClass + !== "function" + ) + return; + + const plugin = + new PluginClass(); + plugin.__permissions = + Object.freeze( + %PERMISSIONS% + ); + + PluginRuntime.plugins[ + {_js_string(meta["name"])} + ] = plugin; + + try {{ + if( + typeof plugin.start + === "function" + ) + plugin.start(); + + else if( + typeof plugin.onStart + === "function" + ) + plugin.onStart(); + + PluginRuntime.started[ + {_js_string(meta["name"])} + ] = true; + }} + catch(error) {{ + PluginRuntime.fail( + {_js_string(meta["name"])}, + error + ); + }} + }} + catch(error) {{ + PluginRuntime.fail( + {_js_string(meta["name"])}, + error + ); + }} +}})(); +""" + + script = ( + QWebEngineScript() + ) + + script.setName( + f"plugin:{meta['name']}" + ) + + script.setRunsOnSubFrames( + True + ) + + script.setInjectionPoint( + QWebEngineScript.InjectionPoint.DocumentCreation + ) + + script.setWorldId( + SCRIPT_WORLD + ) + + script.setSourceCode( + wrapped + ) + + return script + + def load_scripts( + self, + ) -> list[QWebEngineScript]: + scripts = [ + self.bootstrap_script() + ] + + for plugin in ( + self.discover() + ): + try: + scripts.append( + self.plugin_script( + plugin + ) + ) + + except Exception: + log.exception( + "plugin load failed: %s", + plugin, + ) + + if self.db: + self.db.plugin_crash( + self.profile_name, + plugin.stem, + "load failure", + ) + + return scripts + + def install_file( + self, + source: Path, + ) -> Path: + valid, error = ( + self.validate_file( + plugin + ) + ) + + if not valid: + self.quarantine_plugin( + plugin + ) + continue + if self.is_quarantined( + meta["name"] + ): + continue + filename = re.sub( + r"[^a-zA-Z0-9._-]", + "_", + source.name, + ) + + destination = ( + self.directory + / filename + ) + + destination.write_text( + source.read_text( + encoding="utf-8", + errors="ignore", + ), + encoding="utf-8", + ) + + if ( + self.db + and self.db.plugin_crash_count( + self.profile_name, + meta["name"], + ) + >= PLUGIN_CRASH_LIMIT + ): + self.quarantine_plugin( + plugin + ) + continue + + return destination + + + def uninstall( + self, + plugin_name: str, + ) -> bool: + for plugin in ( + self.directory.glob( + PLUGIN_FILE_PATTERN + ) + ): + meta = self.metadata( + plugin + ) + + if ( + meta["name"] + != plugin_name + ): + continue + + plugin.unlink( + missing_ok=True + ) + + if self.db: + self.db.delete_plugin( + self.profile_name, + plugin_name, + ) + + return True + + return False + + def enable( + self, + plugin_name: str, + ) -> None: + if self.db: + self.db.set_plugin_enabled( + self.profile_name, + plugin_name, + True, + ) + + def disable( + self, + plugin_name: str, + ) -> None: + if self.db: + self.db.set_plugin_enabled( + self.profile_name, + plugin_name, + False, + ) + + def inventory( + self, + ) -> list[dict]: + rows = [] + + for plugin in sorted( + self.directory.glob( + PLUGIN_FILE_PATTERN + ) + ): + try: + rows.append( + self.metadata( + plugin + ) + ) + + except Exception: + log.exception( + "inventory failed: %s", + plugin, + ) + + return rows + + def diagnostics( + self, + ) -> dict: + plugins = ( + self.inventory() + ) + + return { + "profile": self.profile_name, + "plugin_count": len( + plugins + ), + "enabled_count": len( + [ + p + for p in plugins + if p.get( + "enabled" + ) + ] + ), + "disabled_count": len( + [ + p + for p in plugins + if not p.get( + "enabled" + ) + ] + ), + "plugins": plugins, + } + def quarantine_plugin( + self, + plugin_file: Path, + ) -> None: + + try: + target = ( + self.quarantine_dir + / plugin_file.name + ) + + plugin_file.replace( + target + ) + + meta = self.metadata( + target + ) + + if self.db: + self.db.set_plugin_enabled( + self.profile_name, + meta["name"], + False, + ) + + self.db.set_plugin_quarantined( + self.profile_name, + meta["name"], + True, + ) + + except Exception: + log.exception( + "plugin quarantine failed" + ) + + + def is_quarantined( + self, + plugin_name: str, + ) -> bool: + + if not self.db: + return False + + return self.db.plugin_quarantined( + self.profile_name, + plugin_name, + ) + # v8 + +class AddonManager: + def __init__( + self, + profile_name: str, + db=None, + ): + self.profile_name = profile_name + self.db = db + + self.themes = ThemeManager( + profile_name + ) + + self.extensions = ( + ExtensionManager( + profile_name + ) + ) + + self.plugins = ( + PluginManager( + profile_name, + db, + ) + ) + + def install( + self, + profile: QWebEngineProfile, + ) -> None: + collection = ( + profile.scripts() + ) + + existing = { + collection.script( + i + ).name() + for i in range( + collection.count() + ) + } + + for script in ( + self.themes.load_scripts() + ): + if ( + script.name() + not in existing + ): + collection.insert( + script + ) + + for script in ( + self.extensions.load_scripts() + ): + if ( + script.name() + not in existing + ): + collection.insert( + script + ) + + for script in ( + self.plugins.load_scripts() + ): + if ( + script.name() + not in existing + ): + collection.insert( + script + ) + + def inventory( + self, + ) -> dict: + themes = [] + + extensions = [] + + plugins = ( + self.plugins.inventory() + ) + + for theme in ( + self.themes.discover() + ): + themes.append( + { + "name": theme.stem, + "type": "theme", + "enabled": True, + "path": str( + theme + ), + } + ) + + for extension in ( + self.extensions.discover() + ): + manifest = ( + extension + / "manifest.json" + ) + + enabled = True + + try: + if ( + manifest.exists() + ): + enabled = bool( + json.loads( + manifest.read_text( + encoding="utf-8" + ) + ).get( + "enabled", + True, + ) + ) + except Exception: + enabled = False + + extensions.append( + { + "name": extension.name, + "type": "extension", + "enabled": enabled, + "path": str( + extension + ), + } + ) + + return { + "themes": themes, + "extensions": extensions, + "plugins": plugins, + } + + def diagnostics( + self, + ) -> dict: + plugin_diag = ( + self.plugins.diagnostics() + ) + + return { + "profile": self.profile_name, + "themes": len( + self.themes.discover() + ), + "extensions": len( + self.extensions.discover() + ), + "plugins": plugin_diag, + } + + def reload( + self, + profile: QWebEngineProfile, + ) -> None: + profile.scripts().clear() + self.install( + profile + ) + + def install_plugin( + self, + source: Path, + ) -> Path: + return ( + self.plugins.install_file( + source + ) + ) + + def uninstall_plugin( + self, + plugin_name: str, + ) -> bool: + return ( + self.plugins.uninstall( + plugin_name + ) + ) + + def enable_plugin( + self, + plugin_name: str, + ) -> None: + self.plugins.enable( + plugin_name + ) + + def disable_plugin( + self, + plugin_name: str, + ) -> None: + self.plugins.disable( + plugin_name + ) + + def plugin_inventory( + self, + ) -> list[dict]: + return ( + self.plugins.inventory() + ) + + def plugin_diagnostics( + self, + ) -> dict: + return ( + self.plugins.diagnostics() + ) + diff --git a/blocklists.py b/blocklists.py new file mode 100644 index 0000000..d390ae6 --- /dev/null +++ b/blocklists.py @@ -0,0 +1,227 @@ +# v2 +from __future__ import annotations + +import pickle +from pathlib import Path + + +def load_domain_list( + path: Path, +) -> set[str]: + + domains: set[str] = set() + + if not path.exists(): + return domains + + for raw in path.read_text( + encoding="utf-8", + errors="ignore", + ).splitlines(): + + line = raw.strip() + + if ( + not line + or line.startswith("#") + or line.startswith("!") + or line.startswith("//") + ): + continue + + if line.startswith("||"): + line = line[2:] + + if line.startswith( + "0.0.0.0 " + ): + line = line.split( + None, + 1, + )[1] + + elif line.startswith( + "127.0.0.1 " + ): + line = line.split( + None, + 1, + )[1] + + line = ( + line.replace("^", "") + .replace("/", "") + .strip() + .lower() + ) + + if ( + "." not in line + or " " in line + ): + continue + + domains.add( + line + ) + + return domains + + +def save_cache( + cache_file: Path, + domains: set[str], +) -> None: + + cache_file.parent.mkdir( + parents=True, + exist_ok=True, + ) + + with cache_file.open( + "wb" + ) as fh: + pickle.dump( + domains, + fh, + protocol=pickle.HIGHEST_PROTOCOL, + ) + + +def load_cache( + cache_file: Path, +) -> set[str]: + + if not cache_file.exists(): + return set() + + try: + with cache_file.open( + "rb" + ) as fh: + data = pickle.load( + fh + ) + + if isinstance( + data, + set, + ): + return { + str(domain) + .lower() + .strip() + for domain in data + if domain + } + + except Exception: + pass + + return set() + + +class BlocklistEngine: + + def __init__( + self, + blocklists: list[Path], + cache_file: Path, + ): + self.blocklists = ( + blocklists + ) + + self.cache_file = ( + cache_file + ) + + self.domains: set[str] = ( + set() + ) + + def load( + self, + rebuild: bool = False, + ) -> None: + + if ( + not rebuild + and self.cache_file.exists() + ): + cached = load_cache( + self.cache_file + ) + + if cached: + self.domains = cached + return + + domains: set[str] = set() + + for path in ( + self.blocklists + ): + domains.update( + load_domain_list( + path + ) + ) + + self.domains = domains + + save_cache( + self.cache_file, + domains, + ) + + def contains( + self, + host: str, + ) -> bool: + + host = ( + host.lower() + .strip() + ) + + if not host: + return False + + parts = host.split( + "." + ) + + for index in range( + len(parts) + ): + candidate = ".".join( + parts[index:] + ) + + if ( + candidate + in self.domains + ): + return True + + return False + + def count( + self, + ) -> int: + return len( + self.domains + ) + + def reload( + self, + ) -> None: + self.load( + rebuild=True + ) + + def empty( + self, + ) -> bool: + return not self.domains \ No newline at end of file diff --git a/browser.py b/browser.py new file mode 100644 index 0000000..792a3d4 --- /dev/null +++ b/browser.py @@ -0,0 +1,983 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +from PySide6.QtCore import QObject, QUrl, Signal, Slot +from PySide6.QtGui import QDesktopServices +from PySide6.QtWebChannel import QWebChannel +from PySide6.QtWebEngineCore import ( + QWebEngineDownloadRequest, + QWebEnginePage, + QWebEngineProfile, + QWebEngineSettings, + QWebEngineUrlRequestInterceptor, +) +from PySide6.QtWebEngineWidgets import QWebEngineView +from PySide6.QtWidgets import QMessageBox + +from addons import AddonManager +from cdp import CDPClient +from config import ( + DEFAULT_SETTINGS, + FEATURE_PERMISSION_MAP, + START_URL, + +) +from models import ProfileContext +from network import NetworkBackend + +log = logging.getLogger(__name__) + +CACHE_SIZE_BYTES = 512 * 1024 * 1024 + +BLOCKED_PROTOCOLS = { + "file", + "shell", + "cmd", + "powershell", + "ms-settings", +} + +PROMPT_PROTOCOLS = { + "mailto", + "discord", + "spotify", + "steam", +} + +BLOCKED_EXTENSIONS = { + ".bat", + ".cmd", + ".ps1", + ".vbs", + ".js", + ".jse", + ".wsf", + ".scr", + ".hta", + ".lnk", + ".reg", + ".iso", + ".img", + ".cpl", + ".msc", + ".jar", + ".msix", + ".msixbundle", + ".appinstaller", +} + + +WARN_EXTENSIONS = { + ".exe", + ".dll", + ".msi", + ".com", +} + + +class Bridge(QObject): + messageReceived = Signal(str) + pluginLogReceived = Signal(str) + pluginErrorReceived = Signal(str) + + def __init__( + self, + db, + context: ProfileContext, + ): + super().__init__() + + self.db = db + self.context = context + + @Slot(str) + def log( + self, + message: str, + ) -> None: + self.messageReceived.emit( + message + ) + + @Slot(str) + def plugin_log( + self, + message: str, + ) -> None: + self.pluginLogReceived.emit( + message + ) + + @Slot(str) + def plugin_error( + self, + message: str, + ) -> None: + self.pluginErrorReceived.emit( + message + ) + + @Slot(result=str) + def version( + self, + ) -> str: + return "1.0.0" + + @Slot(result=str) + def platform( + self, + ) -> str: + return "PySide6" + + @Slot(result=str) + def plugin_runtime( + self, + ) -> str: + return "bdapi-lite" + + @Slot(str, str, result=str) + def pluginLoad( + self, + plugin: str, + key: str, + ) -> str: + value = ( + self.db.plugin_setting_get( + self.context.name, + plugin, + key, + None, + ) + ) + + return json.dumps( + value, + ensure_ascii=False, + ) + + @Slot(str, str, str) + def pluginSave( + self, + plugin: str, + key: str, + value: str, + ) -> None: + try: + value = json.loads( + value + ) + except Exception: + pass + + self.db.plugin_setting_set( + self.context.name, + plugin, + key, + value, + ) + + @Slot(str, str, str) + def pluginLog( + self, + plugin: str, + level: str, + message: str, + ) -> None: + self.db.plugin_log( + self.context.name, + plugin, + level, + message, + ) + + @Slot(str, str) + def pluginCrash( + self, + plugin: str, + error: str, + ) -> None: + self.db.plugin_crash( + self.context.name, + plugin, + error, + ) + +# v3 +class RequestInterceptor(QWebEngineUrlRequestInterceptor): + def __init__( + self, + network: NetworkBackend, + ): + super().__init__() + self.network = network + + def interceptRequest( + self, + info, + ) -> None: + + host = ( + info.requestUrl() + .host() + .lower() + ) + + request = { + "url": info.requestUrl().toString(), + "host": host, + "headers": {}, + } + + try: + _, blocked = ( + self.network.process_request( + request + ) + ) + + if blocked: + info.block(True) + + except Exception: + log.exception( + "request interception failed" + ) +class BrowserPage(QWebEnginePage): + consoleMessage = Signal(str) + + def __init__( + self, + profile: QWebEngineProfile, + permissions: dict[str, Any], + parent=None, + ): + super().__init__( + profile, + parent, + ) + + self.permissions = permissions or {} + + self.featurePermissionRequested.connect( + self._handle_permission_request + ) + + def javaScriptConsoleMessage( + self, + level, + message: str, + line: int, + source: str, + ) -> None: + self.consoleMessage.emit( + f"{source}:{line} {message}" + ) + + def acceptNavigationRequest( + self, + url, + nav_type, + is_main_frame, + ) -> bool: + scheme = ( + url.scheme() + .lower() + .strip() + ) + + if scheme in { + "javascript", + "data", + }: + return False + + if scheme in BLOCKED_PROTOCOLS: + return False + + if scheme in PROMPT_PROTOCOLS: + result = QMessageBox.question( + self.view(), + "External Link", + ( + "Open external application?\n\n" + f"{url.toString()}" + ), + ) + + if ( + result + == QMessageBox.StandardButton.Yes + ): + QDesktopServices.openUrl( + url + ) + + return False + + return super().acceptNavigationRequest( + url, + nav_type, + is_main_frame, + ) + + def _handle_permission_request( + self, + origin, + feature, + ) -> None: + feature_name = str( + feature + ) + + allowed = False + + for ( + key, + permission, + ) in FEATURE_PERMISSION_MAP.items(): + if key in feature_name: + allowed = bool( + self.permissions.get( + permission, + False, + ) + ) + break + + self.setFeaturePermission( + origin, + feature, + ( + QWebEnginePage.PermissionPolicy.PermissionGrantedByUser + if allowed + else QWebEnginePage.PermissionPolicy.PermissionDeniedByUser + ), + ) + + +class BrowserView(QWebEngineView): + downloadProgress = Signal( + str, + int, + int, + ) + + pluginDiagnostics = Signal( + dict + ) + + def __init__( + self, + db, + context: ProfileContext, + settings: dict[str, Any] | None, + parent=None, + ): + super().__init__(parent) + + self.db = db + self.context = context + + self._settings_data = ( + settings + or dict( + DEFAULT_SETTINGS + ) + ) + + self.db.ensure_profile( + self.context.name + ) + + self.network = ( + NetworkBackend( + self.db, + self._settings_data, + ) + ) + + self.profile = ( + self._create_profile() + ) + + self.interceptor = ( + RequestInterceptor( + self.network + ) + ) + + self.profile.setUrlRequestInterceptor( + self.interceptor + ) + + self.page_ = BrowserPage( + self.profile, + self._settings_data.get( + "permissions", + {}, + ), + self, + ) + + self.setPage( + self.page_ + ) + + self.bridge = None + self.channel = None + self.addons = None + self.cdp = None + self._devtools_page = None + self._devtools_view = None + + self._configure_web_settings() + self._initialize_bridge() + self._initialize_downloads() + self._load_addons() + + def _create_profile( + self, + ) -> QWebEngineProfile: + storage_path = ( + self.context.storage_path + ) + + cache_path = ( + self.context.cache_path + ) + + profile = ( + QWebEngineProfile( + self.context.name, + self, + ) + ) + + profile.setPersistentStoragePath( + str(storage_path) + ) + + profile.setCachePath( + str(cache_path) + ) + + profile.setHttpCacheMaximumSize( + CACHE_SIZE_BYTES + ) + + profile.setPersistentCookiesPolicy( + QWebEngineProfile.PersistentCookiesPolicy.ForcePersistentCookies + ) + + return profile + + def _configure_web_settings( + self, + ) -> None: + settings = self.settings() + + for attribute in ( + QWebEngineSettings.WebAttribute.JavascriptEnabled, + QWebEngineSettings.WebAttribute.LocalStorageEnabled, + QWebEngineSettings.WebAttribute.FullScreenSupportEnabled, + QWebEngineSettings.WebAttribute.ScrollAnimatorEnabled, + QWebEngineSettings.WebAttribute.WebGLEnabled, + QWebEngineSettings.WebAttribute.Accelerated2dCanvasEnabled, + ): + settings.setAttribute( + attribute, + True, + ) + + settings.setAttribute( + QWebEngineSettings.WebAttribute.AllowRunningInsecureContent, + False, + ) + + def _initialize_bridge( + self, + ) -> None: + self.bridge = Bridge( + self.db, + self.context, + ) + + self.channel = ( + QWebChannel() + ) + + self.channel.registerObject( + "bridge", + self.bridge, + ) + + self.page_.setWebChannel( + self.channel + ) + + def _initialize_downloads( + self, + ) -> None: + self.profile.downloadRequested.connect( + self._handle_download + ) + + def _load_addons( + self, + ) -> None: + self.addons = AddonManager( + self.context.name, + self.db, + ) + + self.addons.install( + self.profile + ) + + def plugin_inventory( + self, + ) -> list[dict]: + if not self.addons: + return [] + + rows = [] + + for plugin in ( + self.addons.plugins.discover() + ): + try: + rows.append( + self.addons.plugins.metadata( + plugin + ) + ) + except Exception: + log.exception( + "plugin metadata" + ) + + return rows + + def install_plugin_file( + self, + source: str | Path, + ) -> Path: + installed = ( + self.addons.plugins.install_file( + Path(source) + ) + ) + + self.reload_addons() + + return installed + + def plugin_diagnostics( + self, + ) -> None: + self.page().runJavaScript( + """ +(() => { + if (!window.PluginRuntime) + return {}; + + return { + loaded:Object.keys( + PluginRuntime.plugins || {} + ), + failed:Object.keys( + PluginRuntime.failed || {} + ), + failures: + PluginRuntime.failed || {} + }; +})(); +""", + self.pluginDiagnostics.emit, + ) + + def initialize_cdp( + self, + websocket_url: str, + ) -> None: + if not self._settings_data.get( + "enable_cdp", + False, + ): + return + + if self.cdp: + return + + try: + self.cdp = CDPClient( + db=self.db, + websocket_url=websocket_url, + capture_bodies=False, + backend=self.network, + ) + + self.cdp.start() + + except Exception: + log.exception( + "cdp startup failed" + ) + + # v2 + def _download_directory( + self, + ) -> Path: + + directory = ( + self.context + .downloads_path + ) + + directory.mkdir( + parents=True, + exist_ok=True, + ) + + return directory + + def _handle_download( + self, + download: QWebEngineDownloadRequest, + ) -> None: + filename = ( + download.downloadFileName() + ) + + extension = ( + Path(filename) + .suffix + .lower() + ) + + if extension in BLOCKED_EXTENSIONS: + download.cancel() + + QMessageBox.warning( + self, + "Download Blocked", + filename, + ) + + return + + if extension in WARN_EXTENSIONS: + result = QMessageBox.warning( + self, + "Executable Download", + filename, + QMessageBox.StandardButton.Yes + | QMessageBox.StandardButton.No, + ) + + if ( + result + != QMessageBox.StandardButton.Yes + ): + download.cancel() + return + + download.setDownloadDirectory( + str( + self._download_directory() + ) + ) + + download.accept() + + download.receivedBytesChanged.connect( + lambda d=download: + self._emit_download_progress( + d + ) + ) + + def _emit_download_progress( + self, + download: QWebEngineDownloadRequest, + ) -> None: + self.downloadProgress.emit( + download.downloadFileName(), + download.receivedBytes(), + download.totalBytes(), + ) + + def reload_addons( + self, + ) -> None: + self.profile.scripts().clear() + self._load_addons() + self.reload() + + def open( + self, + url: str = START_URL, + ) -> None: + self.load( + QUrl(url) + ) + + def open_devtools( + self, + ) -> None: + if self._devtools_page is None: + self._devtools_page = ( + QWebEnginePage( + self.profile, + self, + ) + ) + + self.page_.setDevToolsPage( + self._devtools_page + ) + + if self._devtools_view is None: + self._devtools_view = ( + QWebEngineView() + ) + + self._devtools_view.setPage( + self._devtools_page + ) + + self._devtools_view.resize( + 1400, + 900, + ) + + self._devtools_view.show() + + def switch_profile( + self, + profile_name: str, + ) -> None: + raise RuntimeError( + "profile switching requires browser recreation" + ) + + # v3 + def install_plugin_url( + self, + url: str, + ) -> Path: + import tempfile + + from urllib.parse import ( + urlparse, + ) + from urllib.request import ( + urlopen, + ) + + parsed = urlparse( + url + ) + + if ( + parsed.scheme + not in { + "https" + } + ): + raise ValueError( + "https required" + ) + + with urlopen( + url, + timeout=30, + ) as response: + + content_length = ( + response.headers.get( + "Content-Length" + ) + ) + + if content_length: + if ( + int(content_length) + > PLUGIN_MAX_SIZE + ): + raise ValueError( + "plugin exceeds size limit" + ) + + content_type = ( + response.headers.get( + "Content-Type", + "", + ) + .split(";")[0] + .strip() + .lower() + ) + + allowed_types = { + "application/javascript", + "text/javascript", + "application/x-javascript", + "text/plain", + } + + if ( + content_type + and content_type + not in allowed_types + ): + raise ValueError( + f"unsupported content type: " + f"{content_type}" + ) + + data = bytearray() + + while True: + chunk = response.read( + 65536 + ) + + if not chunk: + break + + data.extend( + chunk + ) + + if ( + len(data) + > PLUGIN_MAX_SIZE + ): + raise ValueError( + "plugin exceeds size limit" + ) + + with tempfile.NamedTemporaryFile( + suffix=".plugin.js", + delete=False, + ) as handle: + handle.write(data) + + path = Path( + handle.name + ) + + try: + installed = ( + self.install_plugin_file( + path + ) + ) + + return installed + + finally: + path.unlink( + missing_ok=True + ) + + def remove_plugin( + self, + plugin_name: str, + ) -> bool: + removed = ( + self.addons.uninstall_plugin( + plugin_name + ) + ) + + if removed: + self.reload_addons() + + return removed + + def set_plugin_enabled( + self, + plugin_name: str, + enabled: bool, + ) -> None: + if enabled: + self.addons.enable_plugin( + plugin_name + ) + else: + self.addons.disable_plugin( + plugin_name + ) + + self.reload_addons() + + # v2 + def shutdown( + self, + ) -> None: + + if self.cdp: + try: + self.cdp.stop() + except Exception: + log.exception( + "cdp stop failed" + ) + + self.cdp = None + + try: + self.setPage(None) + except Exception: + pass + + if self.page_: + self.page_.deleteLater() + self.page_ = None + + if self.channel: + self.channel.deleteLater() + self.channel = None + + if self.bridge: + self.bridge.deleteLater() + self.bridge = None + + if self.profile: + self.profile.deleteLater() + self.profile = None + if ( + self.profile + and self.interceptor + ): + try: + self.profile.setUrlRequestInterceptor( + None + ) + except Exception: + pass + if self.interceptor: + self.interceptor.deleteLater() + self.interceptor = None + if self.page_: + try: + self.page_.triggerAction( + QWebEnginePage.WebAction.Stop + ) + except Exception: + pass + if self._devtools_page: + self._devtools_page.deleteLater() + self._devtools_page = None + + if self._devtools_view: + self._devtools_view.close() + self._devtools_view.deleteLater() + self._devtools_view = None + \ No newline at end of file diff --git a/cdp.py b/cdp.py new file mode 100644 index 0000000..37c2e75 --- /dev/null +++ b/cdp.py @@ -0,0 +1,714 @@ +# v7 +from __future__ import annotations + +import asyncio +import json +import logging +import threading +import time +import uuid +from datetime import UTC, datetime +from typing import Any +from urllib.parse import urlparse + +from PySide6.QtCore import QObject, Signal + +log = logging.getLogger(__name__) + +try: + import websockets +except Exception: + websockets = None + + +class CDPEvents(QObject): + connected = Signal() + disconnected = Signal() + requestCaptured = Signal(dict) + responseCaptured = Signal(dict) + errorOccurred = Signal(str) + + +class CDPClient: + def __init__( + self, + db, + websocket_url: str, + capture_bodies: bool = False, + backend=None, + ) -> None: + self.db = db + self.websocket_url = websocket_url + self.capture_bodies = capture_bodies + self.backend = backend + + self.events = CDPEvents() + + self._loop = None + self._thread = None + self._socket = None + + self._running = False + self._message_id = 0 + + self._requests: dict[ + str, + dict[str, Any] + ] = {} + + self._pending: dict[ + int, + asyncio.Future, + ] = {} + + self._reconnect_delay = 5 + + def start( + self, + ) -> None: + if websockets is None: + log.warning( + "websockets package not installed" + ) + return + + if self._running: + return + + self._running = True + + self._thread = threading.Thread( + target=self._thread_main, + daemon=True, + name="CDPThread", + ) + + self._thread.start() + + def stop( + self, + ) -> None: + self._running = False + + if ( + self._loop + and self._socket + ): + try: + asyncio.run_coroutine_threadsafe( + self._socket.close(), + self._loop, + ) + except Exception: + pass + + if ( + self._thread + and self._thread.is_alive() + ): + self._thread.join( + timeout=5 + ) + + self._socket = None + self._thread = None + self._pending.clear() + self._requests.clear() + + def send( + self, + method: str, + params: dict | None = None, + ) -> None: + if ( + not self._loop + or not self._running + ): + return + + asyncio.run_coroutine_threadsafe( + self._send( + method, + params or {}, + ), + self._loop, + ) + + def _thread_main( + self, + ) -> None: + self._loop = asyncio.new_event_loop() + + asyncio.set_event_loop( + self._loop + ) + + try: + self._loop.run_until_complete( + self._run() + ) + + except Exception: + log.exception( + "cdp loop failed" + ) + + finally: + self._socket = None + self._loop = None + + async def _run( + self, + ) -> None: + while self._running: + + try: + await self._connect() + + except Exception as exc: + log.exception( + "cdp connect failed" + ) + + self.events.errorOccurred.emit( + str(exc) + ) + + if not self._running: + break + + await asyncio.sleep( + self._reconnect_delay + ) + + async def _connect( + self, + ) -> None: + async with websockets.connect( + self.websocket_url, + max_size=32 * 1024 * 1024, + ping_interval=30, + ping_timeout=30, + close_timeout=10, + ) as socket: + + self._socket = socket + + self._requests.clear() + + self.events.connected.emit() + + await self._send( + "Network.enable", + {}, + ) + + await self._send( + "Page.enable", + {}, + ) + + await self._send( + "Runtime.enable", + {}, + ) + + await self._send( + "Fetch.enable", + { + "patterns": [ + { + "urlPattern": "*", + "requestStage": "Request", + } + ] + }, + ) + + while self._running: + + raw = await socket.recv() + + try: + message = json.loads( + raw + ) + + except Exception: + continue + + if "id" in message: + + future = self._pending.pop( + message["id"], + None, + ) + + if ( + future + and not future.done() + ): + future.set_result( + message + ) + + continue + + await self._handle_message( + message + ) + + self._socket = None + + for future in ( + self._pending.values() + ): + try: + if not future.done(): + future.cancel() + except Exception: + pass + + self._pending.clear() + + self.events.disconnected.emit() + + async def _send( + self, + method: str, + params: dict, + ) -> None: + if not self._socket: + return + + self._message_id += 1 + + payload = { + "id": self._message_id, + "method": method, + "params": params, + } + + try: + await self._socket.send( + json.dumps(payload) + ) + except Exception: + log.exception( + "cdp send failed" + ) + + async def _send_wait( + self, + method: str, + params: dict | None = None, + timeout: float = 5.0, + ) -> dict | None: + + if ( + not self._socket + or not self._loop + ): + return None + + self._message_id += 1 + + message_id = ( + self._message_id + ) + + payload = { + "id": message_id, + "method": method, + "params": params or {}, + } + + future = ( + self._loop.create_future() + ) + + self._pending[ + message_id + ] = future + + await self._socket.send( + json.dumps(payload) + ) + + try: + return await asyncio.wait_for( + future, + timeout, + ) + + except Exception: + return None + + finally: + self._pending.pop( + message_id, + None, + ) + + async def _handle_message( + self, + message: dict, + ) -> None: + + method = message.get( + "method" + ) + + if not method: + return + + params = message.get( + "params", + {}, + ) + + if ( + method + == "Network.requestWillBeSent" + ): + self._handle_request( + params + ) + + elif ( + method + == "Network.responseReceived" + ): + await self._handle_response( + params + ) + + elif ( + method + == "Network.loadingFailed" + ): + request_id = params.get( + "requestId" + ) + + if request_id: + self._requests.pop( + request_id, + None, + ) + + elif ( + method + == "Fetch.requestPaused" + ): + await self._continue_fetch( + params + ) + + async def _continue_fetch( + self, + params: dict, + ) -> None: + + request_id = params.get( + "requestId" + ) + + if not request_id: + return + + if self.backend: + + try: + request_data = params.get( + "request", + {}, + ) + + url = request_data.get( + "url", + "", + ) + + try: + host = ( + urlparse(url) + .hostname + or "" + ).lower() + except Exception: + host = "" + + request = { + "url": url, + "host": host, + "headers": request_data.get( + "headers", + {}, + ), + } + + _, blocked = ( + self.backend.process_request( + request + ) + ) + + if blocked: + + await self._send( + "Fetch.failRequest", + { + "requestId": + request_id, + "errorReason": + "BlockedByClient", + }, + ) + + return + + except Exception: + log.exception( + "fetch policy evaluation failed" + ) + + await self._send( + "Fetch.continueRequest", + { + "requestId": + request_id, + }, + ) + + def _handle_request( + self, + params: dict, + ) -> None: + + request = params.get( + "request", + {}, + ) + + request_id = ( + params.get( + "requestId" + ) + or str( + uuid.uuid4() + ) + ) + + data = { + "request_id": + request_id, + "timestamp": + datetime.now( + UTC + ).isoformat(), + "method": + request.get( + "method" + ), + "url": + request.get( + "url" + ), + "headers": + request.get( + "headers", + {}, + ), + "start": + time.time(), + } + + if self.backend: + + try: + data, blocked = ( + self.backend.process_request( + data + ) + ) + + if blocked: + return + + except Exception: + log.exception( + "network pipeline failed" + ) + + self._requests[ + request_id + ] = data + + self.events.requestCaptured.emit( + data + ) + + # v7 + async def _fetch_body( + self, + request_id: str, + ) -> str | None: + + result = await self._send_wait( + "Network.getResponseBody", + { + "requestId": + request_id, + }, + ) + + if not result: + return None + + return ( + result.get( + "result", + {}, + ).get( + "body" + ) + ) + + async def _handle_response( + self, + params: dict, + ) -> None: + + request_id = params.get( + "requestId" + ) + + response = params.get( + "response", + {}, + ) + + existing = ( + self._requests.pop( + request_id, + {}, + ) + ) + + duration = None + + if "start" in existing: + duration = ( + time.time() + - existing["start"] + ) * 1000 + + url = existing.get( + "url" + ) + + try: + host = ( + urlparse( + url + ).hostname + if url + else None + ) + + except Exception: + host = None + + response_body = None + + if ( + self.capture_bodies + and request_id + ): + try: + response_body = ( + await self._fetch_body( + request_id + ) + ) + except Exception: + pass + + record = { + "request_id": + request_id, + "timestamp": + existing.get( + "timestamp" + ), + "method": + existing.get( + "method" + ), + "url": + url, + "host": + host, + "status": + response.get( + "status" + ), + "request_headers": + existing.get( + "headers", + {}, + ), + "response_headers": + response.get( + "headers", + {}, + ), + "duration_ms": + duration, + } + + try: + self.db.queue_request( + request_id=record[ + "request_id" + ], + timestamp=record[ + "timestamp" + ], + method=record[ + "method" + ], + url=record[ + "url" + ], + host=record[ + "host" + ], + status_code=record[ + "status" + ], + resource_type=response.get( + "mimeType" + ), + request_headers=record[ + "request_headers" + ], + response_headers=record[ + "response_headers" + ], + request_body=None, + response_body=response_body, + duration_ms=record[ + "duration_ms" + ], + ) + + except Exception: + log.exception( + "failed to persist request" + ) + + self.events.responseCaptured.emit( + record + ) \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..1072f9d --- /dev/null +++ b/config.py @@ -0,0 +1,536 @@ +# v6 + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +APP_NAME = "Discord Client" +APP_VERSION = "1.0.0" + +START_URL = "https://discord.com/app" + +WINDOW_WIDTH = 1400 +WINDOW_HEIGHT = 900 + +LOG_LEVEL = logging.INFO + +BASE_DIR = Path.home() / ".discord-client" + +DATABASE_PATH = BASE_DIR / "client.db" +SETTINGS_FILE = BASE_DIR / "settings.json" + +PROFILES_DIR = BASE_DIR / "profiles" +DOWNLOADS_DIR = BASE_DIR / "downloads" +LOGS_DIR = BASE_DIR / "logs" + +THEMES_DIR = BASE_DIR / "themes" +EXTENSIONS_DIR = BASE_DIR / "extensions" + +EXPORTS_DIR = BASE_DIR / "exports" +HAR_DIR = EXPORTS_DIR / "har" + +DEFAULT_PROFILE = "default" + +NETWORK_RETENTION = 100_000 +NETWORK_BATCH_SIZE = 100 +NETWORK_FLUSH_INTERVAL = 1.0 + + +DISCORD_TELEMETRY_BLOCKS = [ + "sentry.io", + "www.sentry.io", + "crash.discord.com", + "telemetry.discord.com", + "api.statsig.com", + "segment.io", + "cdn.segment.com", +] + +DEFAULT_PROXY = { + "enabled": False, + "proxy_type": "http", + "host": "", + "port": 0, + "username": "", + "password": "", +} + +DEFAULT_TRAY = { + "enabled": True, + "minimize_to_tray": True, + "close_to_tray": True, +} + +DEFAULT_DEVTOOLS = { + "enabled": True, + "capture_requests": True, + "capture_responses": True, + "capture_bodies": False, + "auto_open": False, +} + +DEFAULT_PERMISSIONS = { + "microphone": False, + "camera": False, + "notifications": False, + "clipboard": False, +} + +DEFAULT_SETTINGS: dict[str, Any] = { + "active_profile": DEFAULT_PROFILE, + "window_width": WINDOW_WIDTH, + "window_height": WINDOW_HEIGHT, + "download_directory": str(DOWNLOADS_DIR), + "blocked_hosts": list( + DISCORD_TELEMETRY_BLOCKS + ), + "network_logging": True, + "enable_cdp": False, + "enable_vencord": False, + "permissions": DEFAULT_PERMISSIONS, + "proxy": DEFAULT_PROXY, + "tray": DEFAULT_TRAY, + "devtools": DEFAULT_DEVTOOLS, + "blocklist_mode": "pro", + "disabled_blocklists": [], +} + +FEATURE_PERMISSION_MAP = { + "MediaAudioCapture": "microphone", + "MediaVideoCapture": "camera", + "MediaAudioVideoCapture": "camera", + "Notifications": "notifications", + "ClipboardReadWrite": "clipboard", + "ClipboardSanitizedWrite": "clipboard", +} + +PLUGIN_FILE_PATTERN = "*.plugin.js" + +PLUGIN_MAX_SIZE = 5 * 1024 * 1024 + +PLUGIN_CRASH_LIMIT = 5 + +PLUGIN_AUTOLOAD = True + +PLUGIN_ALLOWED_EXTENSIONS = { + ".plugin.js", +} + +PLUGIN_ALLOWED_SCHEMES = { + "https", +} + +PLUGIN_BLOCKED_PATTERNS = { + "child_process.exec(", + "child_process.spawn(", + "powershell.exe", + "cmd.exe /c", + "process.exec(", +} +PLUGIN_PERMISSIONS = { + "storage", + "dom", + "network", + "websocket", + "notifications", + "clipboard", +} +PROFILE_EXPORT_VERSION = 1 + +PROFILE_EXPORT_NAME = ( + "profilebundle" +) + +# v1 +BLOCKLISTS_DIR = ( + BASE_DIR + / "blocklists" +) + +BLOCKLIST_CACHE_DIR = ( + BLOCKLISTS_DIR + / "cache" +) +def blocklists_dir( +) -> Path: + BLOCKLISTS_DIR.mkdir( + parents=True, + exist_ok=True, + ) + return BLOCKLISTS_DIR + + +# v1 +def blocklist_cache_dir( +) -> Path: + BLOCKLIST_CACHE_DIR.mkdir( + parents=True, + exist_ok=True, + ) + return BLOCKLIST_CACHE_DIR + +def ensure_directories() -> None: + for directory in ( + BASE_DIR, + PROFILES_DIR, + DOWNLOADS_DIR, + LOGS_DIR, + THEMES_DIR, + EXTENSIONS_DIR, + EXPORTS_DIR, + HAR_DIR, + BLOCKLISTS_DIR, + BLOCKLIST_CACHE_DIR, + ): + directory.mkdir( + parents=True, + exist_ok=True, + ) + +def merge_dict( + defaults: dict, + current: dict, +) -> dict: + merged = dict(defaults) + + for key, value in current.items(): + if ( + key in merged + and isinstance( + merged[key], + dict, + ) + and isinstance( + value, + dict, + ) + ): + merged[key] = merge_dict( + merged[key], + value, + ) + else: + merged[key] = value + + return merged + + +def _normalize_proxy( + settings: dict[str, Any], +) -> None: + proxy = settings.get( + "proxy", + {}, + ) + + if not isinstance( + proxy, + dict, + ): + settings["proxy"] = dict( + DEFAULT_PROXY + ) + return + + if ( + "type" in proxy + and "proxy_type" + not in proxy + ): + proxy["proxy_type"] = proxy.pop( + "type" + ) + + settings["proxy"] = merge_dict( + DEFAULT_PROXY, + proxy, + ) + + +def _normalize_permissions( + settings: dict[str, Any], +) -> None: + permissions = settings.get( + "permissions", + {}, + ) + + if not isinstance( + permissions, + dict, + ): + permissions = {} + + settings["permissions"] = merge_dict( + DEFAULT_PERMISSIONS, + permissions, + ) + + +def _normalize_blocked_hosts( + settings: dict[str, Any], +) -> None: + hosts = { + str(host) + .strip() + .lower() + for host in settings.get( + "blocked_hosts", + [], + ) + if str(host).strip() + } + + hosts.update( + DISCORD_TELEMETRY_BLOCKS + ) + + settings["blocked_hosts"] = sorted( + hosts + ) + + +def load_settings() -> dict[str, Any]: + ensure_directories() + + if not SETTINGS_FILE.exists(): + settings = merge_dict( + {}, + DEFAULT_SETTINGS, + ) + + save_settings( + settings + ) + + return settings + + try: + loaded = json.loads( + SETTINGS_FILE.read_text( + encoding="utf-8" + ) + ) + + if not isinstance( + loaded, + dict, + ): + raise ValueError + + if ( + "active_profile" + not in loaded + and "active_account" + in loaded + ): + loaded[ + "active_profile" + ] = loaded[ + "active_account" + ] + + settings = merge_dict( + DEFAULT_SETTINGS, + loaded, + ) + + if not settings.get( + "active_profile" + ): + settings[ + "active_profile" + ] = DEFAULT_PROFILE + + settings[ + "download_directory" + ] = str( + settings.get( + "download_directory" + ) + or DOWNLOADS_DIR + ) + + _normalize_proxy( + settings + ) + + _normalize_permissions( + settings + ) + + _normalize_blocked_hosts( + settings + ) + + return settings + + except Exception: + settings = merge_dict( + {}, + DEFAULT_SETTINGS, + ) + + save_settings( + settings + ) + + return settings + + +def save_settings( + settings: dict[str, Any], +) -> None: + ensure_directories() + + _normalize_proxy( + settings + ) + + _normalize_permissions( + settings + ) + + _normalize_blocked_hosts( + settings + ) + + SETTINGS_FILE.write_text( + json.dumps( + settings, + indent=2, + sort_keys=True, + ensure_ascii=False, + ), + encoding="utf-8", + ) + + +def profile_root( + profile_name: str, +) -> Path: + profile_name = ( + str(profile_name).strip() + or DEFAULT_PROFILE + ) + + root = ( + PROFILES_DIR + / profile_name + ) + + root.mkdir( + parents=True, + exist_ok=True, + ) + + return root + + +def profile_paths( + profile_name: str, +) -> tuple[Path, Path]: + root = profile_root( + profile_name + ) + + storage = root / "storage" + cache = root / "cache" + + storage.mkdir( + parents=True, + exist_ok=True, + ) + + cache.mkdir( + parents=True, + exist_ok=True, + ) + + return ( + storage, + cache, + ) + + +def profile_theme_dir( + profile_name: str, +) -> Path: + path = ( + THEMES_DIR + / profile_name + ) + + path.mkdir( + parents=True, + exist_ok=True, + ) + + return path + + +def profile_extension_dir( + profile_name: str, +) -> Path: + path = ( + EXTENSIONS_DIR + / profile_name + ) + + path.mkdir( + parents=True, + exist_ok=True, + ) + + return path + + +def profile_vencord_dir( + profile_name: str, +) -> Path: + path = ( + profile_root( + profile_name + ) + / "vencord" + ) + + path.mkdir( + parents=True, + exist_ok=True, + ) + + for child in ( + "plugins", + "themes", + "logs", + ): + ( + path / child + ).mkdir( + parents=True, + exist_ok=True, + ) + + return path + + +def profile_plugin_dir( + profile_name: str, +) -> Path: + root = ( + profile_vencord_dir( + profile_name + ) + / "plugins" + ) + + root.mkdir( + parents=True, + exist_ok=True, + ) + + return root \ No newline at end of file