Initial commit

This commit is contained in:
2026-06-04 17:11:29 +02:00
commit 6e5fcefa42
13 changed files with 520 additions and 0 deletions

112
notification_server.py Normal file
View File

@@ -0,0 +1,112 @@
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable
from PyQt5.QtCore import QObject, pyqtSignal
from qgis.core import QgsMessageLog, Qgis
class _Signals(QObject):
"""Thin QObject wrapper so we can emit signals from the server thread.
Qt auto-connection (the default) marshals cross-thread signals via the
event queue, so connected slots always run in the main Qt thread.
"""
reload = pyqtSignal()
clear = pyqtSignal()
class NotificationServer:
"""Lightweight HTTP server that listens on localhost:22651.
Exposes:
POST /reload → triggers the reload callback in the main Qt thread
POST /clear → triggers the clear callback in the main Qt thread
Both respond with {"status": "ok"}.
The server runs in a dedicated daemon thread so it never blocks QGIS.
"""
def __init__(self, reload_callback: Callable[[], None], clear_callback: Callable[[], None], port: int = 8765):
self._port = port
self._signal = _Signals()
self._signal.reload.connect(reload_callback)
self._signal.clear.connect(clear_callback)
self._server: HTTPServer | None = None
self._thread: threading.Thread | None = None
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self) -> None:
"""Bind to localhost:{port} and begin serving in a background thread.
Raises:
RuntimeError: if the port is already in use.
"""
signal = self._signal # captured by the handler class below
class _Handler(BaseHTTPRequestHandler):
def do_POST(self): # noqa: N802
if self.path == "/reload":
signal.reload.emit()
self._respond_ok()
elif self.path == "/clear":
signal.clear.emit()
self._respond_ok()
else:
self.send_response(404)
self.end_headers()
def _respond_ok(self):
body = json.dumps({"status": "ok"}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
# Suppress the default access log to keep the QGIS console clean.
def log_message(self, fmt, *args): # noqa: ANN001
QgsMessageLog.logMessage(
fmt % args, "BAPEBridge", Qgis.Info
)
try:
self._server = HTTPServer(("127.0.0.1", self._port), _Handler)
except OSError as exc:
raise RuntimeError(
f"Cannot bind notification server to port {self._port}: {exc}"
) from exc
self._thread = threading.Thread(
target=self._server.serve_forever,
name="BAPEBridge-NotifServer",
daemon=True,
)
self._thread.start()
QgsMessageLog.logMessage(
f"Notification server started on http://127.0.0.1:{self._port}",
"BAPEBridge",
Qgis.Info,
)
def stop(self) -> None:
"""Shut down the server and wait for the thread to exit."""
if self._server is not None:
self._server.shutdown()
self._server.server_close()
self._server = None
if self._thread is not None:
self._thread.join(timeout=5)
self._thread = None
QgsMessageLog.logMessage(
"Notification server stopped.",
"BAPEBridge",
Qgis.Info,
)