Files
devdisc/addons.py
2026-06-08 06:16:54 +00:00

1330 lines
28 KiB
Python

from __future__ import annotations
import json
import logging
import re
from pathlib import Path
from PySide6.QtWebEngineCore import (
QWebEngineProfile,
QWebEngineScript,
)
from config import (
PLUGIN_BLOCKED_PATTERNS,
PLUGIN_CRASH_LIMIT,
PLUGIN_MAX_SIZE,
PLUGIN_PERMISSIONS,
profile_extension_dir,
profile_plugin_dir,
profile_theme_dir,
profile_vencord_dir,
)
# Module-wide logger, named after this module.
log = logging.getLogger(__name__)

# Script world assigned to every theme, extension, and plugin script built in
# this module, so they all share one JavaScript context.
SCRIPT_WORLD = QWebEngineScript.ScriptWorldId.ApplicationWorld

# Glob pattern used to find plugin files inside a profile's plugin directory.
PLUGIN_FILE_PATTERN = "*.plugin.js"
def _js_string(
value: str,
) -> str:
return json.dumps(
value,
ensure_ascii=False,
)
class ThemeManager:
    """Discover a profile's CSS themes and wrap them as injectable scripts."""

    def __init__(self, profile_name: str):
        """Store the profile name and resolve its theme directory.

        Args:
            profile_name: Profile whose theme directory is looked up through
                ``profile_theme_dir``.
        """
        self.profile_name = profile_name
        self.directory = profile_theme_dir(profile_name)

    def discover(self) -> list[Path]:
        """Return the ``*.css`` files of the theme directory, sorted by path."""
        return sorted(self.directory.glob("*.css"))

    def load_scripts(self) -> list[QWebEngineScript]:
        """Build one style-injecting script per discovered theme.

        A theme that cannot be read or wrapped is logged and skipped so that
        one broken file does not prevent the others from loading.

        Returns:
            The scripts for every theme that loaded successfully.
        """
        scripts: list[QWebEngineScript] = []
        for css_file in self.discover():
            try:
                scripts.append(self._theme_script(css_file))
            except Exception:
                log.exception(
                    "theme load failed: %s",
                    css_file,
                )
        return scripts

    def _theme_script(self, css_file: Path) -> QWebEngineScript:
        """Wrap one CSS file in a script that appends it as a ``<style>`` tag."""
        css = css_file.read_text(encoding="utf-8")
        script = QWebEngineScript()
        script.setName(f"theme:{css_file.stem}")
        script.setRunsOnSubFrames(True)
        script.setInjectionPoint(
            QWebEngineScript.InjectionPoint.DocumentReady
        )
        script.setWorldId(SCRIPT_WORLD)
        # Both the element id and the CSS are emitted through _js_string: the
        # file stem is user-controlled, and a quote or backslash in it would
        # otherwise terminate the JavaScript string literal early.
        element_id = _js_string(f"theme-{css_file.stem}")
        script.setSourceCode(
            f"""
(() => {{
const id = {element_id};
if (document.getElementById(id)) return;
const style = document.createElement("style");
style.id = id;
style.textContent = {_js_string(css)};
const install = () => {{
if (!document.head) {{
requestAnimationFrame(install);
return;
}}
document.head.appendChild(style);
}};
install();
}})();
"""
        )
        return script
