From af6e6f9bd5810b009d9cc61c87fc5aa7f64dc96a Mon Sep 17 00:00:00 2001 From: casper Date: Mon, 8 Jun 2026 06:18:44 +0000 Subject: [PATCH] v1 --- db.py | 965 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 965 insertions(+) create mode 100644 db.py diff --git a/db.py b/db.py new file mode 100644 index 0000000..317073e --- /dev/null +++ b/db.py @@ -0,0 +1,965 @@ +from __future__ import annotations +import json +import logging +import queue +import sqlite3 +import threading +import time +from contextlib import contextmanager +from pathlib import Path +from typing import Any + +from config import ( + NETWORK_BATCH_SIZE, + NETWORK_FLUSH_INTERVAL, + NETWORK_RETENTION, +) + +log = logging.getLogger(__name__) + + +class Database: + def __init__( + self, + db_path: str | Path, + ): + self.db_path = Path( + db_path + ) + + self.db_path.parent.mkdir( + parents=True, + exist_ok=True, + ) + + self._lock = threading.RLock() + + self._queue: queue.Queue = ( + queue.Queue( + maxsize=20000 + ) + ) + + self._running = True + + self._conn = sqlite3.connect( + self.db_path, + check_same_thread=False, + ) + + self._conn.row_factory = ( + sqlite3.Row + ) + + self._initialize() + + self._writer = threading.Thread( + target=self._writer_loop, + daemon=True, + name="DatabaseWriter", + ) + + self._writer.start() + + @contextmanager + def connection(self): + with self._lock: + yield self._conn + + def _initialize( + self, + ) -> None: + with self.connection() as conn: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + PRAGMA synchronous=NORMAL; + PRAGMA foreign_keys=ON; + PRAGMA temp_store=MEMORY; + PRAGMA cache_size=-20000; + PRAGMA mmap_size=268435456; + + CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS profiles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS themes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + name TEXT NOT NULL, + path TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + UNIQUE(profile_name,name) + ); + + CREATE TABLE IF NOT EXISTS extensions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + name TEXT NOT NULL, + path TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + UNIQUE(profile_name,name) + ); + + CREATE TABLE IF NOT EXISTS proxies ( + profile_name TEXT PRIMARY KEY, + enabled INTEGER NOT NULL DEFAULT 0, + proxy_type TEXT, + host TEXT, + port INTEGER, + username TEXT, + password TEXT + ); + + CREATE TABLE IF NOT EXISTS blocked_domains ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + domain TEXT UNIQUE NOT NULL + ); + + CREATE TABLE IF NOT EXISTS network_requests ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + request_id TEXT, + timestamp TEXT, + method TEXT, + url TEXT, + host TEXT, + status_code INTEGER, + resource_type TEXT, + request_headers TEXT, + response_headers TEXT, + request_body TEXT, + response_body TEXT, + duration_ms REAL + ); + + CREATE TABLE IF NOT EXISTS plugin_registry ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + plugin_name TEXT NOT NULL, + version TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + installed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE(profile_name,plugin_name) + ); + + CREATE TABLE IF NOT EXISTS plugin_settings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + plugin_name TEXT NOT NULL, + setting_key TEXT NOT NULL, + setting_value TEXT NOT NULL, + UNIQUE( + profile_name, + plugin_name, + setting_key + ) + ); + + CREATE TABLE IF NOT EXISTS plugin_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + plugin_name TEXT NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS plugin_crashes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + profile_name TEXT NOT NULL, + plugin_name TEXT NOT NULL, + error TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_req_host + ON network_requests(host); + + CREATE INDEX IF NOT EXISTS idx_req_url + ON network_requests(url); + + CREATE INDEX IF NOT EXISTS idx_req_status + ON network_requests(status_code); + + CREATE INDEX IF NOT EXISTS idx_req_timestamp + ON network_requests(timestamp); + + CREATE INDEX IF NOT EXISTS idx_plugin_registry_profile + ON plugin_registry(profile_name); + + CREATE INDEX IF NOT EXISTS idx_plugin_settings_lookup + ON plugin_settings( + profile_name, + plugin_name, + setting_key + ); + + CREATE INDEX IF NOT EXISTS idx_plugin_logs_profile + ON plugin_logs(profile_name); + + CREATE INDEX IF NOT EXISTS idx_plugin_crashes_profile + ON plugin_crashes(profile_name); + """ + ) + + conn.commit() + + def shutdown( + self, + ) -> None: + self._running = False + + if ( + self._writer + and self._writer.is_alive() + ): + self._writer.join( + timeout=10 + ) + + with self._lock: + try: + self._conn.commit() + self._conn.close() + except Exception: + pass + + def _writer_loop( + self, + ) -> None: + batch = [] + + last_flush = ( + time.monotonic() + ) + + while ( + self._running + or not self._queue.empty() + ): + try: + batch.append( + self._queue.get( + timeout=0.25 + ) + ) + except queue.Empty: + pass + + now = time.monotonic() + + if ( + len(batch) + >= NETWORK_BATCH_SIZE + or ( + batch + and ( + now + - last_flush + ) + >= NETWORK_FLUSH_INTERVAL + ) + ): + self._flush(batch) + batch.clear() + last_flush = now + + if batch: + self._flush(batch) + + def _flush( + self, + batch: list, + ) -> None: + try: + with self.connection() as conn: + conn.executemany( + """ + INSERT INTO network_requests ( + request_id, + timestamp, + method, + url, + host, + status_code, + resource_type, + request_headers, + response_headers, + request_body, + response_body, + duration_ms + ) + VALUES ( + ?,?,?,?,?,?, + ?,?,?,?,?,? + ) + """, + batch, + ) + + conn.commit() + + self._cleanup() + + except Exception: + log.exception( + "db flush failed" + ) + + def _cleanup( + self, + ) -> None: + try: + with self.connection() as conn: + row = conn.execute( + """ + SELECT MAX(id) + FROM network_requests + """ + ).fetchone() + + max_id = ( + row[0] + if row + else None + ) + + if ( + max_id is None + or max_id + <= NETWORK_RETENTION + ): + return + + conn.execute( + """ + DELETE FROM network_requests + WHERE id < ? + """, + ( + max_id + - NETWORK_RETENTION, + ), + ) + + conn.commit() + + except Exception: + log.exception( + "cleanup failed" + ) + + def queue_request( + self, + *, + request_id: str, + timestamp: str, + method: str | None, + url: str | None, + host: str | None, + status_code: int | None, + resource_type: str | None, + request_headers: dict | None, + response_headers: dict | None, + request_body: str | None, + response_body: str | None, + duration_ms: float | None, + ) -> None: + try: + self._queue.put_nowait( + ( + request_id, + timestamp, + method, + url, + host, + status_code, + resource_type, + json.dumps( + request_headers + or {} + ), + json.dumps( + response_headers + or {} + ), + request_body, + response_body, + duration_ms, + ) + ) + + except queue.Full: + log.warning( + "network queue full" + ) + + def get_setting( + self, + key: str, + default: Any = None, + ) -> Any: + with self.connection() as conn: + row = conn.execute( + """ + SELECT value + FROM settings + WHERE key=? + """, + (key,), + ).fetchone() + + if not row: + return default + + try: + return json.loads( + row["value"] + ) + except Exception: + return default + + def set_setting( + self, + key: str, + value: Any, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT INTO settings( + key, + value + ) + VALUES(?,?) + ON CONFLICT(key) + DO UPDATE SET + value=excluded.value + """, + ( + key, + json.dumps( + value, + ensure_ascii=False, + ), + ), + ) + + conn.commit() + + def ensure_profile( + self, + profile_name: str, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT OR IGNORE + INTO profiles(name) + VALUES(?) + """, + ( + profile_name, + ), + ) + + conn.commit() + + def list_profiles( + self, + ) -> list[str]: + with self.connection() as conn: + rows = conn.execute( + """ + SELECT name + FROM profiles + ORDER BY name + """ + ).fetchall() + + return [ + row["name"] + for row in rows + ] + + def add_blocked_domain( + self, + domain: str, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT OR IGNORE + INTO blocked_domains(domain) + VALUES(?) + """, + ( + domain.lower().strip(), + ), + ) + + conn.commit() + + def blocked_domains( + self, + ) -> list[str]: + with self.connection() as conn: + rows = conn.execute( + """ + SELECT domain + FROM blocked_domains + ORDER BY domain + """ + ).fetchall() + + return [ + row["domain"] + for row in rows + ] + + def recent_requests( + self, + limit: int = 1000, + ): + with self.connection() as conn: + return conn.execute( + """ + SELECT * + FROM network_requests + ORDER BY id DESC + LIMIT ? + """, + ( + limit, + ), + ).fetchall() + + def search_requests( + self, + query: str, + limit: int = 1000, + ): + like = f"%{query}%" + + with self.connection() as conn: + return conn.execute( + """ + SELECT * + FROM network_requests + WHERE + url LIKE ? + OR host LIKE ? + OR method LIKE ? + ORDER BY id DESC + LIMIT ? + """, + ( + like, + like, + like, + limit, + ), + ).fetchall() + + def plugin_enabled( + self, + profile_name: str, + plugin_name: str, + default: bool = True, + ) -> bool: + with self.connection() as conn: + row = conn.execute( + """ + SELECT enabled + FROM plugin_registry + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ).fetchone() + + return ( + default + if not row + else bool( + row["enabled"] + ) + ) + + def set_plugin_enabled( + self, + profile_name: str, + plugin_name: str, + enabled: bool, + version: str = "", + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT INTO plugin_registry( + profile_name, + plugin_name, + version, + enabled + ) + VALUES(?,?,?,?) + ON CONFLICT( + profile_name, + plugin_name + ) + DO UPDATE SET + enabled=excluded.enabled, + version=excluded.version + """, + ( + profile_name, + plugin_name, + version, + int(enabled), + ), + ) + + conn.commit() + + def plugin_setting_get( + self, + profile_name: str, + plugin_name: str, + key: str, + default=None, + ): + with self.connection() as conn: + row = conn.execute( + """ + SELECT setting_value + FROM plugin_settings + WHERE + profile_name=? + AND plugin_name=? + AND setting_key=? + """, + ( + profile_name, + plugin_name, + key, + ), + ).fetchone() + + if not row: + return default + + try: + return json.loads( + row["setting_value"] + ) + except Exception: + return default + + def plugin_setting_set( + self, + profile_name: str, + plugin_name: str, + key: str, + value, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT INTO plugin_settings( + profile_name, + plugin_name, + setting_key, + setting_value + ) + VALUES(?,?,?,?) + ON CONFLICT( + profile_name, + plugin_name, + setting_key + ) + DO UPDATE SET + setting_value=excluded.setting_value + """, + ( + profile_name, + plugin_name, + key, + json.dumps( + value, + ensure_ascii=False, + ), + ), + ) + + conn.commit() + + def plugin_log( + self, + profile_name: str, + plugin_name: str, + level: str, + message: str, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT INTO plugin_logs( + profile_name, + plugin_name, + level, + message + ) + VALUES(?,?,?,?) + """, + ( + profile_name, + plugin_name, + level, + message, + ), + ) + + conn.commit() + + def plugin_crash( + self, + profile_name: str, + plugin_name: str, + error: str, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + INSERT INTO plugin_crashes( + profile_name, + plugin_name, + error + ) + VALUES(?,?,?) + """, + ( + profile_name, + plugin_name, + error, + ), + ) + + conn.commit() + + def plugin_logs( + self, + profile_name: str, + plugin_name: str | None = None, + limit: int = 1000, + ): + with self.connection() as conn: + if plugin_name: + return conn.execute( + """ + SELECT * + FROM plugin_logs + WHERE + profile_name=? + AND plugin_name=? + ORDER BY id DESC + LIMIT ? + """, + ( + profile_name, + plugin_name, + limit, + ), + ).fetchall() + + return conn.execute( + """ + SELECT * + FROM plugin_logs + WHERE profile_name=? + ORDER BY id DESC + LIMIT ? + """, + ( + profile_name, + limit, + ), + ).fetchall() + + def plugin_crashes( + self, + profile_name: str, + plugin_name: str | None = None, + limit: int = 1000, + ): + with self.connection() as conn: + if plugin_name: + return conn.execute( + """ + SELECT * + FROM plugin_crashes + WHERE + profile_name=? + AND plugin_name=? + ORDER BY id DESC + LIMIT ? + """, + ( + profile_name, + plugin_name, + limit, + ), + ).fetchall() + + return conn.execute( + """ + SELECT * + FROM plugin_crashes + WHERE profile_name=? + ORDER BY id DESC + LIMIT ? + """, + ( + profile_name, + limit, + ), + ).fetchall() + + def plugins( + self, + profile_name: str, + ): + with self.connection() as conn: + return conn.execute( + """ + SELECT * + FROM plugin_registry + WHERE profile_name=? + ORDER BY plugin_name + """, + ( + profile_name, + ), + ).fetchall() + + # db.py additions + + def delete_plugin( + self, + profile_name: str, + plugin_name: str, + ) -> None: + with self.connection() as conn: + conn.execute( + """ + DELETE FROM plugin_registry + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ) + + conn.execute( + """ + DELETE FROM plugin_settings + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ) + + conn.execute( + """ + DELETE FROM plugin_logs + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ) + + conn.execute( + """ + DELETE FROM plugin_crashes + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ) + + conn.commit() + + # v1 + def plugin_crash_count( + self, + profile_name: str, + plugin_name: str, + ) -> int: + with self.connection() as conn: + row = conn.execute( + """ + SELECT COUNT(*) + FROM plugin_crashes + WHERE + profile_name=? + AND plugin_name=? + """, + ( + profile_name, + plugin_name, + ), + ).fetchone() + + return int(row[0] or 0) + + + # v1 + def plugin_quarantined( + self, + profile_name: str, + plugin_name: str, + ) -> bool: + return bool( + self.get_setting( + f"plugin_quarantine:{profile_name}:{plugin_name}", + False, + ) + ) + + + # v1 + def set_plugin_quarantined( + self, + profile_name: str, + plugin_name: str, + value: bool, + ) -> None: + self.set_setting( + f"plugin_quarantine:{profile_name}:{plugin_name}", + bool(value), + ) \ No newline at end of file