# Files
# devdisc/network.py
# 2026-06-08 06:17:48 +00:00
#
# 851 lines
# 17 KiB
# Python

from __future__ import annotations
import json
import logging
import socket
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from dataclasses import dataclass
from PySide6.QtCore import QObject, Signal
from config import DISCORD_TELEMETRY_BLOCKS
from blocklists import (
BlocklistEngine,
)
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Lower-case header names whose values HeaderRules.redact replaces with
# the "[REDACTED]" placeholder.
REDACT_HEADERS = {
    # Session / credential carriers.
    "authorization",
    "cookie",
    "set-cookie",
    "token",
    "access_token",
    "refresh_token",
    # Client identification headers.
    "x-super-properties",
    "x-fingerprint",
}

# Hosts that NetworkBackend.process_request passes through without
# consulting the domain blocker or the network rules.
DISCORD_GATEWAY_HOSTS = {
    "gateway.discord.gg",
    "remote-auth-gateway.discord.gg",
}
@dataclass(slots=True)
class ProxyConfig:
enabled: bool = False
proxy_type: str = "http"
host: str = ""
port: int = 0
username: str = ""
password: str = ""
@property
def valid(self) -> bool:
return (
self.enabled
and bool(self.host)
and self.port > 0
)
def to_dict(
self,
) -> dict[str, Any]:
return {
"enabled": self.enabled,
"proxy_type": self.proxy_type,
"host": self.host,
"port": self.port,
"username": self.username,
"password": self.password,
}
@classmethod
def from_dict(
cls,
data: dict[str, Any] | None,
) -> "ProxyConfig":
data = data or {}
return cls(
enabled=bool(
data.get(
"enabled",
False,
)
),
proxy_type=str(
data.get(
"proxy_type",
"http",
)
),
host=str(
data.get(
"host",
"",
)
),
port=int(
data.get(
"port",
0,
)
),
username=str(
data.get(
"username",
"",
)
),
password=str(
data.get(
"password",
"",
)
),
)
class ProxyManager(QObject):
    """Reads and writes per-profile proxy settings through ``db``.

    Settings are stored under the key ``proxy:<profile_name>``. Each save
    emits ``proxyChanged`` with the saved configuration as a dictionary.
    """

    proxyChanged = Signal(dict)

    def __init__(self, db):
        """Keep a reference to the settings store used for persistence."""
        super().__init__()
        self.db = db

    def get_proxy(self, profile_name: str) -> ProxyConfig:
        """Return the proxy configuration stored for ``profile_name``."""
        stored = self.db.get_setting(f"proxy:{profile_name}", {})
        return ProxyConfig.from_dict(stored)

    def save_proxy(self, profile_name: str, proxy: ProxyConfig) -> None:
        """Persist ``proxy`` for ``profile_name`` and emit ``proxyChanged``."""
        setting_key = f"proxy:{profile_name}"
        self.db.set_setting(setting_key, proxy.to_dict())
        # A second, independent dictionary is handed to the signal.
        self.proxyChanged.emit(proxy.to_dict())

    def clear_proxy(self, profile_name: str) -> None:
        """Overwrite the stored proxy for ``profile_name`` with the defaults."""
        self.save_proxy(profile_name, ProxyConfig())

    def test_proxy(self, proxy: ProxyConfig, timeout: float = 5.0) -> bool:
        """Report whether a TCP connection to the proxy can be opened.

        Args:
            proxy: Configuration to probe; an invalid one is rejected
                without touching the network.
            timeout: Connection timeout passed to
                ``socket.create_connection``.

        Returns:
            True if the connection was established, False on any failure.
        """
        if not proxy.valid:
            return False
        endpoint = (proxy.host, proxy.port)
        try:
            # The socket is opened only as a reachability probe and is
            # closed again as soon as the ``with`` block exits.
            with socket.create_connection(endpoint, timeout=timeout):
                pass
        except Exception:
            return False
        return True
