# Source: devdisc/managers.py
# Snapshot: 2026-06-08 06:17:48 +00:00
# Listing metadata: 1609 lines, 34 KiB, Python

from __future__ import annotations
import json
from pathlib import Path
from PySide6.QtCore import QObject, Qt, Signal
from PySide6.QtGui import QAction, QIcon
from PySide6.QtWidgets import (
QApplication,
QCheckBox,
QDialog,
QFileDialog,
QFormLayout,
QGroupBox,
QHBoxLayout,
QLineEdit,
QListWidget,
QMainWindow,
QMenu,
QMessageBox,
QPlainTextEdit,
QPushButton,
QSplitter,
QSystemTrayIcon,
QTableView,
QTabWidget,
QVBoxLayout,
QWidget,
QInputDialog,
)
from models import (
AddonModel,
NetworkTableModel,
PluginCrashTableModel,
PluginLogTableModel,
PluginTableModel,
ProfileContext,
)
from addons import AddonManager
from network import ProxyConfig
from config import (
PLUGIN_ALLOWED_EXTENSIONS,
PLUGIN_BLOCKED_PATTERNS,
PLUGIN_FILE_PATTERN,
PLUGIN_MAX_SIZE,
profile_extension_dir,
profile_plugin_dir,
profile_theme_dir,
profile_root,
)
# v2
class ContextManager:
    """Create, cache and switch between per-profile ``ProfileContext`` objects.

    A context is built the first time a profile name is requested and is then
    served from an in-memory cache keyed by that name.
    """

    def __init__(
        self,
        db,
        settings: dict,
    ):
        """Store the collaborators and start with an empty cache.

        Args:
            db: Object exposing ``ensure_profile(name)``; it is called before
                a context is built for a profile that is not cached yet.
            settings: Mutable settings mapping. ``"active_profile"`` is read
                by :meth:`current` and written by :meth:`switch`.
        """
        self.db = db
        self.settings = settings
        self._contexts: dict[str, ProfileContext] = {}

    def current(self) -> ProfileContext:
        """Return the context of the active profile (``"default"`` if unset)."""
        return self.profile(
            self.settings.get("active_profile", "default")
        )

    def get(self, profile_name: str) -> ProfileContext:
        """Alias of :meth:`profile`."""
        return self.profile(profile_name)

    def register(self, context: ProfileContext) -> None:
        """Cache ``context`` under ``context.name``, replacing any previous one."""
        self._contexts[context.name] = context

    def remove(self, profile_name: str) -> None:
        """Drop the cached context for ``profile_name``; unknown names are ignored."""
        self._contexts.pop(profile_name, None)

    def profile(self, profile_name: str) -> ProfileContext:
        """Return the context for ``profile_name``, building it on first use.

        On a cache miss the profile is registered with the database, the
        context is assembled from the ``config`` path helpers, and the
        storage, cache and downloads directories are created on disk.
        """
        cached = self._contexts.get(profile_name)
        if cached is not None:
            return cached

        self.db.ensure_profile(profile_name)
        root = profile_root(profile_name)
        # v3
        context = ProfileContext(
            name=profile_name,
            root=root,
            storage_path=root / "storage",
            cache_path=root / "cache",
            downloads_path=root / "downloads",
            plugins_path=profile_plugin_dir(profile_name),
            themes_path=profile_theme_dir(profile_name),
            extensions_path=profile_extension_dir(profile_name),
            profile_db=root / "profile.db",
            permissions_db=root / "permissions.db",
            network_db=root / "network.db",
        )
        # Only these three directories are created here; the plugin, theme
        # and extension directories are left to whoever owns them.
        for directory in (
            context.storage_path,
            context.cache_path,
            context.downloads_path,
        ):
            directory.mkdir(parents=True, exist_ok=True)

        self._contexts[profile_name] = context
        return context

    def switch(self, profile_name: str) -> ProfileContext:
        """Make ``profile_name`` the active profile, persist it, return its context."""
        self.settings["active_profile"] = profile_name
        self._persist_settings()
        return self.profile(profile_name)

    def _persist_settings(self) -> None:
        """Persist ``self.settings`` through ``save_settings``.

        The visible import block does not bring ``save_settings`` into scope,
        so calling it directly raises ``NameError``. A module-level
        ``save_settings`` is used when one exists; otherwise it is imported
        lazily.
        """
        try:
            persist = save_settings  # noqa: F821 - may be defined elsewhere in the module
        except NameError:
            # NOTE(review): assumes save_settings lives in ``config`` next to
            # the other profile helpers imported from there - confirm.
            from config import save_settings as persist
        persist(self.settings)