class ExtensionManager:
    """Load script extensions stored as one sub-directory per extension.

    A directory is loaded only when it holds both ``manifest.json`` and
    ``script.js``.
    """

    def __init__(self, profile_name: str):
        """Store the profile name and resolve its extension directory."""
        self.profile_name = profile_name
        self.directory = profile_extension_dir(profile_name)

    def discover(self) -> list[Path]:
        """Return the sub-directories of the extension directory, sorted.

        Returns an empty list when the extension directory does not exist.
        """
        if not self.directory.exists():
            return []
        folders = [
            entry
            for entry in self.directory.iterdir()
            if entry.is_dir()
        ]
        folders.sort()
        return folders

    def load_scripts(self) -> list[QWebEngineScript]:
        """Build one script per enabled extension.

        Directories missing either file are skipped silently. A manifest
        whose ``enabled`` value is falsy disables the extension. Any failure
        while reading or wrapping an extension is logged and that extension
        is skipped.
        """
        loaded = []
        for extension in self.discover():
            manifest_path = extension / "manifest.json"
            code_path = extension / "script.js"
            if not (manifest_path.exists() and code_path.exists()):
                continue
            try:
                manifest = json.loads(
                    manifest_path.read_text(encoding="utf-8")
                )
                if manifest.get("enabled", True):
                    code = code_path.read_text(encoding="utf-8")
                    entry = QWebEngineScript()
                    # The manifest may override the display name; the
                    # directory name is the fallback.
                    entry.setName(manifest.get("name", extension.name))
                    entry.setRunsOnSubFrames(True)
                    entry.setInjectionPoint(
                        QWebEngineScript.InjectionPoint.DocumentCreation
                    )
                    entry.setWorldId(SCRIPT_WORLD)
                    entry.setSourceCode(code)
                    loaded.append(entry)
            except Exception:
                log.exception(
                    "extension load failed: %s",
                    extension,
                )
        return loaded