# v2
class DomainBlocker:
    """Decides whether a host is blocked.

    The block set is ``DEFAULT_BLOCKS`` (built from
    ``DISCORD_TELEMETRY_BLOCKS``) plus whatever ``db.blocked_domains()``
    returns. An optional blocklist engine is consulted first.
    """

    DEFAULT_BLOCKS = set(DISCORD_TELEMETRY_BLOCKS)

    def __init__(self, db, blocklist_engine=None):
        """Store the database handle and the optional blocklist engine."""
        self.db = db
        self.blocklist = blocklist_engine

    @staticmethod
    def _normalize(name) -> str:
        """Lower-case ``name``, trim whitespace and drop trailing dots.

        ``example.com.`` (absolute form) names the same host as
        ``example.com``, so both must compare equal when matching.
        """
        return str(name).lower().strip().rstrip(".")

    def domains(self) -> set[str]:
        """Return the normalized set of blocked domains.

        A failure while reading the database is logged and the defaults are
        still returned. Entries that are empty after normalization are
        dropped; an empty entry would otherwise match every host that ends
        with a dot.
        """
        merged = set(self.DEFAULT_BLOCKS)
        try:
            merged.update(self.db.blocked_domains())
        except Exception:
            log.exception("blocked domain load failed")
        normalized = {self._normalize(entry) for entry in merged if entry}
        normalized.discard("")
        return normalized

    def add(self, domain: str) -> None:
        """Store ``domain`` (lower-cased, trimmed) unless it is empty."""
        domain = domain.lower().strip()
        if domain:
            self.db.add_blocked_domain(domain)

    def remove(self, domain: str) -> None:
        """Delete ``domain`` (lower-cased, trimmed) from ``blocked_domains``.

        Failures are logged, not raised.
        """
        try:
            with self.db.connection() as conn:
                conn.execute(
                    """
DELETE FROM blocked_domains
WHERE domain=?
""",
                    (domain.lower().strip(),),
                )
                conn.commit()
        except Exception:
            log.exception("domain removal failed")

    def is_blocked(self, host: str) -> bool:
        """Return True if ``host`` or one of its parent domains is blocked.

        Args:
            host: Host name to test; case, surrounding whitespace and a
                trailing dot are ignored. An empty host is never blocked.
        """
        if not host:
            return False
        host = self._normalize(host)
        if not host:
            return False
        if self.blocklist and self.blocklist.contains(host):
            return True
        # Match the host itself or any sub-domain of a blocked domain.
        return any(
            host == domain or host.endswith(f".{domain}")
            for domain in self.domains()
        )
class HeaderRules:
    """Stores header rewrite rules under the ``header_rules`` setting and applies them."""

    def __init__(self, db):
        """Keep the settings store that holds the rules."""
        self.db = db

    def all(self) -> list[dict]:
        """Return the stored rule list (an empty list when nothing is stored)."""
        return self.db.get_setting("header_rules", [])

    def save(self, rules: list[dict]) -> None:
        """Persist ``rules`` as the complete rule list."""
        self.db.set_setting("header_rules", rules)

    @staticmethod
    def redact(headers: dict) -> dict:
        """Return a copy of ``headers`` with sensitive values masked.

        A header is masked when its lower-cased, trimmed name is in
        ``REDACT_HEADERS``; its value becomes ``"[REDACTED]"``.
        """
        return {
            key: (
                "[REDACTED]"
                if str(key).lower().strip() in REDACT_HEADERS
                else value
            )
            for key, value in (headers or {}).items()
        }

    def apply(self, headers: dict) -> dict:
        """Return a copy of ``headers`` with every stored rule applied in order.

        Each rule is a dict with ``action`` (``add``/``replace``/``remove``),
        ``name`` and optionally ``value``. Header names are matched
        case-insensitively, the same way ``redact`` compares them, so a rule
        for ``User-Agent`` also affects ``user-agent``. Entries that are not
        dicts, or that have no name, are skipped. The input mapping is not
        modified.
        """
        result = dict(headers or {})
        for rule in self.all() or ():
            if not isinstance(rule, dict):
                continue
            raw_name = rule.get("name")
            if not raw_name:
                continue
            name = str(raw_name)
            folded = name.lower().strip()
            action = str(rule.get("action", "")).lower().strip()
            # Existing keys that denote the same header, whatever their case.
            same_header = [
                key for key in result
                if str(key).lower().strip() == folded
            ]
            if action in {"add", "replace"}:
                # Drop differently-cased spellings so the header is not
                # duplicated; an exact-case key is overwritten in place.
                for key in same_header:
                    if key != name:
                        del result[key]
                result[name] = rule.get("value", "")
            elif action == "remove":
                for key in same_header:
                    del result[key]
        return result