class PluginManagerWindow(QDialog):
    """Dialog with plugin, log, crash and diagnostics tabs plus action buttons.

    Plugin operations are delegated to ``browser``; log and crash rows are
    read from ``db`` for ``profile_name``.
    """

    def __init__(self, browser, db, profile_name, parent=None):
        """Build the tabs and the button row, then load the initial data."""
        super().__init__(parent)
        self.browser = browser
        self.db = db
        self.profile_name = profile_name
        self.resize(1200, 850)
        self.setWindowTitle("Plugins")

        # Models and the views that display them.
        self.model = PluginTableModel()
        self.logs_model = PluginLogTableModel()
        self.crash_model = PluginCrashTableModel()
        self.plugins_table = QTableView()
        self.logs_table = QTableView()
        self.crashes_table = QTableView()
        self.diagnostics = QPlainTextEdit()
        self.diagnostics.setReadOnly(True)
        for view, model in (
            (self.plugins_table, self.model),
            (self.logs_table, self.logs_model),
            (self.crashes_table, self.crash_model),
        ):
            view.setModel(model)

        tabs = QTabWidget()
        for page, title in (
            (self.plugins_table, "Plugins"),
            (self.logs_table, "Logs"),
            (self.crashes_table, "Crashes"),
            (self.diagnostics, "Diagnostics"),
        ):
            tabs.addTab(page, title)

        self.install_file_btn = QPushButton("Install File")
        self.install_url_btn = QPushButton("Install URL")
        self.enable_btn = QPushButton("Enable")
        self.disable_btn = QPushButton("Disable")
        self.reload_btn = QPushButton("Reload")
        self.delete_btn = QPushButton("Delete")

        # Each button paired with the slot it triggers, in display order.
        wiring = (
            (self.install_file_btn, self.install_file),
            (self.install_url_btn, self.install_url),
            (self.enable_btn, self.enable_plugin),
            (self.disable_btn, self.disable_plugin),
            (self.reload_btn, self.reload_plugins),
            (self.delete_btn, self.delete_plugin),
        )
        button_row = QHBoxLayout()
        for button, _slot in wiring:
            button_row.addWidget(button)

        outer = QVBoxLayout(self)
        outer.addWidget(tabs)
        outer.addLayout(button_row)

        for button, slot in wiring:
            button.clicked.connect(slot)

        self.reload()

    def selected_plugin(self) -> str | None:
        """Return the ``"name"`` of the current plugin row, or ``None``."""
        current = self.plugins_table.currentIndex()
        if current.isValid():
            return self.model.row(current.row()).get("name")
        return None

    def install_file(self) -> None:
        """Ask for a ``*.plugin.js`` file and install it through the browser."""
        chosen, _filter = QFileDialog.getOpenFileName(
            self, "Install Plugin", "", "*.plugin.js"
        )
        if not chosen:
            return
        try:
            self.browser.install_plugin_file(chosen)
            self.reload()
        except Exception as exc:
            # Any failure, including one from the refresh, is shown to the user.
            QMessageBox.warning(self, "Install Failed", str(exc))

    # v2
    def install_url(self) -> None:
        """Ask for a plugin URL and install it through the browser."""
        text, accepted = QInputDialog.getText(
            self, "Install URL", "Plugin URL"
        )
        url = text.strip()
        if not (accepted and url):
            return
        try:
            self.browser.install_plugin_url(url)
            self.reload()
        except Exception as exc:
            QMessageBox.warning(self, "Install Failed", str(exc))

    def _set_selected_enabled(self, enabled: bool) -> None:
        """Apply ``enabled`` to the selected plugin, if any, and refresh."""
        plugin = self.selected_plugin()
        if not plugin:
            return
        self.browser.set_plugin_enabled(plugin, enabled)
        self.reload()

    def enable_plugin(self) -> None:
        """Enable the selected plugin."""
        self._set_selected_enabled(True)

    def disable_plugin(self) -> None:
        """Disable the selected plugin."""
        self._set_selected_enabled(False)

    def delete_plugin(self) -> None:
        """Remove the selected plugin after the user confirms."""
        plugin = self.selected_plugin()
        if not plugin:
            return
        answer = QMessageBox.question(self, "Delete Plugin", plugin)
        if answer == QMessageBox.StandardButton.Yes:
            self.browser.remove_plugin(plugin)
            self.reload()

    def reload_plugins(self) -> None:
        """Ask the browser to reload its addons, then refresh the dialog."""
        self.browser.reload_addons()
        self.reload()

    def reload(self) -> None:
        """Refresh the plugin, log and crash models and the diagnostics text."""
        self.model.set_plugins(self.browser.plugin_inventory())

        log_rows = list(self.db.plugin_logs(self.profile_name))
        self.logs_model.set_rows(log_rows)

        crash_rows = list(self.db.plugin_crashes(self.profile_name))
        self.crash_model.set_rows(crash_rows)

        snapshot = self.browser.plugin_diagnostics_snapshot()
        self.diagnostics.setPlainText(
            json.dumps(snapshot, indent=2, ensure_ascii=False)
        )