class PluginManager:
def __init__(
self,
profile_name: str,
db=None,
):
self.profile_name = profile_name
self.db = db
self.directory = profile_plugin_dir(
profile_name
)
self.directory.mkdir(
parents=True,
exist_ok=True,
)
self.quarantine_dir = (
profile_vencord_dir(
profile_name
)
/ "quarantine"
)
self.quarantine_dir.mkdir(
parents=True,
exist_ok=True,
)
def discover(
self,
) -> list[Path]:
plugins = []
for plugin in sorted(
self.directory.glob(
PLUGIN_FILE_PATTERN
)
):
try:
meta = self.metadata(
plugin
)
if (
self.db
and not self.db.plugin_enabled(
self.profile_name,
meta["name"],
True,
)
):
continue
plugins.append(
plugin
)
except Exception:
log.exception(
"plugin discovery failed: %s",
plugin,
)
return plugins
def metadata(
self,
plugin_file: Path,
) -> dict:
source = plugin_file.read_text(
encoding="utf-8",
errors="ignore",
)
permissions = set()
for match in re.findall(
r"@permissions\s+(.+)",
source,
re.IGNORECASE,
):
permissions.update(
token.strip().lower()
for token in match.split()
if token.strip()
)
permissions &= PLUGIN_PERMISSIONS
metadata["permissions"] = sorted(
permissions
)
name = plugin_file.name
if name.endswith(
".plugin.js"
):
name = name[:-10]
metadata = {
"name": name,
"version": "unknown",
"author": "",
"description": "",
"enabled": True,
"path": str(
plugin_file
),
"size": plugin_file.stat().st_size,
}
patterns = {
"name": r"@name\s+(.+)",
"version": r"@version\s+(.+)",
"author": r"@author\s+(.+)",
"description": r"@description\s+(.+)",
}
for key, pattern in (
patterns.items()
):
match = re.search(
pattern,
source,
re.IGNORECASE,
)
if match:
metadata[key] = (
match.group(1)
.strip()
)
if self.db:
metadata[
"enabled"
] = self.db.plugin_enabled(
self.profile_name,
metadata["name"],
True,
)
return metadata
def validate_file(
self,
source: Path,
) -> tuple[
bool,
str,
]:
filename = (
source.name.lower()
)
if not filename.endswith(
".plugin.js"
):
return (
False,
"invalid extension",
)
if (
source.stat().st_size
> PLUGIN_MAX_SIZE
):
return (
False,
"plugin exceeds size limit",
)
content = (
source.read_text(
encoding="utf-8",
errors="ignore",
)
)
lowered = (
content.lower()
)
for pattern in (
PLUGIN_BLOCKED_PATTERNS
):
if (
pattern.lower()
in lowered
):
return (
False,
f"blocked pattern: {pattern}",
)
return (
True,
"",
)
def bootstrap_script(
self,
) -> QWebEngineScript:
script = (
QWebEngineScript()
)
script.setName(
"plugin-runtime"
)
script.setRunsOnSubFrames(
True
)
script.setInjectionPoint(
QWebEngineScript.InjectionPoint.DocumentCreation
)
script.setWorldId(
SCRIPT_WORLD
)
script.setSourceCode(
"""
(() => {
if (window.PluginRuntime)
return;
window.__bridgeReady =
new Promise(resolve => {
const complete = () => {
if (
window.bridge
) {
resolve(
window.bridge
);
return;
}
setTimeout(
complete,
25,
);
};
if (
typeof qt !== "undefined"
&& typeof QWebChannel !== "undefined"
) {
new QWebChannel(
qt.webChannelTransport,
channel => {
window.bridge =
channel.objects.bridge;
resolve(
window.bridge
);
}
);
return;
}
complete();
});
window.PluginRuntime = {
plugins:{},
failed:{},
started:{},
hasPermission(
plugin,
permission
){
return !!(
this.permissions[
plugin
]?.[permission]
);
},
async bridge(){
return await (
window.__bridgeReady
);
},
log(name,msg){
console.log(
`[Plugin:${name}]`,
msg
);
},
async fail(
name,
error
){
console.error(
`[Plugin:${name}]`,
error
);
this.failed[name] =
String(error);
try{
const bridge =
await this.bridge();
bridge.pluginCrash(
name,
String(error)
);
}
catch{}
}
};
window.BdApi = {
Data:{
async load(
plugin,
key
){
try{
const bridge =
await PluginRuntime.bridge();
const raw =
bridge.pluginLoad(
plugin,
key
);
return JSON.parse(
raw
);
}
catch{
return null;
}
},
async save(
plugin,
key,
value
){
try{
const bridge =
await PluginRuntime.bridge();
bridge.pluginSave(
plugin,
key,
JSON.stringify(
value
)
);
}
catch{}
}
},
Logger:{
async log(
...a
){
try{
const bridge =
await PluginRuntime.bridge();
bridge.pluginLog(
"runtime",
"info",
a.join(" ")
);
}
catch{}
console.log(
...a
);
},
async warn(
...a
){
try{
const bridge =
await PluginRuntime.bridge();
bridge.pluginLog(
"runtime",
"warning",
a.join(" ")
);
}
catch{}
console.warn(
...a
);
},
async error(
...a
){
try{
const bridge =
await PluginRuntime.bridge();
bridge.pluginLog(
"runtime",
"error",
a.join(" ")
);
}
catch{}
console.error(
...a
);
}
},
UI:{
showToast(
message
){
console.log(
"[Toast]",
message
);
}
},
DOM:{
addStyle(
id,
css
){
let style =
document.getElementById(
id
);
if(!style){
style =
document.createElement(
"style"
);
style.id =
id;
document.head.appendChild(
style
);
}
style.textContent =
css;
},
removeStyle(
id
){
document
.getElementById(
id
)
?.remove();
}
}
};
window.ZeresPluginLibrary = {
Logger:
BdApi.Logger,
Utilities:{},
DOMTools:
BdApi.DOM,
PluginUpdater:{
checkForUpdate(){}
},
Patcher:{
before(){},
after(){},
instead(){},
unpatchAll(){}
}
};
})();
"""
)
return script
def plugin_script(
self,
plugin_file: Path,
) -> QWebEngineScript:
meta = self.metadata(
plugin_file
)
source = (
plugin_file.read_text(
encoding="utf-8",
errors="ignore",
)
)
wrapped = f"""
(() => {{
const module = {{
exports:null
}};
const exports =
module.exports;
try {{
{source}
const PluginClass =
module.exports;
if(
typeof PluginClass
!== "function"
)
return;
const plugin =
new PluginClass();
plugin.__permissions =
Object.freeze(
%PERMISSIONS%
);
PluginRuntime.plugins[
{_js_string(meta["name"])}
] = plugin;
try {{
if(
typeof plugin.start
=== "function"
)
plugin.start();
else if(
typeof plugin.onStart
=== "function"
)
plugin.onStart();
PluginRuntime.started[
{_js_string(meta["name"])}
] = true;
}}
catch(error) {{
PluginRuntime.fail(
{_js_string(meta["name"])},
error
);
}}
}}
catch(error) {{
PluginRuntime.fail(
{_js_string(meta["name"])},
error
);
}}
}})();
"""
script = (
QWebEngineScript()
)
script.setName(
f"plugin:{meta['name']}"
)
script.setRunsOnSubFrames(
True
)
script.setInjectionPoint(
QWebEngineScript.InjectionPoint.DocumentCreation
)
script.setWorldId(
SCRIPT_WORLD
)
script.setSourceCode(
wrapped
)
return script
def load_scripts(
self,
) -> list[QWebEngineScript]:
scripts = [
self.bootstrap_script()
]
for plugin in (
self.discover()
):
try:
scripts.append(
self.plugin_script(
plugin
)
)
except Exception:
log.exception(
"plugin load failed: %s",
plugin,
)
if self.db:
self.db.plugin_crash(
self.profile_name,
plugin.stem,
"load failure",
)
return scripts
def install_file(
self,
source: Path,
) -> Path:
valid, error = (
self.validate_file(
plugin
)
)
if not valid:
self.quarantine_plugin(
plugin
)
continue
if self.is_quarantined(
meta["name"]
):
continue
filename = re.sub(
r"[^a-zA-Z0-9._-]",
"_",
source.name,
)
destination = (
self.directory
/ filename
)
destination.write_text(
source.read_text(
encoding="utf-8",
errors="ignore",
),
encoding="utf-8",
)
if (
self.db
and self.db.plugin_crash_count(
self.profile_name,
meta["name"],
)
>= PLUGIN_CRASH_LIMIT
):
self.quarantine_plugin(
plugin
)
continue
return destination
def uninstall(
self,
plugin_name: str,
) -> bool:
for plugin in (
self.directory.glob(
PLUGIN_FILE_PATTERN
)
):
meta = self.metadata(
plugin
)
if (
meta["name"]
!= plugin_name
):
continue
plugin.unlink(
missing_ok=True
)
if self.db:
self.db.delete_plugin(
self.profile_name,
plugin_name,
)
return True
return False
def enable(
self,
plugin_name: str,
) -> None:
if self.db:
self.db.set_plugin_enabled(
self.profile_name,
plugin_name,
True,
)
def disable(
self,
plugin_name: str,
) -> None:
if self.db:
self.db.set_plugin_enabled(
self.profile_name,
plugin_name,
False,
)
def inventory(
self,
) -> list[dict]:
rows = []
for plugin in sorted(
self.directory.glob(
PLUGIN_FILE_PATTERN
)
):
try:
rows.append(
self.metadata(
plugin
)
)
except Exception:
log.exception(
"inventory failed: %s",
plugin,
)
return rows
def diagnostics(
self,
) -> dict:
plugins = (
self.inventory()
)
return {
"profile": self.profile_name,
"plugin_count": len(
plugins
),
"enabled_count": len(
[
p
for p in plugins
if p.get(
"enabled"
)
]
),
"disabled_count": len(
[
p
for p in plugins
if not p.get(
"enabled"
)
]
),
"plugins": plugins,
}
def quarantine_plugin(
self,
plugin_file: Path,
) -> None:
try:
target = (
self.quarantine_dir
/ plugin_file.name
)
plugin_file.replace(
target
)
meta = self.metadata(
target
)
if self.db:
self.db.set_plugin_enabled(
self.profile_name,
meta["name"],
False,
)
self.db.set_plugin_quarantined(
self.profile_name,
meta["name"],
True,
)
except Exception:
log.exception(
"plugin quarantine failed"
)
def is_quarantined(
self,
plugin_name: str,
) -> bool:
if not self.db:
return False
return self.db.plugin_quarantined(
self.profile_name,
plugin_name,
)
# v8
class AddonManager:
    """Facade over the theme, extension and plugin managers of one profile."""

    def __init__(self, profile_name: str, db=None):
        """Create the three per-kind managers.

        Args:
            profile_name: Profile the managers operate on.
            db: Optional plugin-state store, forwarded to ``PluginManager``.
        """
        self.profile_name = profile_name
        self.db = db
        self.themes = ThemeManager(profile_name)
        self.extensions = ExtensionManager(profile_name)
        self.plugins = PluginManager(profile_name, db)

    def install(self, profile: QWebEngineProfile) -> None:
        """Insert theme, extension and plugin scripts into *profile*.

        Scripts whose name was already present in the profile's script
        collection before this call are left alone. Order is themes,
        extensions, then plugins.
        """
        collection = profile.scripts()
        # The script collection has no index accessor; toList() returns its
        # current contents.
        existing = {script.name() for script in collection.toList()}
        for manager in (self.themes, self.extensions, self.plugins):
            for script in manager.load_scripts():
                if script.name() not in existing:
                    collection.insert(script)

    @staticmethod
    def _extension_enabled(extension: Path) -> bool:
        """Read the ``enabled`` flag of an extension's manifest.

        A missing manifest counts as enabled; an unreadable or malformed one
        counts as disabled.
        """
        manifest = extension / "manifest.json"
        try:
            if not manifest.exists():
                return True
            return bool(
                json.loads(
                    manifest.read_text(encoding="utf-8")
                ).get("enabled", True)
            )
        except Exception:
            return False

    def inventory(self) -> dict:
        """List every addon, grouped under ``themes``/``extensions``/``plugins``.

        Themes are always reported as enabled; plugin rows come straight
        from ``PluginManager.inventory``.
        """
        plugins = self.plugins.inventory()
        themes = [
            {
                "name": theme.stem,
                "type": "theme",
                "enabled": True,
                "path": str(theme),
            }
            for theme in self.themes.discover()
        ]
        extensions = [
            {
                "name": extension.name,
                "type": "extension",
                "enabled": self._extension_enabled(extension),
                "path": str(extension),
            }
            for extension in self.extensions.discover()
        ]
        return {
            "themes": themes,
            "extensions": extensions,
            "plugins": plugins,
        }

    def diagnostics(self) -> dict:
        """Return theme/extension counts plus the plugin diagnostics dict."""
        plugin_diag = self.plugins.diagnostics()
        return {
            "profile": self.profile_name,
            "themes": len(self.themes.discover()),
            "extensions": len(self.extensions.discover()),
            "plugins": plugin_diag,
        }

    def reload(self, profile: QWebEngineProfile) -> None:
        """Clear the profile's script collection and install everything anew.

        NOTE(review): ``clear()`` removes every script in the collection,
        including any that were not inserted by this manager - confirm that
        nothing else registers scripts on the same profile.
        """
        profile.scripts().clear()
        self.install(profile)

    def install_plugin(self, source: Path) -> Path:
        """Install a plugin file; see ``PluginManager.install_file``."""
        return self.plugins.install_file(source)

    def uninstall_plugin(self, plugin_name: str) -> bool:
        """Remove a plugin by name; see ``PluginManager.uninstall``."""
        return self.plugins.uninstall(plugin_name)

    def enable_plugin(self, plugin_name: str) -> None:
        """Enable a plugin by name."""
        self.plugins.enable(plugin_name)

    def disable_plugin(self, plugin_name: str) -> None:
        """Disable a plugin by name."""
        self.plugins.disable(plugin_name)

    def plugin_inventory(self) -> list[dict]:
        """Return the plugin metadata rows."""
        return self.plugins.inventory()

    def plugin_diagnostics(self) -> dict:
        """Return the plugin diagnostics summary."""
        return self.plugins.diagnostics()