# v5
# v3
class NetworkRules:
def __init__(
self,
settings: dict,
):
self.settings = settings
def load(
self,
profile_name: str | None = None,
plugin_name: str | None = None,
) -> list[dict]:
try:
rules = self.settings.get(
"network_rules",
[],
)
if not isinstance(
rules,
list,
):
return []
except Exception:
log.exception(
"network rule load failed"
)
return []
output: list[dict] = []
for rule in rules:
if not isinstance(
rule,
dict,
):
continue
if (
profile_name
and rule.get(
"profile"
)
not in {
None,
profile_name,
}
):
continue
if (
plugin_name
and rule.get(
"plugin"
)
not in {
None,
plugin_name,
}
):
continue
output.append(
rule
)
return output
def evaluate(
self,
host: str,
profile_name: str | None = None,
plugin_name: str | None = None,
) -> tuple[bool, str | None]:
host = (
host.lower()
.strip()
)
rules = sorted(
self.load(
profile_name,
plugin_name,
),
key=lambda r: (
0
if r.get(
"plugin"
)
else (
1
if r.get(
"profile"
)
else 2
)
),
)
for rule in rules:
pattern = (
str(
rule.get(
"pattern",
"",
)
)
.lower()
.strip()
)
if not pattern:
continue
matched = (
host == pattern
or host.endswith(
f".{pattern}"
)
)
if not matched:
continue
action = (
str(
rule.get(
"action",
"",
)
)
.lower()
.strip()
)
if action in {
"block",
"deny",
}:
return (
False,
None,
)
if action == "allow":
return (
True,
None,
)
if action == "redirect":
target = (
str(
rule.get(
"target",
"",
)
)
.strip()
)
return (
True,
target or None,
)
return (
True,
None,
)
class RequestReplay:
    """Fetches recorded requests from the ``network_requests`` table."""

    def __init__(self, db):
        """Keep the database handle used for lookups."""
        self.db = db

    def get_request(self, request_id: int):
        """Return the row whose ``id`` equals ``request_id``.

        The result is whatever ``fetchone()`` yields for the query, i.e. the
        matching row or ``None`` when there is none.
        """
        statement = """
SELECT *
FROM network_requests
WHERE id=?
"""
        parameters = (request_id,)
        with self.db.connection() as conn:
            cursor = conn.execute(statement, parameters)
            return cursor.fetchone()