class SettingsWindow(QDialog):
    """Tabbed settings dialog that edits the ``settings`` mapping in place.

    Nothing is written back until "Save" is pressed; ``_save`` then updates
    the mapping, emits ``settingsChanged`` and accepts the dialog.
    """

    settingsChanged = Signal()

    # (attribute name, checkbox label, host) for the built-in block toggles.
    _PRESET_BLOCKS = (
        (
            "discord_telemetry",
            "Block Discord Telemetry",
            "telemetry.discord.com",
        ),
        (
            "discord_crash",
            "Block Discord Crash Reporting",
            "crash.discord.com",
        ),
        ("sentry", "Block Sentry", "sentry.io"),
        ("statsig", "Block Statsig", "api.statsig.com"),
    )

    def __init__(self, settings: dict, parent=None):
        """Build the five tabs and the Save button around ``settings``."""
        super().__init__(parent)
        self.settings = settings
        self.setWindowTitle("Settings")
        self.resize(950, 700)
        tabs = QTabWidget()
        tabs.addTab(self._general_tab(), "General")
        tabs.addTab(self._network_tab(), "Network")
        tabs.addTab(self._permissions_tab(), "Permissions")
        tabs.addTab(self._privacy_tab(), "Privacy")
        tabs.addTab(self._developer_tab(), "Developer")
        layout = QVBoxLayout(self)
        layout.addWidget(tabs)
        save_button = QPushButton("Save")
        save_button.clicked.connect(self._save)
        layout.addWidget(save_button)

    def _general_tab(self):
        """Return the General tab (download directory field)."""
        widget = QWidget()
        layout = QFormLayout(widget)
        # A stored null must not reach the QLineEdit constructor.
        self.download_path = QLineEdit(
            str(self.settings.get("download_directory") or "")
        )
        layout.addRow("Downloads", self.download_path)
        return widget

    def _network_tab(self):
        """Return the Network tab (logging toggle, on by default)."""
        widget = QWidget()
        layout = QFormLayout(widget)
        self.network_logging = QCheckBox()
        # Coerced like the permission boxes so a non-bool stored value
        # cannot break setChecked.
        self.network_logging.setChecked(
            bool(self.settings.get("network_logging", True))
        )
        layout.addRow("Enable Logging", self.network_logging)
        return widget

    def _permissions_tab(self):
        """Return the Permissions tab with one checkbox per permission key.

        Ensures ``settings["permissions"]`` exists; missing keys show as
        unchecked.
        """
        widget = QWidget()
        layout = QVBoxLayout(widget)
        permissions = self.settings.setdefault("permissions", {})
        self.permission_boxes = {}
        for key, title in (
            ("microphone", "Microphone"),
            ("camera", "Camera"),
            ("clipboard", "Clipboard"),
            ("notifications", "Notifications"),
            ("downloads", "Downloads"),
            ("fullscreen", "Fullscreen"),
        ):
            box = QCheckBox(title)
            box.setChecked(bool(permissions.get(key, False)))
            self.permission_boxes[key] = box
            layout.addWidget(box)
        layout.addStretch()
        return widget

    def _privacy_tab(self):
        """Return the Privacy tab: preset block toggles plus a custom host list.

        Preset hosts are represented only by their checkbox. Listing them in
        the free-form box as well made a toggle impossible to switch off,
        because saving re-added every line of the box.
        """
        widget = QWidget()
        layout = QVBoxLayout(widget)
        # Normalised the same way _save normalises the text box.
        blocked = {
            str(host).strip().lower()
            for host in (self.settings.get("blocked_hosts") or [])
        }
        blocked.discard("")

        preset_hosts = set()
        for attr, title, host in self._PRESET_BLOCKS:
            box = QCheckBox(title)
            box.setChecked(host in blocked)
            setattr(self, attr, box)
            layout.addWidget(box)
            preset_hosts.add(host)

        group = QGroupBox("Blocked Hosts")
        group_layout = QVBoxLayout(group)
        self.blocked_hosts = QPlainTextEdit()
        self.blocked_hosts.setPlainText(
            "\n".join(sorted(blocked - preset_hosts))
        )
        group_layout.addWidget(self.blocked_hosts)
        layout.addWidget(group)
        return widget

    def _developer_tab(self):
        """Return the Developer tab (CDP toggle, on by default)."""
        widget = QWidget()
        layout = QFormLayout(widget)
        self.enable_cdp = QCheckBox()
        self.enable_cdp.setChecked(
            bool(self.settings.get("enable_cdp", True))
        )
        layout.addRow("Enable CDP", self.enable_cdp)
        return widget

    def _save(self):
        """Copy every widget value into ``settings``, emit, and accept."""
        self.settings["download_directory"] = (
            self.download_path.text().strip()
        )
        self.settings["network_logging"] = (
            self.network_logging.isChecked()
        )
        self.settings["enable_cdp"] = self.enable_cdp.isChecked()

        permissions = self.settings.setdefault("permissions", {})
        for key, box in self.permission_boxes.items():
            permissions[key] = box.isChecked()

        blocked = {
            line.strip().lower()
            for line in self.blocked_hosts.toPlainText().splitlines()
            if line.strip()
        }
        # The checkbox decides for preset hosts: ticked adds the host,
        # unticked removes it even if it was typed into the custom list.
        for attr, _title, host in self._PRESET_BLOCKS:
            if getattr(self, attr).isChecked():
                blocked.add(host)
            else:
                blocked.discard(host)
        self.settings["blocked_hosts"] = sorted(blocked)

        self.settingsChanged.emit()
        self.accept()
