from __future__ import annotations import json import logging from pathlib import Path from typing import Any from PySide6.QtCore import QObject, QUrl, Signal, Slot from PySide6.QtGui import QDesktopServices from PySide6.QtWebChannel import QWebChannel from PySide6.QtWebEngineCore import ( QWebEngineDownloadRequest, QWebEnginePage, QWebEngineProfile, QWebEngineSettings, QWebEngineUrlRequestInterceptor, ) from PySide6.QtWebEngineWidgets import QWebEngineView from PySide6.QtWidgets import QMessageBox from addons import AddonManager from cdp import CDPClient from config import ( DEFAULT_SETTINGS, FEATURE_PERMISSION_MAP, START_URL, ) from models import ProfileContext from network import NetworkBackend log = logging.getLogger(__name__) CACHE_SIZE_BYTES = 512 * 1024 * 1024 BLOCKED_PROTOCOLS = { "file", "shell", "cmd", "powershell", "ms-settings", } PROMPT_PROTOCOLS = { "mailto", "discord", "spotify", "steam", } BLOCKED_EXTENSIONS = { ".bat", ".cmd", ".ps1", ".vbs", ".js", ".jse", ".wsf", ".scr", ".hta", ".lnk", ".reg", ".iso", ".img", ".cpl", ".msc", ".jar", ".msix", ".msixbundle", ".appinstaller", } WARN_EXTENSIONS = { ".exe", ".dll", ".msi", ".com", } class Bridge(QObject): messageReceived = Signal(str) pluginLogReceived = Signal(str) pluginErrorReceived = Signal(str) def __init__( self, db, context: ProfileContext, ): super().__init__() self.db = db self.context = context @Slot(str) def log( self, message: str, ) -> None: self.messageReceived.emit( message ) @Slot(str) def plugin_log( self, message: str, ) -> None: self.pluginLogReceived.emit( message ) @Slot(str) def plugin_error( self, message: str, ) -> None: self.pluginErrorReceived.emit( message ) @Slot(result=str) def version( self, ) -> str: return "1.0.0" @Slot(result=str) def platform( self, ) -> str: return "PySide6" @Slot(result=str) def plugin_runtime( self, ) -> str: return "bdapi-lite" @Slot(str, str, result=str) def pluginLoad( self, plugin: str, key: str, ) -> str: value = ( self.db.plugin_setting_get( self.context.name, plugin, key, None, ) ) return json.dumps( value, ensure_ascii=False, ) @Slot(str, str, str) def pluginSave( self, plugin: str, key: str, value: str, ) -> None: try: value = json.loads( value ) except Exception: pass self.db.plugin_setting_set( self.context.name, plugin, key, value, ) @Slot(str, str, str) def pluginLog( self, plugin: str, level: str, message: str, ) -> None: self.db.plugin_log( self.context.name, plugin, level, message, ) @Slot(str, str) def pluginCrash( self, plugin: str, error: str, ) -> None: self.db.plugin_crash( self.context.name, plugin, error, ) # v3 class RequestInterceptor(QWebEngineUrlRequestInterceptor): def __init__( self, network: NetworkBackend, ): super().__init__() self.network = network def interceptRequest( self, info, ) -> None: host = ( info.requestUrl() .host() .lower() ) request = { "url": info.requestUrl().toString(), "host": host, "headers": {}, } try: _, blocked = ( self.network.process_request( request ) ) if blocked: info.block(True) except Exception: log.exception( "request interception failed" ) class BrowserPage(QWebEnginePage): consoleMessage = Signal(str) def __init__( self, profile: QWebEngineProfile, permissions: dict[str, Any], parent=None, ): super().__init__( profile, parent, ) self.permissions = permissions or {} self.featurePermissionRequested.connect( self._handle_permission_request ) def javaScriptConsoleMessage( self, level, message: str, line: int, source: str, ) -> None: self.consoleMessage.emit( f"{source}:{line} {message}" ) def acceptNavigationRequest( self, url, nav_type, is_main_frame, ) -> bool: scheme = ( url.scheme() .lower() .strip() ) if scheme in { "javascript", "data", }: return False if scheme in BLOCKED_PROTOCOLS: return False if scheme in PROMPT_PROTOCOLS: result = QMessageBox.question( self.view(), "External Link", ( "Open external application?\n\n" f"{url.toString()}" ), ) if ( result == QMessageBox.StandardButton.Yes ): QDesktopServices.openUrl( url ) return False return super().acceptNavigationRequest( url, nav_type, is_main_frame, ) def _handle_permission_request( self, origin, feature, ) -> None: feature_name = str( feature ) allowed = False for ( key, permission, ) in FEATURE_PERMISSION_MAP.items(): if key in feature_name: allowed = bool( self.permissions.get( permission, False, ) ) break self.setFeaturePermission( origin, feature, ( QWebEnginePage.PermissionPolicy.PermissionGrantedByUser if allowed else QWebEnginePage.PermissionPolicy.PermissionDeniedByUser ), ) class BrowserView(QWebEngineView): downloadProgress = Signal( str, int, int, ) pluginDiagnostics = Signal( dict ) def __init__( self, db, context: ProfileContext, settings: dict[str, Any] | None, parent=None, ): super().__init__(parent) self.db = db self.context = context self._settings_data = ( settings or dict( DEFAULT_SETTINGS ) ) self.db.ensure_profile( self.context.name ) self.network = ( NetworkBackend( self.db, self._settings_data, ) ) self.profile = ( self._create_profile() ) self.interceptor = ( RequestInterceptor( self.network ) ) self.profile.setUrlRequestInterceptor( self.interceptor ) self.page_ = BrowserPage( self.profile, self._settings_data.get( "permissions", {}, ), self, ) self.setPage( self.page_ ) self.bridge = None self.channel = None self.addons = None self.cdp = None self._devtools_page = None self._devtools_view = None self._configure_web_settings() self._initialize_bridge() self._initialize_downloads() self._load_addons() def _create_profile( self, ) -> QWebEngineProfile: storage_path = ( self.context.storage_path ) cache_path = ( self.context.cache_path ) profile = ( QWebEngineProfile( self.context.name, self, ) ) profile.setPersistentStoragePath( str(storage_path) ) profile.setCachePath( str(cache_path) ) profile.setHttpCacheMaximumSize( CACHE_SIZE_BYTES ) profile.setPersistentCookiesPolicy( QWebEngineProfile.PersistentCookiesPolicy.ForcePersistentCookies ) return profile def _configure_web_settings( self, ) -> None: settings = self.settings() for attribute in ( QWebEngineSettings.WebAttribute.JavascriptEnabled, QWebEngineSettings.WebAttribute.LocalStorageEnabled, QWebEngineSettings.WebAttribute.FullScreenSupportEnabled, QWebEngineSettings.WebAttribute.ScrollAnimatorEnabled, QWebEngineSettings.WebAttribute.WebGLEnabled, QWebEngineSettings.WebAttribute.Accelerated2dCanvasEnabled, ): settings.setAttribute( attribute, True, ) settings.setAttribute( QWebEngineSettings.WebAttribute.AllowRunningInsecureContent, False, ) def _initialize_bridge( self, ) -> None: self.bridge = Bridge( self.db, self.context, ) self.channel = ( QWebChannel() ) self.channel.registerObject( "bridge", self.bridge, ) self.page_.setWebChannel( self.channel ) def _initialize_downloads( self, ) -> None: self.profile.downloadRequested.connect( self._handle_download ) def _load_addons( self, ) -> None: self.addons = AddonManager( self.context.name, self.db, ) self.addons.install( self.profile ) def plugin_inventory( self, ) -> list[dict]: if not self.addons: return [] rows = [] for plugin in ( self.addons.plugins.discover() ): try: rows.append( self.addons.plugins.metadata( plugin ) ) except Exception: log.exception( "plugin metadata" ) return rows def install_plugin_file( self, source: str | Path, ) -> Path: installed = ( self.addons.plugins.install_file( Path(source) ) ) self.reload_addons() return installed def plugin_diagnostics( self, ) -> None: self.page().runJavaScript( """ (() => { if (!window.PluginRuntime) return {}; return { loaded:Object.keys( PluginRuntime.plugins || {} ), failed:Object.keys( PluginRuntime.failed || {} ), failures: PluginRuntime.failed || {} }; })(); """, self.pluginDiagnostics.emit, ) def initialize_cdp( self, websocket_url: str, ) -> None: if not self._settings_data.get( "enable_cdp", False, ): return if self.cdp: return try: self.cdp = CDPClient( db=self.db, websocket_url=websocket_url, capture_bodies=False, backend=self.network, ) self.cdp.start() except Exception: log.exception( "cdp startup failed" ) # v2 def _download_directory( self, ) -> Path: directory = ( self.context .downloads_path ) directory.mkdir( parents=True, exist_ok=True, ) return directory def _handle_download( self, download: QWebEngineDownloadRequest, ) -> None: filename = ( download.downloadFileName() ) extension = ( Path(filename) .suffix .lower() ) if extension in BLOCKED_EXTENSIONS: download.cancel() QMessageBox.warning( self, "Download Blocked", filename, ) return if extension in WARN_EXTENSIONS: result = QMessageBox.warning( self, "Executable Download", filename, QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, ) if ( result != QMessageBox.StandardButton.Yes ): download.cancel() return download.setDownloadDirectory( str( self._download_directory() ) ) download.accept() download.receivedBytesChanged.connect( lambda d=download: self._emit_download_progress( d ) ) def _emit_download_progress( self, download: QWebEngineDownloadRequest, ) -> None: self.downloadProgress.emit( download.downloadFileName(), download.receivedBytes(), download.totalBytes(), ) def reload_addons( self, ) -> None: self.profile.scripts().clear() self._load_addons() self.reload() def open( self, url: str = START_URL, ) -> None: self.load( QUrl(url) ) def open_devtools( self, ) -> None: if self._devtools_page is None: self._devtools_page = ( QWebEnginePage( self.profile, self, ) ) self.page_.setDevToolsPage( self._devtools_page ) if self._devtools_view is None: self._devtools_view = ( QWebEngineView() ) self._devtools_view.setPage( self._devtools_page ) self._devtools_view.resize( 1400, 900, ) self._devtools_view.show() def switch_profile( self, profile_name: str, ) -> None: raise RuntimeError( "profile switching requires browser recreation" ) # v3 def install_plugin_url( self, url: str, ) -> Path: import tempfile from urllib.parse import ( urlparse, ) from urllib.request import ( urlopen, ) parsed = urlparse( url ) if ( parsed.scheme not in { "https" } ): raise ValueError( "https required" ) with urlopen( url, timeout=30, ) as response: content_length = ( response.headers.get( "Content-Length" ) ) if content_length: if ( int(content_length) > PLUGIN_MAX_SIZE ): raise ValueError( "plugin exceeds size limit" ) content_type = ( response.headers.get( "Content-Type", "", ) .split(";")[0] .strip() .lower() ) allowed_types = { "application/javascript", "text/javascript", "application/x-javascript", "text/plain", } if ( content_type and content_type not in allowed_types ): raise ValueError( f"unsupported content type: " f"{content_type}" ) data = bytearray() while True: chunk = response.read( 65536 ) if not chunk: break data.extend( chunk ) if ( len(data) > PLUGIN_MAX_SIZE ): raise ValueError( "plugin exceeds size limit" ) with tempfile.NamedTemporaryFile( suffix=".plugin.js", delete=False, ) as handle: handle.write(data) path = Path( handle.name ) try: installed = ( self.install_plugin_file( path ) ) return installed finally: path.unlink( missing_ok=True ) def remove_plugin( self, plugin_name: str, ) -> bool: removed = ( self.addons.uninstall_plugin( plugin_name ) ) if removed: self.reload_addons() return removed def set_plugin_enabled( self, plugin_name: str, enabled: bool, ) -> None: if enabled: self.addons.enable_plugin( plugin_name ) else: self.addons.disable_plugin( plugin_name ) self.reload_addons() # v2 def shutdown( self, ) -> None: if self.cdp: try: self.cdp.stop() except Exception: log.exception( "cdp stop failed" ) self.cdp = None try: self.setPage(None) except Exception: pass if self.page_: self.page_.deleteLater() self.page_ = None if self.channel: self.channel.deleteLater() self.channel = None if self.bridge: self.bridge.deleteLater() self.bridge = None if self.profile: self.profile.deleteLater() self.profile = None if ( self.profile and self.interceptor ): try: self.profile.setUrlRequestInterceptor( None ) except Exception: pass if self.interceptor: self.interceptor.deleteLater() self.interceptor = None if self.page_: try: self.page_.triggerAction( QWebEnginePage.WebAction.Stop ) except Exception: pass if self._devtools_page: self._devtools_page.deleteLater() self._devtools_page = None if self._devtools_view: self._devtools_view.close() self._devtools_view.deleteLater() self._devtools_view = None