from __future__ import annotations import json import logging import socket from dataclasses import dataclass from pathlib import Path from typing import Any from urllib.parse import urlparse from dataclasses import dataclass from PySide6.QtCore import QObject, Signal from config import DISCORD_TELEMETRY_BLOCKS from blocklists import ( BlocklistEngine, ) log = logging.getLogger(__name__) REDACT_HEADERS = { "authorization", "cookie", "set-cookie", "x-super-properties", "x-fingerprint", "token", "access_token", "refresh_token", } DISCORD_GATEWAY_HOSTS = { "gateway.discord.gg", "remote-auth-gateway.discord.gg", } @dataclass(slots=True) class ProxyConfig: enabled: bool = False proxy_type: str = "http" host: str = "" port: int = 0 username: str = "" password: str = "" @property def valid(self) -> bool: return ( self.enabled and bool(self.host) and self.port > 0 ) def to_dict( self, ) -> dict[str, Any]: return { "enabled": self.enabled, "proxy_type": self.proxy_type, "host": self.host, "port": self.port, "username": self.username, "password": self.password, } @classmethod def from_dict( cls, data: dict[str, Any] | None, ) -> "ProxyConfig": data = data or {} return cls( enabled=bool( data.get( "enabled", False, ) ), proxy_type=str( data.get( "proxy_type", "http", ) ), host=str( data.get( "host", "", ) ), port=int( data.get( "port", 0, ) ), username=str( data.get( "username", "", ) ), password=str( data.get( "password", "", ) ), ) class ProxyManager(QObject): proxyChanged = Signal(dict) def __init__( self, db, ): super().__init__() self.db = db def get_proxy( self, profile_name: str, ) -> ProxyConfig: return ProxyConfig.from_dict( self.db.get_setting( f"proxy:{profile_name}", {}, ) ) def save_proxy( self, profile_name: str, proxy: ProxyConfig, ) -> None: self.db.set_setting( f"proxy:{profile_name}", proxy.to_dict(), ) self.proxyChanged.emit( proxy.to_dict() ) def clear_proxy( self, profile_name: str, ) -> None: self.save_proxy( profile_name, ProxyConfig(), ) def test_proxy( self, proxy: ProxyConfig, timeout: float = 5.0, ) -> bool: if not proxy.valid: return False try: with socket.create_connection( ( proxy.host, proxy.port, ), timeout=timeout, ): return True except Exception: return False # v2 class DomainBlocker: DEFAULT_BLOCKS = set( DISCORD_TELEMETRY_BLOCKS ) def __init__( self, db, blocklist_engine=None, ): self.db = db self.blocklist = ( blocklist_engine ) def domains( self, ) -> set[str]: domains = set( self.DEFAULT_BLOCKS ) try: domains.update( self.db.blocked_domains() ) except Exception: log.exception( "blocked domain load failed" ) return { str(domain) .lower() .strip() for domain in domains if domain } def add( self, domain: str, ) -> None: domain = ( domain .lower() .strip() ) if domain: self.db.add_blocked_domain( domain ) def remove( self, domain: str, ) -> None: try: with self.db.connection() as conn: conn.execute( """ DELETE FROM blocked_domains WHERE domain=? """, ( domain.lower() .strip(), ), ) conn.commit() except Exception: log.exception( "domain removal failed" ) # v2 def is_blocked( self, host: str, ) -> bool: host = ( host.lower() .strip() ) if ( self.blocklist and self.blocklist.contains( host ) ): return True for domain in self.domains(): if ( host == domain or host.endswith( f".{domain}" ) ): return True return False class HeaderRules: def __init__( self, db, ): self.db = db def all( self, ) -> list[dict]: return self.db.get_setting( "header_rules", [], ) def save( self, rules: list[dict], ) -> None: self.db.set_setting( "header_rules", rules, ) @staticmethod def redact( headers: dict, ) -> dict: result = {} for key, value in ( headers or {} ).items(): if ( str(key) .lower() .strip() in REDACT_HEADERS ): result[key] = ( "[REDACTED]" ) else: result[key] = value return result def apply( self, headers: dict, ) -> dict: result = dict( headers or {} ) for rule in self.all(): action = str( rule.get( "action", "", ) ).lower() name = str( rule.get( "name", "", ) ) value = rule.get( "value", "", ) if not name: continue if action in { "add", "replace", }: result[name] = value elif action == "remove": result.pop( name, None, ) return result # v5 # v3 class NetworkRules: def __init__( self, settings: dict, ): self.settings = settings def load( self, profile_name: str | None = None, plugin_name: str | None = None, ) -> list[dict]: try: rules = self.settings.get( "network_rules", [], ) if not isinstance( rules, list, ): return [] except Exception: log.exception( "network rule load failed" ) return [] output: list[dict] = [] for rule in rules: if not isinstance( rule, dict, ): continue if ( profile_name and rule.get( "profile" ) not in { None, profile_name, } ): continue if ( plugin_name and rule.get( "plugin" ) not in { None, plugin_name, } ): continue output.append( rule ) return output def evaluate( self, host: str, profile_name: str | None = None, plugin_name: str | None = None, ) -> tuple[bool, str | None]: host = ( host.lower() .strip() ) rules = sorted( self.load( profile_name, plugin_name, ), key=lambda r: ( 0 if r.get( "plugin" ) else ( 1 if r.get( "profile" ) else 2 ) ), ) for rule in rules: pattern = ( str( rule.get( "pattern", "", ) ) .lower() .strip() ) if not pattern: continue matched = ( host == pattern or host.endswith( f".{pattern}" ) ) if not matched: continue action = ( str( rule.get( "action", "", ) ) .lower() .strip() ) if action in { "block", "deny", }: return ( False, None, ) if action == "allow": return ( True, None, ) if action == "redirect": target = ( str( rule.get( "target", "", ) ) .strip() ) return ( True, target or None, ) return ( True, None, ) class RequestReplay: def __init__( self, db, ): self.db = db def get_request( self, request_id: int, ): with self.db.connection() as conn: return conn.execute( """ SELECT * FROM network_requests WHERE id=? """, ( request_id, ), ).fetchone() class HARExporter: def __init__( self, db, ): self.db = db def export( self, output_file: str | Path, limit: int = 5000, ) -> Path: output_file = Path( output_file ) entries = [] for row in self.db.recent_requests( limit ): entries.append( { "startedDateTime": row[ "timestamp" ], "request": { "method": row[ "method" ], "url": row[ "url" ], }, "response": { "status": row[ "status_code" ], }, } ) output_file.write_text( json.dumps( { "log": { "version": "1.2", "creator": { "name": "Discord Client", "version": "1.0", }, "entries": entries, } }, indent=2, ensure_ascii=False, ), encoding="utf-8", ) return output_file # v4# v5 class NetworkBackend(QObject): requestAdded = Signal(dict) def __init__( self, db, settings: dict, ): super().__init__() self.db = db self.settings = settings mode = self.db.get_setting( "blocklist_mode", "pro", ) blocklist_dir = Path( "config/blocklists" ) blocklists = sorted( path for path in blocklist_dir.glob( "*.txt" ) if path.is_file() ) if mode == "pro": blocklists = [ path for path in blocklists if "ultimate" not in path.name.lower() ] cache_file = ( blocklist_dir / f"{mode}.cache.bin" ) self.blocklist_engine = ( BlocklistEngine( blocklists=blocklists, cache_file=cache_file, ) ) try: self.blocklist_engine.load() except Exception: log.exception( "blocklist load failed" ) self.blocker = DomainBlocker( db, self.blocklist_engine, ) self.headers = HeaderRules( db ) self.rules = NetworkRules( settings ) self.proxy = ProxyManager( db ) self.har = HARExporter( db ) self.replay = RequestReplay( db ) def recent( self, limit: int = 1000, ): return self.db.recent_requests( limit ) def search( self, query: str, limit: int = 1000, ): return self.db.search_requests( query, limit, ) @staticmethod def host_from_url( url: str, ) -> str: try: return ( urlparse(url) .hostname or "" ).lower() except Exception: return "" def process_request( self, request: dict, ) -> tuple[dict, bool]: url = str( request.get( "url", "", ) ) host = self.host_from_url( url ) request["host"] = host if host in DISCORD_GATEWAY_HOSTS: self.requestAdded.emit( request ) return request, False if ( host and self.blocker.is_blocked( host ) ): request["_blocked"] = True self.requestAdded.emit( request ) return request, True allowed, redirect_url = ( self.rules.evaluate( host=host, profile_name=request.get( "profile" ), plugin_name=request.get( "plugin" ), ) ) if not allowed: request["_blocked"] = True self.requestAdded.emit( request ) return request, True if redirect_url: request[ "redirect_url" ] = redirect_url headers = self.headers.apply( request.get( "headers", {}, ) ) request["headers"] = ( self.headers.redact( headers ) ) self.requestAdded.emit( request ) return request, False