# v4
class ProfileManager(QDialog):
    """Dialog for switching profiles and exporting/importing profile bundles.

    A bundle is a JSON document with ``version``, ``profile`` and a
    ``storage`` mapping of relative file path to hex-encoded file content.
    """

    profileSelected = Signal(str)

    # Limits applied to both export and import.
    PROFILE_MAX_FILES = 10000
    PROFILE_MAX_FILE_SIZE = 50 * 1024 * 1024
    PROFILE_MAX_TOTAL_SIZE = 500 * 1024 * 1024
    # Any file with one of these names among its path parts is not exported.
    PROFILE_EXPORT_SKIP = {
        "cache",
        "GPUCache",
        "Code Cache",
        "blob_storage",
        "Service Worker",
        "Session Storage",
    }

    def __init__(
        self,
        profiles: list[str],
        current: str,
        parent=None,
    ):
        """List ``profiles`` sorted, pre-selecting ``current`` when present."""
        super().__init__(parent)
        self.setWindowTitle("Profiles")
        self.resize(500, 550)
        self.list_widget = QListWidget()
        profiles = sorted(profiles)
        self.list_widget.addItems(profiles)
        if current in profiles:
            self.list_widget.setCurrentRow(profiles.index(current))

        switch_button = QPushButton("Switch")
        export_button = QPushButton("Export")
        import_button = QPushButton("Import")
        switch_button.clicked.connect(self._switch)
        export_button.clicked.connect(self._export)
        import_button.clicked.connect(self._import)

        buttons = QHBoxLayout()
        buttons.addWidget(switch_button)
        buttons.addWidget(export_button)
        buttons.addWidget(import_button)
        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addLayout(buttons)

    def _switch(self):
        """Emit ``profileSelected`` for the highlighted profile and accept."""
        item = self.list_widget.currentItem()
        if not item:
            return
        self.profileSelected.emit(item.text())
        self.accept()

    def _export(self):
        """Write the highlighted profile to a ``.profilebundle`` file.

        Failures (limit exceeded, unreadable file, unwritable target) are
        shown in a warning dialog, mirroring ``_import``, instead of escaping
        the slot.
        """
        item = self.list_widget.currentItem()
        if not item:
            return
        profile_name = item.text()
        path, _ = QFileDialog.getSaveFileName(
            self,
            "Export Profile",
            f"{profile_name}.profilebundle",
            "*.profilebundle",
        )
        if not path:
            return
        try:
            export = {
                "version": 1,
                "profile": profile_name,
                "storage": self._collect_storage(
                    profile_root(profile_name).resolve()
                ),
            }
            Path(path).write_text(
                json.dumps(export, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
        except Exception as exc:
            QMessageBox.warning(self, "Export", str(exc))
            return
        QMessageBox.information(self, "Export", "Profile exported.")

    def _collect_storage(self, root: Path) -> dict[str, str]:
        """Return ``{relative posix path: hex content}`` for files under ``root``.

        Symlinks, files that resolve outside ``root``, files under a skipped
        directory name and files above the per-file limit are left out.

        Raises:
            ValueError: If the file-count or total-size limit is exceeded.
        """
        storage: dict[str, str] = {}
        file_count = 0
        total_size = 0
        for file in root.rglob("*"):
            if not file.is_file() or file.is_symlink():
                continue
            resolved = file.resolve()
            # A symlinked parent directory can make the file resolve
            # outside the profile root.
            if root not in resolved.parents:
                continue
            relative = resolved.relative_to(root)
            if any(
                part in self.PROFILE_EXPORT_SKIP
                for part in relative.parts
            ):
                continue
            size = resolved.stat().st_size
            if size > self.PROFILE_MAX_FILE_SIZE:
                continue
            file_count += 1
            total_size += size
            if file_count > self.PROFILE_MAX_FILES:
                raise ValueError("profile exceeds file limit")
            if total_size > self.PROFILE_MAX_TOTAL_SIZE:
                raise ValueError("profile exceeds size limit")
            # Forward slashes keep the bundle portable across platforms;
            # str(relative) would emit backslashes on Windows.
            storage[relative.as_posix()] = resolved.read_bytes().hex()
        return storage

    @staticmethod
    def _is_safe_profile_name(name: str) -> bool:
        """Return True when ``name`` is a single, non-special path component."""
        if not name or name in {".", ".."}:
            return False
        return not any(token in name for token in ("/", "\\", "\x00"))

    def _import(self):
        """Restore a profile from a ``.profilebundle`` chosen by the user.

        The bundle is untrusted input. The profile name must be a plain
        component, every entry must stay strictly inside the profile root,
        and the limits used for export apply. Entries above the per-file
        limit or pointing outside the root are skipped; any other problem
        aborts the import and is shown in a warning dialog.
        """
        path, _ = QFileDialog.getOpenFileName(
            self,
            "Import Profile",
            "",
            "*.profilebundle",
        )
        if not path:
            return
        try:
            bundle = json.loads(Path(path).read_text(encoding="utf-8"))
            if not isinstance(bundle, dict):
                raise ValueError("invalid profile bundle")
            profile_name = str(bundle.get("profile", "")).strip()
            # Separators or ".." in the name could point profile_root()
            # outside the profiles directory.
            if not self._is_safe_profile_name(profile_name):
                raise ValueError("invalid profile")
            storage = bundle.get("storage", {})
            if not isinstance(storage, dict):
                raise ValueError("invalid profile bundle")

            root = profile_root(profile_name)
            root.mkdir(parents=True, exist_ok=True)
            root_resolved = root.resolve()
            file_count = 0
            total_size = 0
            for relative, payload in storage.items():
                file_count += 1
                if file_count > self.PROFILE_MAX_FILES:
                    raise ValueError("profile exceeds file limit")
                if not isinstance(payload, str):
                    raise ValueError("invalid profile bundle")
                size = len(payload) // 2  # two hex digits per byte
                if size > self.PROFILE_MAX_FILE_SIZE:
                    continue
                total_size += size
                if total_size > self.PROFILE_MAX_TOTAL_SIZE:
                    raise ValueError("profile exceeds size limit")
                # Bundles written on Windows by earlier versions used
                # backslash separators.
                target = (
                    root / relative.replace("\\", "/")
                ).resolve()
                # Strict containment: an entry resolving to the root itself
                # is skipped too, since writing to it would fail with
                # IsADirectoryError and abort the whole import.
                if root_resolved not in target.parents:
                    continue
                # Decode first so a malformed payload leaves no empty
                # directories behind.
                data = bytes.fromhex(payload)
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_bytes(data)
            QMessageBox.information(self, "Import", "Profile imported.")
        except Exception as exc:
            QMessageBox.warning(self, "Import", str(exc))
class AddonManagerWindow(QDialog):
    """Read-only table of the themes, extensions and plugins of one profile."""

    def __init__(self, context: ProfileContext, parent=None):
        """Create the addon manager for ``context`` and fill the table.

        The ``AddonManager`` receives ``parent.db`` when the parent has such
        an attribute, otherwise ``None``.
        """
        super().__init__(parent)
        self.context = context
        database = getattr(parent, "db", None)
        self.manager = AddonManager(context.name, database)
        self.setWindowTitle("Themes & Extensions")
        self.resize(900, 650)

        self.model = AddonModel()
        self.table = QTableView()
        self.table.setModel(self.model)

        refresh = QPushButton("Refresh")
        refresh.clicked.connect(self.reload)

        layout = QVBoxLayout(self)
        layout.addWidget(self.table)
        layout.addWidget(refresh)
        self.reload()

    @staticmethod
    def _extension_enabled(extension) -> bool:
        """Return the ``"enabled"`` flag from the extension's manifest.

        Defaults to ``True`` when the manifest is missing, unreadable or has
        no such key.
        """
        manifest = extension / "manifest.json"
        if not manifest.exists():
            return True
        try:
            data = json.loads(manifest.read_text(encoding="utf-8"))
            return bool(data.get("enabled", True))
        except Exception:
            # Best effort: a broken manifest must not hide the extension.
            return True

    def reload(self):
        """Rebuild the table rows: themes first, then extensions, then plugins."""
        addons = [
            {"name": theme.stem, "enabled": True, "type": "Theme"}
            for theme in self.manager.themes.discover()
        ]
        for extension in self.manager.extensions.discover():
            state = self._extension_enabled(extension)
            addons.append(
                {
                    "name": extension.name,
                    "enabled": state,
                    "type": "Extension",
                }
            )
        addons.extend(
            {
                "name": plugin["name"],
                "enabled": plugin.get("enabled", True),
                "type": "Plugin",
            }
            for plugin in self.manager.plugins.inventory()
        )
        self.model.set_addons(addons)
class ThemeEditor(QDialog):
    """Plain-text editor dialog for a single theme file."""

    def __init__(
        self,
        theme_path: Path,
        parent=None,
    ):
        """Open ``theme_path`` for editing; a missing file starts empty.

        Args:
            theme_path: File to edit. Its name becomes the window title and
                it is read as UTF-8 when it exists.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.theme_path = theme_path
        self.setWindowTitle(theme_path.name)
        self.resize(1000, 700)
        self.editor = QPlainTextEdit()
        if theme_path.exists():
            self.editor.setPlainText(
                theme_path.read_text(encoding="utf-8")
            )
        save_button = QPushButton("Save")
        save_button.clicked.connect(self.save)
        layout = QVBoxLayout(self)
        layout.addWidget(self.editor)
        layout.addWidget(save_button)

    def save(self):
        """Write the editor text to ``theme_path`` as UTF-8 and accept.

        If the write fails, a warning is shown and the dialog stays open so
        the text is not lost. Previously the error escaped the slot with no
        feedback to the user.
        """
        try:
            # A new theme may target a directory that does not exist yet.
            self.theme_path.parent.mkdir(parents=True, exist_ok=True)
            self.theme_path.write_text(
                self.editor.toPlainText(),
                encoding="utf-8",
            )
        except (OSError, UnicodeError) as exc:
            QMessageBox.warning(self, "Save Failed", str(exc))
            return
        self.accept()
class ProxyManagerWindow(QDialog):
    """Form dialog that edits the proxy configuration of one profile."""

    def __init__(
        self,
        network,
        profile_name: str,
        parent=None,
    ):
        """Populate the form from ``network.proxy.get_proxy(profile_name)``.

        Args:
            network: Object whose ``proxy`` attribute offers ``get_proxy``
                and ``save_proxy``.
            profile_name: Profile whose proxy settings are edited.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.network = network
        self.profile_name = profile_name
        self.setWindowTitle("Proxy Settings")
        self.resize(500, 350)
        proxy = network.proxy.get_proxy(profile_name)

        self.enabled = QCheckBox()
        self.enabled.setChecked(proxy.enabled)
        self.host = QLineEdit(proxy.host)
        self.port = QLineEdit(str(proxy.port))
        self.username = QLineEdit(proxy.username)
        self.password = QLineEdit(proxy.password)
        # Mask the credential; it was previously displayed in clear text.
        self.password.setEchoMode(QLineEdit.EchoMode.Password)
        self.type_box = QLineEdit(proxy.proxy_type)

        layout = QFormLayout(self)
        layout.addRow("Enabled", self.enabled)
        layout.addRow("Type", self.type_box)
        layout.addRow("Host", self.host)
        layout.addRow("Port", self.port)
        layout.addRow("Username", self.username)
        layout.addRow("Password", self.password)
        save_button = QPushButton("Save")
        save_button.clicked.connect(self.save)
        layout.addRow(save_button)

    @staticmethod
    def _parse_port(text: str) -> int:
        """Return ``text`` as a port number, or 0 when it is not usable.

        Blank and non-numeric input map to 0, as before. Values outside
        0-65535 now map to 0 as well instead of being saved as-is.
        """
        try:
            port = int(text.strip() or "0")
        except ValueError:
            return 0
        return port if 0 <= port <= 65535 else 0

    def save(self):
        """Build a ``ProxyConfig`` from the form, store it, and accept.

        An empty type falls back to ``"http"``. Host is stripped; username
        and password are stored exactly as typed.
        """
        proxy = ProxyConfig(
            enabled=self.enabled.isChecked(),
            proxy_type=self.type_box.text().strip() or "http",
            host=self.host.text().strip(),
            port=self._parse_port(self.port.text()),
            username=self.username.text(),
            password=self.password.text(),
        )
        self.network.proxy.save_proxy(self.profile_name, proxy)
        self.accept()
class NetworkInspector(QMainWindow):
    """Window with a searchable request table and a JSON detail pane."""

    def __init__(self, network, parent=None):
        """Build the search box, table and detail pane, then load rows.

        ``network`` must offer ``search(query)`` and ``recent()``.
        """
        super().__init__(parent)
        self.network = network
        self.setWindowTitle("Network Inspector")
        self.resize(1400, 850)

        self.model = NetworkTableModel()
        self.table = QTableView()
        self.table.setModel(self.model)
        header = self.table.horizontalHeader()
        header.setStretchLastSection(True)

        self.search = QLineEdit()
        self.search.setPlaceholderText("Search URL, host, method...")
        # Every edit of the search text re-runs the query.
        self.search.textChanged.connect(self.reload)

        self.details = QPlainTextEdit()
        self.details.setReadOnly(True)

        # Table on top, details below.
        splitter = QSplitter(Qt.Orientation.Vertical)
        for pane in (self.table, self.details):
            splitter.addWidget(pane)

        container = QWidget()
        layout = QVBoxLayout(container)
        for item in (self.search, splitter):
            layout.addWidget(item)
        self.setCentralWidget(container)

        self.table.clicked.connect(self.show_details)
        self.reload()

    def reload(self):
        """Show search results for the stripped query, or recent rows if blank."""
        query = self.search.text().strip()
        if query:
            rows = self.network.search(query)
        else:
            rows = self.network.recent()
        self.model.set_rows(rows)

    def show_details(self, index):
        """Render the clicked row as indented JSON; clear the pane on failure."""
        try:
            record = dict(self.model.row(index.row()))
            rendered = json.dumps(record, indent=2, ensure_ascii=False)
            self.details.setPlainText(rendered)
        except Exception:
            # Any failure to fetch or render the row results in an empty pane.
            self.details.clear()
# v2
class TrayManager(QObject):
    """System tray icon with an Open / Hide / Quit menu for a window."""

    def __init__(self, parent):
        """Create the tray icon and context menu for ``parent``.

        The application icon is used, falling back to the window's own icon
        when the application icon is null.
        """
        super().__init__(parent)
        self.parent_window = parent
        self.tray = QSystemTrayIcon(parent)

        icon = QApplication.windowIcon()
        if icon.isNull():
            icon = self.parent_window.windowIcon()
        self.tray.setIcon(icon)

        self.menu = QMenu()
        self.open_action = QAction("Open", parent)
        self.hide_action = QAction("Hide", parent)
        self.quit_action = QAction("Quit", parent)

        # Open and Hide, a separator, then Quit.
        for action in (self.open_action, self.hide_action):
            self.menu.addAction(action)
        self.menu.addSeparator()
        self.menu.addAction(self.quit_action)
        self.tray.setContextMenu(self.menu)

        for action, slot in (
            (self.open_action, self.show_window),
            (self.hide_action, parent.hide),
            (self.quit_action, self.quit_application),
        ):
            action.triggered.connect(slot)
        self.tray.activated.connect(self._activated)

    def _activated(self, reason):
        """Restore the window when the tray icon is double-clicked."""
        if reason != QSystemTrayIcon.ActivationReason.DoubleClick:
            return
        self.show_window()

    def show_window(self):
        """Show the window in its normal state, raise it and activate it."""
        window = self.parent_window
        window.showNormal()
        window.raise_()
        window.activateWindow()

    def quit_application(self):
        """Hide the tray icon, detach from the window, and close the window.

        ``close_to_tray`` is set to False in the window's ``settings_data``
        before ``close()`` is called.
        NOTE(review): presumably so the close is not turned back into
        "hide to tray", and the change may be persisted if settings are
        saved on close - confirm against the window's closeEvent.
        """
        window = self.parent_window
        self.tray.hide()
        window.tray_manager = None
        tray_settings = window.settings_data.setdefault("tray", {})
        tray_settings["close_to_tray"] = False
        window.close()