diff --git a/main.py b/main.py new file mode 100644 index 0000000..b48d111 --- /dev/null +++ b/main.py @@ -0,0 +1,691 @@ +# v6 + +from __future__ import annotations +import logging +import os +import sys +from logging.handlers import RotatingFileHandler +import traceback + +from theme import ( + apply_dark_palette, + apply_stylesheet, +) +from PySide6.QtCore import Qt +from PySide6.QtGui import QIcon +from PySide6.QtWidgets import ( + QApplication, + QLabel, + QMainWindow, + QMessageBox, + QStatusBar, + +) +# whats needed +from browser import BrowserView +from config import ( + APP_NAME, + APP_VERSION, + DATABASE_PATH, + LOG_LEVEL, + LOGS_DIR, + START_URL, + WINDOW_HEIGHT, + WINDOW_WIDTH, + ensure_directories, + load_settings, + save_settings, +) +from db import Database + +from managers import ( + AddonManagerWindow, + NetworkInspector, + PluginManagerWindow, + ProfileManager, + SettingsWindow, + TrayManager, + ContextManager, +) + +# v2 +os.environ.setdefault( + "QTWEBENGINE_CHROMIUM_FLAGS", + ( + "--disable-features=" + "Translate," + "MediaRouter " + "--enable-features=" + "DnsOverHttps " + "--dns-over-https-mode=secure " + "--dns-over-https-templates=" + "https://dns.quad9.net/dns-query " + "--disable-background-networking " + "--disable-component-update " + "--disable-domain-reliability " + "--disable-breakpad " + "--no-pings " + "--enable-gpu-rasterization " + "--enable-zero-copy " + "--disable-sync " + "--disable-translate " + "--disable-features=AutofillServerCommunication " + "--enable-native-gpu-memory-buffers " + ), +) + + +LOG_FILE = LOGS_DIR / "client.log" + + +def configure_logging() -> None: + LOG_FILE.parent.mkdir( + parents=True, + exist_ok=True, + ) + + root = logging.getLogger() + + root.setLevel( + LOG_LEVEL + ) + + root.handlers.clear() + + formatter = logging.Formatter( + "%(asctime)s %(levelname)s %(name)s: %(message)s" + ) + + file_handler = ( + RotatingFileHandler( + LOG_FILE, + maxBytes=10 * 1024 * 1024, + backupCount=5, + encoding="utf-8", + ) + ) + + file_handler.setFormatter( + formatter + ) + + console_handler = ( + logging.StreamHandler() + ) + + console_handler.setFormatter( + formatter + ) + + root.addHandler( + file_handler + ) + + root.addHandler( + console_handler + ) + + +configure_logging() + +log = logging.getLogger( + __name__ +) + + +class MainWindow(QMainWindow): + def __init__( + self, + db: Database, + settings: dict, + ): + super().__init__() + + self.db = db + self.settings_data = settings + + self.contexts = ContextManager( + self.db, + self.settings_data, + ) + + self.plugins_window = None + self.settings_window = None + self.profile_window = None + self.addons_window = None + self.network_window = None + self.tray_manager = None + + self.setWindowTitle( + f"{APP_NAME}---{APP_VERSION}" + ) + + self.resize( + max( + 800, + int( + settings.get( + "window_width", + WINDOW_WIDTH, + ) + ), + ), + max( + 600, + int( + settings.get( + "window_height", + WINDOW_HEIGHT, + ) + ), + ), + ) + + self.browser = BrowserView( + db=self.db, + context=self.context, + settings=self.settings_data, + parent=self, + ) + + self.setCentralWidget( + self.browser + ) + + self._build_status_bar() + self._connect_signals() + self._build_tray() + + self.browser.open( + START_URL + ) + + @property + def context(self): + return self.contexts.current() + + def _build_status_bar( + self, + ) -> None: + self.status_bar = ( + QStatusBar( + self + ) + ) + + self.setStatusBar( + self.status_bar + ) + + self.download_label = QLabel( + "Ready", + self, + ) + + self.status_bar.addPermanentWidget( + self.download_label + ) + self.status_bar.addPermanentWidget( + QLabel( + f"Blocked: " + f"{self.browser.network.blocklist_engine.count():,}" + ) + ) + + def _connect_signals( + self, + ) -> None: + self.browser.downloadProgress.connect( + self._download_progress + ) + + self.browser.page_.consoleMessage.connect( + self._console_message + ) + + self.browser.bridge.messageReceived.connect( + self._bridge_message + ) + + def _build_tray( + self, + ) -> None: + tray_cfg = self.settings_data.get( + "tray", + {}, + ) + + if not tray_cfg.get( + "enabled", + True, + ): + return + + self.tray_manager = ( + TrayManager( + self + ) + ) + + self.tray_manager.open_action.triggered.connect( + self._show_window + ) + + self.tray_manager.tray.show() + + def _show_window( + self, + ) -> None: + self.showNormal() + self.raise_() + self.activateWindow() + + def _console_message( + self, + message: str, + ) -> None: + ignored = { + "setMemoryInformation not available", + "[ProcessUtilsElectron]", + } + + if any( + token in message + for token in ignored + ): + return + + log.info( + "console=%s", + message, + ) + + def _bridge_message( + self, + message: str, + ) -> None: + log.info( + "bridge=%s", + message, + ) + + def _download_progress( + self, + filename: str, + received: int, + total: int, + ) -> None: + if total <= 0: + self.download_label.setText( + f"{filename}: {received:,} B" + ) + return + + percent = int( + ( + received + / max( + total, + 1, + ) + ) + * 100 + ) + + self.download_label.setText( + ( + f"{filename}: " + f"{percent}% " + f"({received:,}/{total:,})" + ) + ) + + def open_settings( + self, + ) -> None: + window = SettingsWindow( + self.settings_data, + self, + ) + + window.settingsChanged.connect( + self._settings_changed + ) + + self.settings_window = window + + window.exec() + + def open_plugins( + self, + ) -> None: + self.plugins_window = ( + PluginManagerWindow( + self.browser, + self.db, + self.context.name, + self, + ) + ) + + self.plugins_window.show() + + def open_profiles( + self, + ) -> None: + window = ProfileManager( + self.db.list_profiles(), + self.settings_data.get( + "active_profile", + "default", + ), + self, + ) + + window.profileSelected.connect( + self._profile_selected + ) + + self.profile_window = window + + window.exec() + + def open_addons( + self, + ) -> None: + self.addons_window = ( + AddonManagerWindow( + self.context, + self, + ) + ) + + self.addons_window.show() + + def open_network( + self, + ) -> None: + self.network_window = ( + NetworkInspector( + self.browser.network, + self, + ) + ) + + self.network_window.show() + + def open_devtools( + self, + ) -> None: + self.browser.open_devtools() + + def _settings_changed( + self, + ) -> None: + save_settings( + self.settings_data + ) + + def _profile_selected( + self, + profile_name: str, + ) -> None: + current = ( + self.settings_data.get( + "active_profile" + ) + ) + + if profile_name == current: + return + + self.settings_data[ + "active_profile" + ] = profile_name + + save_settings( + self.settings_data + ) + + QMessageBox.information( + self, + APP_NAME, + ( + "Profile updated.\n\n" + "Restart required." + ), + ) + + def _persist_window_state( + self, + ) -> None: + size = self.size() + + self.settings_data[ + "window_width" + ] = size.width() + + self.settings_data[ + "window_height" + ] = size.height() + + save_settings( + self.settings_data + ) + + def _shutdown( + self, + ) -> None: + try: + self.browser.shutdown() + except Exception: + log.exception( + "browser shutdown failed" + ) + + try: + self.db.shutdown() + except Exception: + log.exception( + "database shutdown failed" + ) + + # v2 + def closeEvent( + self, + event, + ): + if getattr( + self, + "_closing", + False, + ): + event.accept() + return + + tray_cfg = self.settings_data.get( + "tray", + {}, + ) + + if ( + tray_cfg.get( + "close_to_tray", + True, + ) + and self.tray_manager + ): + event.ignore() + self.hide() + return + + self._closing = True + + try: + self._persist_window_state() + + for widget in ( + self.plugins_window, + self.settings_window, + self.profile_window, + self.addons_window, + self.network_window, + ): + if widget: + try: + widget.close() + except Exception: + log.exception( + "child window close failed" + ) + + self._shutdown() + + except Exception: + log.exception( + "shutdown failed" + ) + + event.accept() + + super().closeEvent( + event + ) + + + +def build_database() -> Database: + return Database( + DATABASE_PATH + ) + +# v3 +def build_app() -> QApplication: + os.environ.setdefault( + "QTWEBENGINE_REMOTE_DEBUGGING", + "0", + ) + + os.environ.setdefault( + "CHROME_CRASHPAD_PIPE_NAME", + "", + ) + + QApplication.setAttribute( + Qt.ApplicationAttribute.AA_ShareOpenGLContexts + ) + + return QApplication( + sys.argv + ) + + +def fatal_error( + text: str, +) -> None: + try: + app = QApplication.instance() + + if app is None: + app = QApplication( + sys.argv + ) + + QMessageBox.critical( + None, + APP_NAME, + text, + ) + + except Exception: + pass +# v3 +def main() -> int: + try: + ensure_directories() + + settings = load_settings() + + db = build_database() + + QApplication.setDesktopSettingsAware( + True + ) + + app = build_app() + + icon = QIcon( + "app.ico" + ) + + if icon.isNull(): + + for candidate in ( + "ui/app.ico", + "assets/app.ico", + "icon.ico", + ): + test = QIcon( + candidate + ) + + if not test.isNull(): + icon = test + break + + if not icon.isNull(): + app.setWindowIcon( + icon + ) + + app.setApplicationName( + APP_NAME + ) + + app.setApplicationVersion( + APP_VERSION + ) + + app.setStyle( + "Fusion" + ) + + apply_dark_palette( + app + ) + + app.setStyleSheet( + apply_stylesheet() + ) + + window = MainWindow( + db=db, + settings=settings, + ) + + window.show() + + return app.exec() + + except KeyboardInterrupt: + return 0 + + except SystemExit: + return 0 + + except Exception as exc: + log.exception( + "fatal startup error" + ) + + fatal_error( + str(exc) + ) + + return 1 + + +if __name__ == "__main__": + raise SystemExit( + main() + ) \ No newline at end of file diff --git a/managers.py b/managers.py new file mode 100644 index 0000000..08d9f56 --- /dev/null +++ b/managers.py @@ -0,0 +1,1609 @@ + +from __future__ import annotations +import json +from pathlib import Path + +from PySide6.QtCore import QObject, Qt, Signal +from PySide6.QtGui import QAction, QIcon +from PySide6.QtWidgets import ( + QApplication, + QCheckBox, + QDialog, + QFileDialog, + QFormLayout, + QGroupBox, + QHBoxLayout, + QLineEdit, + QListWidget, + QMainWindow, + QMenu, + QMessageBox, + QPlainTextEdit, + QPushButton, + QSplitter, + QSystemTrayIcon, + QTableView, + QTabWidget, + QVBoxLayout, + QWidget, + QInputDialog, +) + +from models import ( + AddonModel, + NetworkTableModel, + PluginCrashTableModel, + PluginLogTableModel, + PluginTableModel, + ProfileContext, +) +from addons import AddonManager +from network import ProxyConfig +from config import ( + PLUGIN_ALLOWED_EXTENSIONS, + PLUGIN_BLOCKED_PATTERNS, + PLUGIN_FILE_PATTERN, + PLUGIN_MAX_SIZE, + profile_extension_dir, + profile_plugin_dir, + profile_theme_dir, + profile_root, +) + +# v2 +class ContextManager: + def __init__( + self, + db, + settings: dict, + ): + self.db = db + self.settings = settings + self._contexts: dict[ + str, + ProfileContext, + ] = {} + + def current( + self, + ) -> ProfileContext: + return self.profile( + self.settings.get( + "active_profile", + "default", + ) + ) + + def get( + self, + profile_name: str, + ) -> ProfileContext: + return self.profile( + profile_name + ) + + def register( + self, + context: ProfileContext, + ) -> None: + self._contexts[ + context.name + ] = context + + def remove( + self, + profile_name: str, + ) -> None: + self._contexts.pop( + profile_name, + None, + ) + + def profile( + self, + profile_name: str, + ) -> ProfileContext: + + if ( + profile_name + in self._contexts + ): + return self._contexts[ + profile_name + ] + + self.db.ensure_profile( + profile_name + ) + + root = profile_root( + profile_name + ) + + # v3 + context = ProfileContext( + name=profile_name, + root=root, + storage_path=( + root / "storage" + ), + cache_path=( + root / "cache" + ), + downloads_path=( + root / "downloads" + ), + plugins_path=( + profile_plugin_dir( + profile_name + ) + ), + themes_path=( + profile_theme_dir( + profile_name + ) + ), + extensions_path=( + profile_extension_dir( + profile_name + ) + ), + profile_db=( + root / "profile.db" + ), + permissions_db=( + root / "permissions.db" + ), + network_db=( + root / "network.db" + ), + ) + context.storage_path.mkdir( + parents=True, + exist_ok=True, + ) + + context.cache_path.mkdir( + parents=True, + exist_ok=True, + ) + + context.downloads_path.mkdir( + parents=True, + exist_ok=True, + ) + + self._contexts[ + profile_name + ] = context + + return context + + def switch( + self, + profile_name: str, + ) -> ProfileContext: + + self.settings[ + "active_profile" + ] = profile_name + save_settings( + self.settings + ) + return self.profile( + profile_name + ) +class PluginManagerWindow( + QDialog +): + def __init__( + self, + browser, + db, + profile_name, + parent=None, + ): + super().__init__( + parent + ) + + self.browser = browser + self.db = db + self.profile_name = ( + profile_name + ) + + self.resize( + 1200, + 850, + ) + + self.setWindowTitle( + "Plugins" + ) + + self.model = ( + PluginTableModel() + ) + + self.logs_model = ( + PluginLogTableModel() + ) + + self.crash_model = ( + PluginCrashTableModel() + ) + + self.plugins_table = ( + QTableView() + ) + + self.logs_table = ( + QTableView() + ) + + self.crashes_table = ( + QTableView() + ) + + self.diagnostics = ( + QPlainTextEdit() + ) + + self.diagnostics.setReadOnly( + True + ) + + self.plugins_table.setModel( + self.model + ) + + self.logs_table.setModel( + self.logs_model + ) + + self.crashes_table.setModel( + self.crash_model + ) + + tabs = QTabWidget() + + tabs.addTab( + self.plugins_table, + "Plugins", + ) + + tabs.addTab( + self.logs_table, + "Logs", + ) + + tabs.addTab( + self.crashes_table, + "Crashes", + ) + + tabs.addTab( + self.diagnostics, + "Diagnostics", + ) + + self.install_file_btn = ( + QPushButton( + "Install File" + ) + ) + + self.install_url_btn = ( + QPushButton( + "Install URL" + ) + ) + + self.enable_btn = ( + QPushButton( + "Enable" + ) + ) + + self.disable_btn = ( + QPushButton( + "Disable" + ) + ) + + self.reload_btn = ( + QPushButton( + "Reload" + ) + ) + + self.delete_btn = ( + QPushButton( + "Delete" + ) + ) + + buttons = QHBoxLayout() + + for button in ( + self.install_file_btn, + self.install_url_btn, + self.enable_btn, + self.disable_btn, + self.reload_btn, + self.delete_btn, + ): + buttons.addWidget( + button + ) + + layout = QVBoxLayout( + self + ) + + layout.addWidget( + tabs + ) + + layout.addLayout( + buttons + ) + + self.install_file_btn.clicked.connect( + self.install_file + ) + + self.install_url_btn.clicked.connect( + self.install_url + ) + + self.enable_btn.clicked.connect( + self.enable_plugin + ) + + self.disable_btn.clicked.connect( + self.disable_plugin + ) + + self.reload_btn.clicked.connect( + self.reload_plugins + ) + + self.delete_btn.clicked.connect( + self.delete_plugin + ) + + self.reload() + + def selected_plugin( + self, + ) -> str | None: + index = ( + self.plugins_table.currentIndex() + ) + + if not index.isValid(): + return None + + row = self.model.row( + index.row() + ) + + return row.get( + "name" + ) + + def install_file( + self, + ) -> None: + path, _ = ( + QFileDialog.getOpenFileName( + self, + "Install Plugin", + "", + "*.plugin.js", + ) + ) + + if not path: + return + + try: + self.browser.install_plugin_file( + path + ) + + self.reload() + + except Exception as exc: + QMessageBox.warning( + self, + "Install Failed", + str(exc), + ) + + # v2 + def install_url( + self, + ) -> None: + url, accepted = ( + QInputDialog.getText( + self, + "Install URL", + "Plugin URL", + ) + ) + + url = url.strip() + + if ( + not accepted + or not url + ): + return + + try: + self.browser.install_plugin_url( + url + ) + + self.reload() + + except Exception as exc: + QMessageBox.warning( + self, + "Install Failed", + str(exc), + ) + + def enable_plugin( + self, + ) -> None: + plugin = ( + self.selected_plugin() + ) + + if not plugin: + return + + self.browser.set_plugin_enabled( + plugin, + True, + ) + + self.reload() + + def disable_plugin( + self, + ) -> None: + plugin = ( + self.selected_plugin() + ) + + if not plugin: + return + + self.browser.set_plugin_enabled( + plugin, + False, + ) + + self.reload() + + def delete_plugin( + self, + ) -> None: + plugin = ( + self.selected_plugin() + ) + + if not plugin: + return + + result = ( + QMessageBox.question( + self, + "Delete Plugin", + plugin, + ) + ) + + if ( + result + != QMessageBox.StandardButton.Yes + ): + return + + self.browser.remove_plugin( + plugin + ) + + self.reload() + + def reload_plugins( + self, + ) -> None: + self.browser.reload_addons() + self.reload() + + def reload( + self, + ) -> None: + self.model.set_plugins( + self.browser.plugin_inventory() + ) + + self.logs_model.set_rows( + list( + self.db.plugin_logs( + self.profile_name + ) + ) + ) + + self.crash_model.set_rows( + list( + self.db.plugin_crashes( + self.profile_name + ) + ) + ) + + self.diagnostics.setPlainText( + json.dumps( + self.browser.plugin_diagnostics_snapshot(), + indent=2, + ensure_ascii=False, + ) + ) +class SettingsWindow(QDialog): + settingsChanged = Signal() + + def __init__(self, settings: dict, parent=None): + super().__init__(parent) + self.settings = settings + self.setWindowTitle("Settings") + self.resize(950, 700) + + tabs = QTabWidget() + tabs.addTab(self._general_tab(), "General") + tabs.addTab(self._network_tab(), "Network") + tabs.addTab(self._permissions_tab(), "Permissions") + tabs.addTab(self._privacy_tab(), "Privacy") + tabs.addTab(self._developer_tab(), "Developer") + + layout = QVBoxLayout(self) + layout.addWidget(tabs) + + save_button = QPushButton("Save") + save_button.clicked.connect(self._save) + layout.addWidget(save_button) + + def _general_tab(self): + widget = QWidget() + layout = QFormLayout(widget) + + self.download_path = QLineEdit( + self.settings.get("download_directory", "") + ) + + layout.addRow("Downloads", self.download_path) + return widget + + def _network_tab(self): + widget = QWidget() + layout = QFormLayout(widget) + + self.network_logging = QCheckBox() + self.network_logging.setChecked( + self.settings.get("network_logging", True) + ) + + layout.addRow("Enable Logging", self.network_logging) + return widget + + def _permissions_tab(self): + widget = QWidget() + layout = QVBoxLayout(widget) + + permissions = self.settings.setdefault( + "permissions", + {}, + ) + + self.permission_boxes = {} + + for key, title in ( + ("microphone", "Microphone"), + ("camera", "Camera"), + ("clipboard", "Clipboard"), + ("notifications", "Notifications"), + ("downloads", "Downloads"), + ("fullscreen", "Fullscreen"), + ): + box = QCheckBox(title) + box.setChecked( + bool( + permissions.get( + key, + False, + ) + ) + ) + self.permission_boxes[key] = box + layout.addWidget(box) + + layout.addStretch() + return widget + + def _privacy_tab(self): + widget = QWidget() + layout = QVBoxLayout(widget) + + blocked = set( + self.settings.get( + "blocked_hosts", + [], + ) + ) + + self.discord_telemetry = QCheckBox( + "Block Discord Telemetry" + ) + self.discord_telemetry.setChecked( + "telemetry.discord.com" in blocked + ) + + self.discord_crash = QCheckBox( + "Block Discord Crash Reporting" + ) + self.discord_crash.setChecked( + "crash.discord.com" in blocked + ) + + self.sentry = QCheckBox( + "Block Sentry" + ) + self.sentry.setChecked( + "sentry.io" in blocked + ) + + self.statsig = QCheckBox( + "Block Statsig" + ) + self.statsig.setChecked( + "api.statsig.com" in blocked + ) + + layout.addWidget(self.discord_telemetry) + layout.addWidget(self.discord_crash) + layout.addWidget(self.sentry) + layout.addWidget(self.statsig) + + group = QGroupBox( + "Blocked Hosts" + ) + + group_layout = QVBoxLayout(group) + + self.blocked_hosts = QPlainTextEdit() + self.blocked_hosts.setPlainText( + "\n".join( + sorted(blocked) + ) + ) + + group_layout.addWidget( + self.blocked_hosts + ) + + layout.addWidget(group) + + return widget + + def _developer_tab(self): + widget = QWidget() + layout = QFormLayout(widget) + + self.enable_cdp = QCheckBox() + + self.enable_cdp.setChecked( + self.settings.get( + "enable_cdp", + True, + ) + ) + + layout.addRow( + "Enable CDP", + self.enable_cdp, + ) + + return widget + + def _save(self): + self.settings["download_directory"] = ( + self.download_path.text().strip() + ) + + self.settings["network_logging"] = ( + self.network_logging.isChecked() + ) + + self.settings["enable_cdp"] = ( + self.enable_cdp.isChecked() + ) + + permissions = self.settings.setdefault( + "permissions", + {}, + ) + + for key, box in self.permission_boxes.items(): + permissions[key] = ( + box.isChecked() + ) + + blocked = { + line.strip().lower() + for line in self.blocked_hosts.toPlainText().splitlines() + if line.strip() + } + + if self.discord_telemetry.isChecked(): + blocked.add( + "telemetry.discord.com" + ) + + if self.discord_crash.isChecked(): + blocked.add( + "crash.discord.com" + ) + + if self.sentry.isChecked(): + blocked.add("sentry.io") + + if self.statsig.isChecked(): + blocked.add( + "api.statsig.com" + ) + + self.settings[ + "blocked_hosts" + ] = sorted(blocked) + + self.settingsChanged.emit() + self.accept() + +# v4 +class ProfileManager(QDialog): + profileSelected = Signal(str) + + PROFILE_MAX_FILES = 10000 + PROFILE_MAX_FILE_SIZE = 50 * 1024 * 1024 + PROFILE_MAX_TOTAL_SIZE = 500 * 1024 * 1024 + + PROFILE_EXPORT_SKIP = { + "cache", + "GPUCache", + "Code Cache", + "blob_storage", + "Service Worker", + "Session Storage", + } + + def __init__( + self, + profiles: list[str], + current: str, + parent=None, + ): + super().__init__(parent) + + self.setWindowTitle( + "Profiles" + ) + + self.resize(500, 550) + + self.list_widget = QListWidget() + + profiles = sorted(profiles) + + self.list_widget.addItems( + profiles + ) + + if current in profiles: + self.list_widget.setCurrentRow( + profiles.index(current) + ) + + switch_button = QPushButton( + "Switch" + ) + + export_button = QPushButton( + "Export" + ) + + import_button = QPushButton( + "Import" + ) + + switch_button.clicked.connect( + self._switch + ) + + export_button.clicked.connect( + self._export + ) + + import_button.clicked.connect( + self._import + ) + + buttons = QHBoxLayout() + buttons.addWidget(switch_button) + buttons.addWidget(export_button) + buttons.addWidget(import_button) + + layout = QVBoxLayout(self) + layout.addWidget(self.list_widget) + layout.addLayout(buttons) + + def _switch( + self, + ): + item = self.list_widget.currentItem() + + if not item: + return + + self.profileSelected.emit( + item.text() + ) + + self.accept() + + def _export( + self, + ): + item = ( + self.list_widget.currentItem() + ) + + if not item: + return + + profile_name = ( + item.text() + ) + + path, _ = ( + QFileDialog.getSaveFileName( + self, + "Export Profile", + f"{profile_name}.profilebundle", + "*.profilebundle", + ) + ) + + if not path: + return + + profile_root_path = ( + profile_root( + profile_name + ) + ).resolve() + + export = { + "version": 1, + "profile": profile_name, + "storage": {}, + } + + file_count = 0 + total_size = 0 + + for file in ( + profile_root_path.rglob( + "*" + ) + ): + if ( + not file.is_file() + or file.is_symlink() + ): + continue + + try: + resolved = ( + file.resolve() + ) + + if ( + profile_root_path + not in resolved.parents + ): + continue + + relative = ( + resolved.relative_to( + profile_root_path + ) + ) + + if any( + part + in self.PROFILE_EXPORT_SKIP + for part in relative.parts + ): + continue + + size = ( + resolved.stat() + .st_size + ) + + if ( + size + > self.PROFILE_MAX_FILE_SIZE + ): + continue + + file_count += 1 + total_size += size + + if ( + file_count + > self.PROFILE_MAX_FILES + ): + raise ValueError( + "profile exceeds file limit" + ) + + if ( + total_size + > self.PROFILE_MAX_TOTAL_SIZE + ): + raise ValueError( + "profile exceeds size limit" + ) + + export[ + "storage" + ][ + str(relative) + ] = ( + resolved.read_bytes() + .hex() + ) + + except Exception: + raise + + Path(path).write_text( + json.dumps( + export, + indent=2, + ensure_ascii=False, + ), + encoding="utf-8", + ) + + QMessageBox.information( + self, + "Export", + "Profile exported.", + ) + + def _import( + self, + ): + path, _ = ( + QFileDialog.getOpenFileName( + self, + "Import Profile", + "", + "*.profilebundle", + ) + ) + + if not path: + return + + try: + bundle = json.loads( + Path(path).read_text( + encoding="utf-8" + ) + ) + + profile_name = str( + bundle.get( + "profile", + "", + ) + ).strip() + + if not profile_name: + raise ValueError( + "invalid profile" + ) + + root = profile_root( + profile_name + ) + + root.mkdir( + parents=True, + exist_ok=True, + ) + + root_resolved = ( + root.resolve() + ) + + file_count = 0 + total_size = 0 + + for ( + relative, + payload, + ) in bundle.get( + "storage", + {}, + ).items(): + + file_count += 1 + + if ( + file_count + > self.PROFILE_MAX_FILES + ): + raise ValueError( + "profile exceeds file limit" + ) + + size = ( + len(payload) + // 2 + ) + + if ( + size + > self.PROFILE_MAX_FILE_SIZE + ): + continue + + total_size += size + + if ( + total_size + > self.PROFILE_MAX_TOTAL_SIZE + ): + raise ValueError( + "profile exceeds size limit" + ) + + target = ( + root + / relative + ).resolve() + + if ( + root_resolved + not in target.parents + and target + != root_resolved + ): + continue + + target.parent.mkdir( + parents=True, + exist_ok=True, + ) + + target.write_bytes( + bytes.fromhex( + payload + ) + ) + + QMessageBox.information( + self, + "Import", + "Profile imported.", + ) + + except Exception as exc: + QMessageBox.warning( + self, + "Import", + str(exc), + ) +class AddonManagerWindow(QDialog): + def __init__( + self, + context: ProfileContext, + parent=None, + ): + super().__init__(parent) + + self.context = context + + self.manager = AddonManager( + context.name, + getattr( + parent, + "db", + None, + ), + ) + + self.setWindowTitle( + "Themes & Extensions" + ) + + self.resize(900, 650) + + self.model = AddonModel() + + self.table = QTableView() + self.table.setModel(self.model) + + refresh = QPushButton( + "Refresh" + ) + + refresh.clicked.connect( + self.reload + ) + + layout = QVBoxLayout(self) + layout.addWidget(self.table) + layout.addWidget(refresh) + + self.reload() + + + def reload( + self, + ): + rows = [] + + for theme in ( + self.manager.themes.discover() + ): + rows.append( + { + "name": theme.stem, + "enabled": True, + "type": "Theme", + } + ) + + for extension in ( + self.manager.extensions.discover() + ): + enabled = True + + manifest = ( + extension + / "manifest.json" + ) + + if manifest.exists(): + try: + enabled = bool( + json.loads( + manifest.read_text( + encoding="utf-8" + ) + ).get( + "enabled", + True, + ) + ) + except Exception: + pass + + rows.append( + { + "name": extension.name, + "enabled": enabled, + "type": "Extension", + } + ) + + for plugin in ( + self.manager.plugins.inventory() + ): + rows.append( + { + "name": plugin[ + "name" + ], + "enabled": plugin.get( + "enabled", + True, + ), + "type": "Plugin", + } + ) + + self.model.set_addons( + rows + ) + + +class ThemeEditor(QDialog): + def __init__( + self, + theme_path: Path, + parent=None, + ): + super().__init__(parent) + + self.theme_path = theme_path + + self.setWindowTitle( + theme_path.name + ) + + self.resize(1000, 700) + + self.editor = QPlainTextEdit() + + if theme_path.exists(): + self.editor.setPlainText( + theme_path.read_text( + encoding="utf-8" + ) + ) + + save_button = QPushButton( + "Save" + ) + + save_button.clicked.connect( + self.save + ) + + layout = QVBoxLayout(self) + layout.addWidget(self.editor) + layout.addWidget(save_button) + + def save(self): + self.theme_path.write_text( + self.editor.toPlainText(), + encoding="utf-8", + ) + self.accept() + + +class ProxyManagerWindow(QDialog): + def __init__( + self, + network, + profile_name: str, + parent=None, + ): + super().__init__(parent) + + self.network = network + self.profile_name = profile_name + + self.setWindowTitle( + "Proxy Settings" + ) + + self.resize(500, 350) + + proxy = ( + network.proxy.get_proxy( + profile_name + ) + ) + + self.enabled = QCheckBox() + self.enabled.setChecked( + proxy.enabled + ) + + self.host = QLineEdit( + proxy.host + ) + + self.port = QLineEdit( + str(proxy.port) + ) + + self.username = QLineEdit( + proxy.username + ) + + self.password = QLineEdit( + proxy.password + ) + + self.type_box = QLineEdit( + proxy.proxy_type + ) + + layout = QFormLayout(self) + + layout.addRow( + "Enabled", + self.enabled, + ) + + layout.addRow( + "Type", + self.type_box, + ) + + layout.addRow( + "Host", + self.host, + ) + + layout.addRow( + "Port", + self.port, + ) + + layout.addRow( + "Username", + self.username, + ) + + layout.addRow( + "Password", + self.password, + ) + + save_button = QPushButton( + "Save" + ) + + save_button.clicked.connect( + self.save + ) + + layout.addRow(save_button) + + def save(self): + try: + port = int( + self.port.text().strip() + or "0" + ) + except ValueError: + port = 0 + + proxy = ProxyConfig( + enabled=self.enabled.isChecked(), + proxy_type=self.type_box.text().strip() or "http", + host=self.host.text().strip(), + port=port, + username=self.username.text(), + password=self.password.text(), + ) + + self.network.proxy.save_proxy( + self.profile_name, + proxy, + ) + + self.accept() + + +class NetworkInspector(QMainWindow): + def __init__( + self, + network, + parent=None, + ): + super().__init__(parent) + + self.network = network + + self.setWindowTitle( + "Network Inspector" + ) + + self.resize(1400, 850) + + self.model = NetworkTableModel() + + self.table = QTableView() + self.table.setModel(self.model) + + self.table.horizontalHeader().setStretchLastSection( + True + ) + + self.search = QLineEdit() + self.search.setPlaceholderText( + "Search URL, host, method..." + ) + + self.search.textChanged.connect( + self.reload + ) + + self.details = QPlainTextEdit() + self.details.setReadOnly(True) + + splitter = QSplitter( + Qt.Orientation.Vertical + ) + + splitter.addWidget( + self.table + ) + + splitter.addWidget( + self.details + ) + + container = QWidget() + + layout = QVBoxLayout( + container + ) + + layout.addWidget( + self.search + ) + + layout.addWidget( + splitter + ) + + self.setCentralWidget( + container + ) + + self.table.clicked.connect( + self.show_details + ) + + self.reload() + + def reload(self): + query = ( + self.search.text().strip() + ) + + rows = ( + self.network.search(query) + if query + else self.network.recent() + ) + + self.model.set_rows(rows) + + def show_details( + self, + index, + ): + try: + row = self.model.row( + index.row() + ) + + self.details.setPlainText( + json.dumps( + dict(row), + indent=2, + ensure_ascii=False, + ) + ) + + except Exception: + self.details.clear() + + +# v2 +class TrayManager(QObject): + def __init__( + self, + parent, + ): + super().__init__(parent) + + self.parent_window = parent + + self.tray = QSystemTrayIcon( + parent + ) + + icon = ( + QApplication.windowIcon() + ) + + if icon.isNull(): + icon = ( + self.parent_window.windowIcon() + ) + + self.tray.setIcon( + icon + ) + + self.menu = QMenu() + + self.open_action = QAction( + "Open", + parent, + ) + + self.hide_action = QAction( + "Hide", + parent, + ) + + self.quit_action = QAction( + "Quit", + parent, + ) + + self.menu.addAction( + self.open_action + ) + + self.menu.addAction( + self.hide_action + ) + + self.menu.addSeparator() + + self.menu.addAction( + self.quit_action + ) + + self.tray.setContextMenu( + self.menu + ) + + self.open_action.triggered.connect( + self.show_window + ) + + self.hide_action.triggered.connect( + parent.hide + ) + + self.quit_action.triggered.connect( + self.quit_application + ) + + self.tray.activated.connect( + self._activated + ) + + def _activated( + self, + reason, + ): + if reason == QSystemTrayIcon.ActivationReason.DoubleClick: + self.show_window() + + def show_window(self): + self.parent_window.showNormal() + self.parent_window.raise_() + self.parent_window.activateWindow() + + def quit_application(self): + self.tray.hide() + self.parent_window.tray_manager = None + self.parent_window.settings_data.setdefault( + "tray", + {} + )["close_to_tray"] = False + self.parent_window.close() \ No newline at end of file diff --git a/models.py b/models.py new file mode 100644 index 0000000..e41c3f8 --- /dev/null +++ b/models.py @@ -0,0 +1,768 @@ +# v7 + +from __future__ import annotations + +from typing import Any +from dataclasses import dataclass +from pathlib import Path + +from PySide6.QtCore import ( + QAbstractTableModel, + QModelIndex, + Qt, +) + + + +@dataclass(slots=True) +class ProfileContext: + name: str + + root: Path + + storage_path: Path + cache_path: Path + downloads_path: Path + + plugins_path: Path + themes_path: Path + extensions_path: Path + + profile_db: Path + permissions_db: Path + network_db: Path + +class NetworkTableModel( + QAbstractTableModel +): + HEADERS = ( + "Method", + "Status", + "Host", + "URL", + "Type", + "Duration", + "Timestamp", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._rows: list[ + dict[str, Any] + ] = [] + + def set_rows( + self, + rows, + ) -> None: + self.beginResetModel() + + self._rows = [ + dict(row) + for row in rows + ] + + self.endResetModel() + + def append_row( + self, + row: dict, + ) -> None: + position = len( + self._rows + ) + + self.beginInsertRows( + QModelIndex(), + position, + position, + ) + + self._rows.append( + row + ) + + self.endInsertRows() + + def rowCount( + self, + parent=QModelIndex(), + ) -> int: + return 0 if parent.isValid() else len( + self._rows + ) + + def columnCount( + self, + parent=QModelIndex(), + ) -> int: + return 0 if parent.isValid() else len( + self.HEADERS + ) + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + if ( + 0 + <= section + < len( + self.HEADERS + ) + ): + return self.HEADERS[ + section + ] + + return str( + section + 1 + ) + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + row = self._rows[ + index.row() + ] + + match index.column(): + case 0: + return row.get( + "method", + "", + ) + + case 1: + return row.get( + "status_code", + row.get( + "status", + "", + ), + ) + + case 2: + return row.get( + "host", + "", + ) + + case 3: + return row.get( + "url", + "", + ) + + case 4: + return row.get( + "resource_type", + "", + ) + + case 5: + duration = row.get( + "duration_ms" + ) + + if duration is None: + return "" + + try: + return ( + f"{float(duration):.2f}" + ) + except ( + TypeError, + ValueError, + ): + return "" + + case 6: + return row.get( + "timestamp", + "", + ) + + return None + + def row( + self, + position: int, + ) -> dict: + return self._rows[ + position + ] + + +class ProfileModel( + QAbstractTableModel +): + HEADERS = ( + "Profile", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._profiles: list[ + str + ] = [] + + def set_profiles( + self, + profiles: list[str], + ): + self.beginResetModel() + + self._profiles = sorted( + profiles + ) + + self.endResetModel() + + def rowCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else len( + self._profiles + ) + + def columnCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else 1 + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + return self.HEADERS[ + section + ] + + return None + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + return self._profiles[ + index.row() + ] + + +class AddonModel( + QAbstractTableModel +): + HEADERS = ( + "Name", + "Enabled", + "Type", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._addons: list[ + dict + ] = [] + + def set_addons( + self, + addons: list[dict], + ): + self.beginResetModel() + + self._addons = list( + addons + ) + + self.endResetModel() + + def rowCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else len( + self._addons + ) + + def columnCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else 3 + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + return self.HEADERS[ + section + ] + + return None + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + addon = self._addons[ + index.row() + ] + + match index.column(): + case 0: + return addon.get( + "name", + "", + ) + + case 1: + return ( + "Yes" + if addon.get( + "enabled", + True, + ) + else "No" + ) + + case 2: + return addon.get( + "type", + "", + ) + + return None + + +class PluginTableModel( + QAbstractTableModel +): + HEADERS = ( + "Enabled", + "Name", + "Version", + "Author", + "Status", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._plugins = [] + + def set_plugins( + self, + plugins, + ): + self.beginResetModel() + + self._plugins = [ + dict(plugin) + for plugin in plugins + ] + + self.endResetModel() + + def rowCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else len( + self._plugins + ) + + def columnCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else 5 + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + return self.HEADERS[ + section + ] + + return None + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + plugin = self._plugins[ + index.row() + ] + + match index.column(): + case 0: + return ( + "Yes" + if plugin.get( + "enabled", + True, + ) + else "No" + ) + + case 1: + return plugin.get( + "name", + "", + ) + + case 2: + return plugin.get( + "version", + "", + ) + + case 3: + return plugin.get( + "author", + "", + ) + + case 4: + return plugin.get( + "status", + "Loaded", + ) + + return None + + def row( + self, + position: int, + ) -> dict: + return self._plugins[ + position + ] + + +class PluginLogTableModel( + QAbstractTableModel +): + HEADERS = ( + "Time", + "Plugin", + "Level", + "Message", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._rows = [] + + def set_rows( + self, + rows, + ): + self.beginResetModel() + + self._rows = [ + dict(row) + for row in rows + ] + + self.endResetModel() + + def rowCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else len( + self._rows + ) + + def columnCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else 4 + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + return self.HEADERS[ + section + ] + + return None + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + row = self._rows[ + index.row() + ] + + match index.column(): + case 0: + return row.get( + "created_at", + "", + ) + + case 1: + return row.get( + "plugin_name", + "", + ) + + case 2: + return row.get( + "level", + "", + ) + + case 3: + return row.get( + "message", + "", + ) + + return None + + +class PluginCrashTableModel( + QAbstractTableModel +): + HEADERS = ( + "Time", + "Plugin", + "Error", + ) + + def __init__( + self, + parent=None, + ): + super().__init__( + parent + ) + + self._rows = [] + + def set_rows( + self, + rows, + ): + self.beginResetModel() + + self._rows = [ + dict(row) + for row in rows + ] + + self.endResetModel() + + def rowCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else len( + self._rows + ) + + def columnCount( + self, + parent=QModelIndex(), + ): + return 0 if parent.isValid() else 3 + + def headerData( + self, + section, + orientation, + role, + ): + if ( + role + != Qt.ItemDataRole.DisplayRole + ): + return None + + if ( + orientation + == Qt.Orientation.Horizontal + ): + return self.HEADERS[ + section + ] + + return None + + def data( + self, + index, + role, + ): + if ( + not index.isValid() + or role + != Qt.ItemDataRole.DisplayRole + ): + return None + + row = self._rows[ + index.row() + ] + + match index.column(): + case 0: + return row.get( + "created_at", + "", + ) + + case 1: + return row.get( + "plugin_name", + "", + ) + + case 2: + return row.get( + "error", + "", + ) + + return None diff --git a/network.py b/network.py new file mode 100644 index 0000000..469ffa2 --- /dev/null +++ b/network.py @@ -0,0 +1,851 @@ +from __future__ import annotations + +import json +import logging +import socket +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + + +from dataclasses import dataclass + +from PySide6.QtCore import QObject, Signal + +from config import DISCORD_TELEMETRY_BLOCKS + +from blocklists import ( + BlocklistEngine, +) +log = logging.getLogger(__name__) + +REDACT_HEADERS = { + "authorization", + "cookie", + "set-cookie", + "x-super-properties", + "x-fingerprint", + "token", + "access_token", + "refresh_token", +} + +DISCORD_GATEWAY_HOSTS = { + "gateway.discord.gg", + "remote-auth-gateway.discord.gg", +} + + +@dataclass(slots=True) +class ProxyConfig: + enabled: bool = False + proxy_type: str = "http" + host: str = "" + port: int = 0 + username: str = "" + password: str = "" + + @property + def valid(self) -> bool: + return ( + self.enabled + and bool(self.host) + and self.port > 0 + ) + + def to_dict( + self, + ) -> dict[str, Any]: + return { + "enabled": self.enabled, + "proxy_type": self.proxy_type, + "host": self.host, + "port": self.port, + "username": self.username, + "password": self.password, + } + + @classmethod + def from_dict( + cls, + data: dict[str, Any] | None, + ) -> "ProxyConfig": + data = data or {} + + return cls( + enabled=bool( + data.get( + "enabled", + False, + ) + ), + proxy_type=str( + data.get( + "proxy_type", + "http", + ) + ), + host=str( + data.get( + "host", + "", + ) + ), + port=int( + data.get( + "port", + 0, + ) + ), + username=str( + data.get( + "username", + "", + ) + ), + password=str( + data.get( + "password", + "", + ) + ), + ) + + +class ProxyManager(QObject): + proxyChanged = Signal(dict) + + def __init__( + self, + db, + ): + super().__init__() + self.db = db + + def get_proxy( + self, + profile_name: str, + ) -> ProxyConfig: + return ProxyConfig.from_dict( + self.db.get_setting( + f"proxy:{profile_name}", + {}, + ) + ) + + def save_proxy( + self, + profile_name: str, + proxy: ProxyConfig, + ) -> None: + self.db.set_setting( + f"proxy:{profile_name}", + proxy.to_dict(), + ) + + self.proxyChanged.emit( + proxy.to_dict() + ) + + def clear_proxy( + self, + profile_name: str, + ) -> None: + self.save_proxy( + profile_name, + ProxyConfig(), + ) + + def test_proxy( + self, + proxy: ProxyConfig, + timeout: float = 5.0, + ) -> bool: + if not proxy.valid: + return False + + try: + with socket.create_connection( + ( + proxy.host, + proxy.port, + ), + timeout=timeout, + ): + return True + except Exception: + return False + + +# v2 +class DomainBlocker: + + DEFAULT_BLOCKS = set( + DISCORD_TELEMETRY_BLOCKS + ) + + def __init__( + self, + db, + blocklist_engine=None, + ): + self.db = db + self.blocklist = ( + blocklist_engine + ) + + def domains( + self, + ) -> set[str]: + domains = set( + self.DEFAULT_BLOCKS + ) + + try: + domains.update( + self.db.blocked_domains() + ) + except Exception: + log.exception( + "blocked domain load failed" + ) + + return { + str(domain) + .lower() + .strip() + for domain in domains + if domain + } + + def add( + self, + domain: str, + ) -> None: + domain = ( + domain + .lower() + .strip() + ) + + if domain: + self.db.add_blocked_domain( + domain + ) + + def remove( + self, + domain: str, + ) -> None: + try: + with self.db.connection() as conn: + conn.execute( + """ + DELETE FROM blocked_domains + WHERE domain=? + """, + ( + domain.lower() + .strip(), + ), + ) + + conn.commit() + + except Exception: + log.exception( + "domain removal failed" + ) + + # v2 + def is_blocked( + self, + host: str, + ) -> bool: + + host = ( + host.lower() + .strip() + ) + + if ( + self.blocklist + and self.blocklist.contains( + host + ) + ): + return True + + for domain in self.domains(): + + if ( + host == domain + or host.endswith( + f".{domain}" + ) + ): + return True + + return False + +class HeaderRules: + def __init__( + self, + db, + ): + self.db = db + + def all( + self, + ) -> list[dict]: + return self.db.get_setting( + "header_rules", + [], + ) + + def save( + self, + rules: list[dict], + ) -> None: + self.db.set_setting( + "header_rules", + rules, + ) + + @staticmethod + def redact( + headers: dict, + ) -> dict: + result = {} + + for key, value in ( + headers or {} + ).items(): + if ( + str(key) + .lower() + .strip() + in REDACT_HEADERS + ): + result[key] = ( + "[REDACTED]" + ) + else: + result[key] = value + + return result + + def apply( + self, + headers: dict, + ) -> dict: + result = dict( + headers or {} + ) + + for rule in self.all(): + action = str( + rule.get( + "action", + "", + ) + ).lower() + + name = str( + rule.get( + "name", + "", + ) + ) + + value = rule.get( + "value", + "", + ) + + if not name: + continue + + if action in { + "add", + "replace", + }: + result[name] = value + + elif action == "remove": + result.pop( + name, + None, + ) + + return result +# v5 + + +# v3 +class NetworkRules: + + def __init__( + self, + settings: dict, + ): + self.settings = settings + + def load( + self, + profile_name: str | None = None, + plugin_name: str | None = None, + ) -> list[dict]: + + try: + rules = self.settings.get( + "network_rules", + [], + ) + + if not isinstance( + rules, + list, + ): + return [] + + except Exception: + log.exception( + "network rule load failed" + ) + return [] + + output: list[dict] = [] + + for rule in rules: + + if not isinstance( + rule, + dict, + ): + continue + + if ( + profile_name + and rule.get( + "profile" + ) + not in { + None, + profile_name, + } + ): + continue + + if ( + plugin_name + and rule.get( + "plugin" + ) + not in { + None, + plugin_name, + } + ): + continue + + output.append( + rule + ) + + return output + + def evaluate( + self, + host: str, + profile_name: str | None = None, + plugin_name: str | None = None, + ) -> tuple[bool, str | None]: + + host = ( + host.lower() + .strip() + ) + + rules = sorted( + self.load( + profile_name, + plugin_name, + ), + key=lambda r: ( + 0 + if r.get( + "plugin" + ) + else ( + 1 + if r.get( + "profile" + ) + else 2 + ) + ), + ) + + for rule in rules: + + pattern = ( + str( + rule.get( + "pattern", + "", + ) + ) + .lower() + .strip() + ) + + if not pattern: + continue + + matched = ( + host == pattern + or host.endswith( + f".{pattern}" + ) + ) + + if not matched: + continue + + action = ( + str( + rule.get( + "action", + "", + ) + ) + .lower() + .strip() + ) + + if action in { + "block", + "deny", + }: + return ( + False, + None, + ) + + if action == "allow": + return ( + True, + None, + ) + + if action == "redirect": + target = ( + str( + rule.get( + "target", + "", + ) + ) + .strip() + ) + + return ( + True, + target or None, + ) + + return ( + True, + None, + ) + +class RequestReplay: + def __init__( + self, + db, + ): + self.db = db + + def get_request( + self, + request_id: int, + ): + with self.db.connection() as conn: + return conn.execute( + """ + SELECT * + FROM network_requests + WHERE id=? + """, + ( + request_id, + ), + ).fetchone() + + +class HARExporter: + def __init__( + self, + db, + ): + self.db = db + + def export( + self, + output_file: str | Path, + limit: int = 5000, + ) -> Path: + output_file = Path( + output_file + ) + + entries = [] + + for row in self.db.recent_requests( + limit + ): + entries.append( + { + "startedDateTime": row[ + "timestamp" + ], + "request": { + "method": row[ + "method" + ], + "url": row[ + "url" + ], + }, + "response": { + "status": row[ + "status_code" + ], + }, + } + ) + + output_file.write_text( + json.dumps( + { + "log": { + "version": "1.2", + "creator": { + "name": "Discord Client", + "version": "1.0", + }, + "entries": entries, + } + }, + indent=2, + ensure_ascii=False, + ), + encoding="utf-8", + ) + + return output_file + + +# v4# v5 +class NetworkBackend(QObject): + requestAdded = Signal(dict) + + def __init__( + self, + db, + settings: dict, + ): + super().__init__() + + self.db = db + self.settings = settings + + mode = self.db.get_setting( + "blocklist_mode", + "pro", + ) + + blocklist_dir = Path( + "config/blocklists" + ) + + blocklists = sorted( + path + for path in blocklist_dir.glob( + "*.txt" + ) + if path.is_file() + ) + + if mode == "pro": + blocklists = [ + path + for path in blocklists + if "ultimate" + not in path.name.lower() + ] + + cache_file = ( + blocklist_dir + / f"{mode}.cache.bin" + ) + + self.blocklist_engine = ( + BlocklistEngine( + blocklists=blocklists, + cache_file=cache_file, + ) + ) + + try: + self.blocklist_engine.load() + except Exception: + log.exception( + "blocklist load failed" + ) + + self.blocker = DomainBlocker( + db, + self.blocklist_engine, + ) + + self.headers = HeaderRules( + db + ) + + self.rules = NetworkRules( + settings + ) + + self.proxy = ProxyManager( + db + ) + + self.har = HARExporter( + db + ) + + self.replay = RequestReplay( + db + ) + + def recent( + self, + limit: int = 1000, + ): + return self.db.recent_requests( + limit + ) + + def search( + self, + query: str, + limit: int = 1000, + ): + return self.db.search_requests( + query, + limit, + ) + + @staticmethod + def host_from_url( + url: str, + ) -> str: + try: + return ( + urlparse(url) + .hostname + or "" + ).lower() + except Exception: + return "" + + def process_request( + self, + request: dict, + ) -> tuple[dict, bool]: + + url = str( + request.get( + "url", + "", + ) + ) + + host = self.host_from_url( + url + ) + + request["host"] = host + + if host in DISCORD_GATEWAY_HOSTS: + + self.requestAdded.emit( + request + ) + + return request, False + + if ( + host + and self.blocker.is_blocked( + host + ) + ): + + request["_blocked"] = True + + self.requestAdded.emit( + request + ) + + return request, True + + allowed, redirect_url = ( + self.rules.evaluate( + host=host, + profile_name=request.get( + "profile" + ), + plugin_name=request.get( + "plugin" + ), + ) + ) + + if not allowed: + + request["_blocked"] = True + + self.requestAdded.emit( + request + ) + + return request, True + + if redirect_url: + + request[ + "redirect_url" + ] = redirect_url + + headers = self.headers.apply( + request.get( + "headers", + {}, + ) + ) + + request["headers"] = ( + self.headers.redact( + headers + ) + ) + + self.requestAdded.emit( + request + ) + + return request, False \ No newline at end of file diff --git a/theme.py b/theme.py new file mode 100644 index 0000000..d64096d --- /dev/null +++ b/theme.py @@ -0,0 +1,102 @@ +# v1 +from PySide6.QtGui import QColor +from PySide6.QtGui import QPalette +from PySide6.QtWidgets import QApplication + + +def apply_dark_palette( + app: QApplication, +) -> None: + + palette = QPalette() + + palette.setColor( + QPalette.ColorRole.Window, + QColor(24, 24, 27), + ) + + palette.setColor( + QPalette.ColorRole.WindowText, + QColor(250, 250, 250), + ) + + palette.setColor( + QPalette.ColorRole.Base, + QColor(17, 17, 20), + ) + + palette.setColor( + QPalette.ColorRole.AlternateBase, + QColor(30, 30, 34), + ) + + palette.setColor( + QPalette.ColorRole.Text, + QColor(250, 250, 250), + ) + + palette.setColor( + QPalette.ColorRole.Button, + QColor(39, 39, 42), + ) + + palette.setColor( + QPalette.ColorRole.ButtonText, + QColor(250, 250, 250), + ) + + palette.setColor( + QPalette.ColorRole.Highlight, + QColor(59, 130, 246), + ) + + palette.setColor( + QPalette.ColorRole.HighlightedText, + QColor(255, 255, 255), + ) + + palette.setColor( + QPalette.ColorRole.ToolTipBase, + QColor(24, 24, 27), + ) + + palette.setColor( + QPalette.ColorRole.ToolTipText, + QColor(250, 250, 250), + ) + + app.setPalette( + palette + ) + + +def apply_stylesheet( +) -> str: + + return """ +QPushButton { + border:none; + border-radius:6px; + padding:6px 12px; +} + +QLineEdit, +QTextEdit, +QPlainTextEdit, +QComboBox, +QListWidget, +QTreeWidget, +QTableView { + border:1px solid #3f3f46; + border-radius:6px; + padding:4px; +} + +QTabWidget::pane { + border:none; +} + +QToolTip { + border:1px solid #3f3f46; +} +""" \ No newline at end of file