class HARExporter:
def __init__(
self,
db,
):
self.db = db
def export(
self,
output_file: str | Path,
limit: int = 5000,
) -> Path:
output_file = Path(
output_file
)
entries = []
for row in self.db.recent_requests(
limit
):
entries.append(
{
"startedDateTime": row[
"timestamp"
],
"request": {
"method": row[
"method"
],
"url": row[
"url"
],
},
"response": {
"status": row[
"status_code"
],
},
}
)
output_file.write_text(
json.dumps(
{
"log": {
"version": "1.2",
"creator": {
"name": "Discord Client",
"version": "1.0",
},
"entries": entries,
}
},
indent=2,
ensure_ascii=False,
),
encoding="utf-8",
)
return output_file
# v4# v5
class NetworkBackend(QObject):
    """Wires together blocking, header rules, network rules, proxy and export helpers.

    ``requestAdded`` is emitted once for every request passed to
    ``process_request``.
    """

    requestAdded = Signal(dict)

    def __init__(self, db, settings: dict):
        """Build the blocklist engine and the helper objects.

        Blocklists are the ``*.txt`` files in ``config/blocklists``. In
        ``"pro"`` mode (the default for the ``blocklist_mode`` setting)
        files whose name contains ``"ultimate"`` are left out. A failure
        while loading the engine is logged and construction continues.
        """
        super().__init__()
        self.db = db
        self.settings = settings

        mode = self.db.get_setting("blocklist_mode", "pro")
        blocklist_dir = Path("config/blocklists")
        blocklists = sorted(
            path
            for path in blocklist_dir.glob("*.txt")
            if path.is_file()
        )
        if mode == "pro":
            blocklists = [
                path
                for path in blocklists
                if "ultimate" not in path.name.lower()
            ]
        # One cache file per mode, stored next to the lists.
        cache_file = blocklist_dir / f"{mode}.cache.bin"

        self.blocklist_engine = BlocklistEngine(
            blocklists=blocklists,
            cache_file=cache_file,
        )
        try:
            self.blocklist_engine.load()
        except Exception:
            log.exception("blocklist load failed")

        self.blocker = DomainBlocker(db, self.blocklist_engine)
        self.headers = HeaderRules(db)
        self.rules = NetworkRules(settings)
        self.proxy = ProxyManager(db)
        self.har = HARExporter(db)
        self.replay = RequestReplay(db)

    def recent(self, limit: int = 1000):
        """Return ``db.recent_requests(limit)``."""
        return self.db.recent_requests(limit)

    def search(self, query: str, limit: int = 1000):
        """Return ``db.search_requests(query, limit)``."""
        return self.db.search_requests(query, limit)

    @staticmethod
    def host_from_url(url: str) -> str:
        """Return the lower-cased host of ``url``, or ``""`` if it has none or cannot be parsed."""
        try:
            return (urlparse(url).hostname or "").lower()
        except Exception:
            # Best effort: an unparsable URL is treated as having no host.
            return ""

    def _emit_redacted(self, request: dict) -> None:
        """Emit ``requestAdded`` without exposing sensitive header values.

        When the request carries headers, a shallow copy with redacted
        headers is emitted so the caller's dict keeps its real headers;
        otherwise the request itself is emitted.
        """
        headers = request.get("headers")
        if isinstance(headers, dict) and headers:
            payload = dict(request)
            payload["headers"] = self.headers.redact(headers)
        else:
            payload = request
        self.requestAdded.emit(payload)

    def process_request(self, request: dict) -> tuple[dict, bool]:
        """Classify ``request`` and announce it through ``requestAdded``.

        The dict is updated in place: ``host`` is always set, ``_blocked``
        is set when the request is blocked, ``redirect_url`` when a rule
        redirects it, and on the normal path ``headers`` is replaced by the
        rule-processed, redacted headers.

        Args:
            request: Request description; ``url``, ``headers``, ``profile``
                and ``plugin`` are read when present.

        Returns:
            ``(request, blocked)`` where ``blocked`` is True when the domain
            blocker or a network rule rejected the request.
        """
        url = str(request.get("url", ""))
        host = self.host_from_url(url)
        request["host"] = host

        # Gateway hosts bypass the blocker and all rules.
        if host in DISCORD_GATEWAY_HOSTS:
            # Listeners get redacted headers here too, as on the normal
            # path; the returned request is left untouched.
            self._emit_redacted(request)
            return request, False

        if host and self.blocker.is_blocked(host):
            request["_blocked"] = True
            self._emit_redacted(request)
            return request, True

        allowed, redirect_url = self.rules.evaluate(
            host=host,
            profile_name=request.get("profile"),
            plugin_name=request.get("plugin"),
        )
        if not allowed:
            request["_blocked"] = True
            self._emit_redacted(request)
            return request, True

        if redirect_url:
            request["redirect_url"] = redirect_url

        headers = self.headers.apply(request.get("headers", {}))
        request["headers"] = self.headers.redact(headers)
        self.requestAdded.emit(request)
        return request, False