WebSocket-lag i portvokteren: PG LISTEN/NOTIFY + WS-endepunkt (oppgave 22.1)

Implementerer Fase M1 av SpacetimeDB-migrasjonen:

- SQL-migrasjon 018: Triggers for notify_node_change, notify_edge_change
  og notify_access_change på nodes, edges og node_access-tabellene
- Ny ws.rs-modul i maskinrommet med:
  - PG LISTEN bakgrunnsoppgave som lytter på tre kanaler
  - Broadcast-kanal for å videresende events til alle WS-klienter
  - WebSocket-endepunkt (/ws) med JWT-autentisering
  - Initiell snapshot (initial_sync) ved tilkobling
  - Tilgangskontrollfiltrering per klient via node_access-matrisen
- Oppdatert AppState med WsBroadcast og /ws-rute

Frontend dual-tilkobling (STDB + nytt WS) kommer i neste commit.
This commit is contained in:
vegard 2026-03-18 11:54:34 +00:00
parent 6ee50e937d
commit 0ecb7104c0
5 changed files with 601 additions and 2 deletions

View file

@ -60,6 +60,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
"base64",
"bytes",
"form_urlencoded",
"futures-util",
@ -79,8 +80,10 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tower",
"tower-layer",
"tower-service",
@ -311,6 +314,12 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea"
[[package]]
name = "der"
version = "0.7.10"
@ -2418,6 +2427,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@ -2559,6 +2580,23 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442"
dependencies = [
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.9.2",
"sha1",
"thiserror",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.19.0"
@ -2634,6 +2672,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8_iter"
version = "1.0.4"

View file

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
axum = { version = "0.8", features = ["multipart"] }
axum = { version = "0.8", features = ["multipart", "ws"] }
tokio = { version = "1", features = ["full"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] }
serde = { version = "1", features = ["derive"] }

View file

@ -24,6 +24,7 @@ mod rss;
mod serving;
mod stdb;
pub mod summarize;
pub mod ws;
pub mod tiptap;
pub mod transcribe;
pub mod tts;
@ -42,6 +43,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
use auth::{AuthUser, JwksKeys};
use cas::CasStore;
use stdb::StdbClient;
use ws::WsBroadcast;
#[derive(Clone)]
pub struct AppState {
@ -54,6 +56,7 @@ pub struct AppState {
pub maintenance: maintenance::MaintenanceState,
pub priority_rules: resources::PriorityRules,
pub metrics: metrics::MetricsCollector,
pub ws_broadcast: WsBroadcast,
}
#[derive(Serialize)]
@ -194,12 +197,19 @@ async fn main() {
cas::start_tmp_cleanup_loop(cas.clone());
let dynamic_page_cache = publishing::new_dynamic_page_cache();
let metrics = metrics::MetricsCollector::new();
let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics };
// WebSocket broadcast-kanal og PG LISTEN/NOTIFY-lytter (oppgave 22.1)
let ws_broadcast = WsBroadcast::new();
ws::start_pg_listener(db.clone(), ws_broadcast.clone());
let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast };
// Ruter: /health er offentlig, /me krever gyldig JWT
let app = Router::new()
.route("/health", get(health))
.route("/me", get(me))
// WebSocket-endepunkt for sanntid via PG LISTEN/NOTIFY (oppgave 22.1)
.route("/ws", get(ws::ws_handler))
.route("/intentions/create_node", post(intentions::create_node))
.route("/intentions/create_edge", post(intentions::create_edge))
.route("/intentions/update_node", post(intentions::update_node))

435
maskinrommet/src/ws.rs Normal file
View file

@ -0,0 +1,435 @@
//! WebSocket-lag for sanntid via PG LISTEN/NOTIFY.
//!
//! Portvokteren lytter på `node_changed`, `edge_changed` og `access_changed`
//! kanaler i PostgreSQL og videresender relevante endringer til tilkoblede
//! WebSocket-klienter, filtrert på tilgangsmatrisen (node_access).
//!
//! Fase M1: Parallell med SpacetimeDB for verifisering.
//! Ref: docs/retninger/datalaget.md
use std::sync::Arc;
use axum::{
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
response::Response,
};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use tokio::sync::broadcast;
use uuid::Uuid;
use crate::auth::AuthUser;
// ---------------------------------------------------------------------------
// Typer for NOTIFY-payloads
// ---------------------------------------------------------------------------
/// Payload of a notification on the `node_changed` channel.
///
/// Parsed from the JSON text of the NOTIFY payload and re-serialized to
/// WebSocket clients inside [`WsEvent::NodeChanged`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeChanged {
/// Trigger operation (`TG_OP` in the notifying trigger), e.g. `INSERT`, `UPDATE` or `DELETE`.
pub op: String,
/// ID of the node row that changed.
pub id: Uuid,
/// The changed row's `node_kind` value.
pub kind: String,
}
/// Payload of a notification on the `edge_changed` channel.
///
/// Parsed from the JSON text of the NOTIFY payload and re-serialized to
/// WebSocket clients inside [`WsEvent::EdgeChanged`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeChanged {
/// Trigger operation (`TG_OP` in the notifying trigger), e.g. `INSERT`, `UPDATE` or `DELETE`.
pub op: String,
/// ID of the edge row that changed.
pub id: Uuid,
/// Node the edge starts at; used together with `target_id` for visibility filtering.
pub source_id: Uuid,
/// Node the edge points to.
pub target_id: Uuid,
/// The changed row's `edge_type` value.
pub edge_type: String,
}
/// Payload of a notification on the `access_changed` channel.
///
/// Parsed from the JSON text of the NOTIFY payload and re-serialized to
/// WebSocket clients inside [`WsEvent::AccessChanged`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessChanged {
/// Trigger operation (`TG_OP` in the notifying trigger), e.g. `INSERT`, `UPDATE` or `DELETE`.
pub op: String,
/// The `node_access.subject_id` of the affected row (the party holding the access).
pub subject_id: Uuid,
/// The `node_access.object_id` of the affected row (the node the access applies to).
pub object_id: Uuid,
/// Access level as text. The trigger leaves this key out for `DELETE`, in which
/// case it deserializes to `None` and is omitted again when serialized to clients.
#[serde(skip_serializing_if = "Option::is_none")]
pub access: Option<String>,
}
/// Every message kind that is sent to WebSocket clients.
///
/// Serialized with an internal `"type"` tag: each JSON object carries a
/// `"type"` field holding the renamed variant name (`node_changed`,
/// `edge_changed`, `access_changed` or `initial_sync`) next to the variant's
/// own fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum WsEvent {
/// A row in `nodes` changed.
#[serde(rename = "node_changed")]
NodeChanged(NodeChanged),
/// A row in `edges` changed.
#[serde(rename = "edge_changed")]
EdgeChanged(EdgeChanged),
/// A row in `node_access` changed.
#[serde(rename = "access_changed")]
AccessChanged(AccessChanged),
/// Initial snapshot of all nodes, edges and access entries for this user.
#[serde(rename = "initial_sync")]
InitialSync {
// Each list holds pre-built JSON objects, one per database row.
nodes: Vec<serde_json::Value>,
edges: Vec<serde_json::Value>,
access: Vec<serde_json::Value>,
},
}
// ---------------------------------------------------------------------------
// Broadcast-kanal for alle NOTIFY-events
// ---------------------------------------------------------------------------
/// Shared broadcast channel that the PG listener publishes to.
///
/// Every WebSocket connection subscribes to it. Clones share the same
/// underlying channel, so the value can be stored in application state and
/// handed to the listener task at the same time.
#[derive(Clone)]
pub struct WsBroadcast {
    tx: broadcast::Sender<WsEvent>,
}

impl WsBroadcast {
    /// Number of events retained for slow receivers. When the buffer is full
    /// the oldest events are dropped and lagging receivers observe
    /// `RecvError::Lagged` on their next `recv`.
    const CAPACITY: usize = 4096;

    /// Create a channel with no subscribers yet.
    ///
    /// The receiver returned by `broadcast::channel` is discarded on purpose;
    /// each connection creates its own receiver by subscribing to the sender.
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(Self::CAPACITY);
        Self { tx }
    }
}

impl Default for WsBroadcast {
    /// Same as [`WsBroadcast::new`].
    fn default() -> Self {
        Self::new()
    }
}
// ---------------------------------------------------------------------------
// PG LISTEN background task (a Tokio task, not an OS thread)
// ---------------------------------------------------------------------------
/// Spawn the background task that listens for PG NOTIFY and publishes to the
/// broadcast channel.
///
/// The task runs forever: whenever the listen loop ends with an error, the
/// error is logged and the loop is restarted after a two second pause. The
/// task handle is not kept, so the task is detached.
pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) {
    let retry_delay = std::time::Duration::from_secs(2);

    let supervisor = async move {
        loop {
            match pg_listen_loop(&db, &ws).await {
                // Restart immediately, without logging or waiting.
                Ok(()) => {}
                Err(e) => {
                    tracing::error!("PG LISTEN feilet: {e}, prøver igjen om 2s");
                    tokio::time::sleep(retry_delay).await;
                }
            }
        }
    };

    tokio::spawn(supervisor);
}
/// Listen on the three notification channels and publish every well-formed
/// notification to the broadcast channel.
///
/// Returns only with an error (connecting, subscribing or receiving failed);
/// the caller is expected to restart it. Payloads that do not parse are logged
/// and skipped, and notifications on any other channel are ignored.
async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error> {
    let mut listener = PgListener::connect_with(db).await?;
    listener
        .listen_all(["node_changed", "edge_changed", "access_changed"])
        .await?;
    tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed");

    loop {
        let notification = listener.recv().await?;
        let raw = notification.payload();

        // Decode according to the channel; a parse failure is logged and yields `None`.
        let decoded = match notification.channel() {
            "node_changed" => serde_json::from_str::<NodeChanged>(raw)
                .map(WsEvent::NodeChanged)
                .inspect_err(|e| tracing::warn!("Ugyldig node_changed-payload: {e}"))
                .ok(),
            "edge_changed" => serde_json::from_str::<EdgeChanged>(raw)
                .map(WsEvent::EdgeChanged)
                .inspect_err(|e| tracing::warn!("Ugyldig edge_changed-payload: {e}"))
                .ok(),
            "access_changed" => serde_json::from_str::<AccessChanged>(raw)
                .map(WsEvent::AccessChanged)
                .inspect_err(|e| tracing::warn!("Ugyldig access_changed-payload: {e}"))
                .ok(),
            _ => continue,
        };
        let Some(event) = decoded else { continue };

        // Fan out to all connected clients; per-client filtering happens on the
        // receiving side. The send result is deliberately discarded.
        let _ = ws.tx.send(event);
    }
}
// ---------------------------------------------------------------------------
// WebSocket-endepunkt
// ---------------------------------------------------------------------------
/// GET /ws — WebSocket-oppgradering for sanntidsstrøm.
/// Krever gyldig JWT (AuthUser-ekstraktor).
pub async fn ws_handler(
ws: WebSocketUpgrade,
user: AuthUser,
State(state): State<crate::AppState>,
) -> Response {
tracing::info!(
node_id = %user.node_id,
"WebSocket-tilkobling fra bruker"
);
ws.on_upgrade(move |socket| handle_socket(socket, user, state))
}
/// Håndter en individuell WebSocket-tilkobling.
async fn handle_socket(mut socket: WebSocket, user: AuthUser, state: crate::AppState) {
let user_id = user.node_id;
// Last brukerens tilgangsmatrise fra PG
let visible_nodes = match load_visible_nodes(&state.db, user_id).await {
Ok(v) => Arc::new(tokio::sync::RwLock::new(v)),
Err(e) => {
tracing::error!("Kunne ikke laste tilgangsmatrise: {e}");
let _ = socket
.send(Message::Close(None))
.await;
return;
}
};
// Send initiell snapshot
match build_initial_sync(&state.db, user_id).await {
Ok(sync) => {
let json = serde_json::to_string(&sync).unwrap_or_default();
if socket.send(Message::Text(json.into())).await.is_err() {
return;
}
}
Err(e) => {
tracing::error!("Feil ved bygging av initial_sync: {e}");
}
}
// Abonner på broadcast
let mut rx = state.ws_broadcast.tx.subscribe();
loop {
tokio::select! {
// Motta broadcast-events
result = rx.recv() => {
match result {
Ok(event) => {
let should_send = should_send_to_user(&event, user_id, &visible_nodes).await;
if should_send {
// Oppdater tilgangsmatrise ved access-endringer
if let WsEvent::AccessChanged(ref ac) = event {
let mut vn = visible_nodes.write().await;
if ac.subject_id == user_id {
if ac.op == "DELETE" {
vn.remove(&ac.object_id);
} else {
vn.insert(ac.object_id);
}
}
}
let json = match serde_json::to_string(&event) {
Ok(j) => j,
Err(_) => continue,
};
if socket.send(Message::Text(json.into())).await.is_err() {
break; // Klient frakoblet
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
node_id = %user_id,
skipped = n,
"WebSocket-klient sakket etter, mistet {n} meldinger"
);
// Kunne sendt full resync her, men for M1 er det OK
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
// Motta meldinger fra klienten (ping/pong, close)
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Ping(data))) => {
if socket.send(Message::Pong(data)).await.is_err() {
break;
}
}
Some(Err(_)) => break,
_ => {} // Ignorerer tekst/binær fra klient i M1
}
}
}
}
tracing::info!(node_id = %user_id, "WebSocket-tilkobling lukket");
}
// ---------------------------------------------------------------------------
// Tilgangskontroll
// ---------------------------------------------------------------------------
/// Decide whether `event` should be delivered to the user `user_id`.
///
/// - Node events: the node must be in the visible set.
/// - Edge events: at least one endpoint must be in the visible set.
/// - Access events: the user must be the subject of the change.
/// - Initial sync: always delivered.
async fn should_send_to_user(
    event: &WsEvent,
    user_id: Uuid,
    visible_nodes: &tokio::sync::RwLock<std::collections::HashSet<Uuid>>,
) -> bool {
    let visible = visible_nodes.read().await;
    let can_see = |id: &Uuid| visible.contains(id);

    match event {
        WsEvent::InitialSync { .. } => true,
        WsEvent::AccessChanged(change) => change.subject_id == user_id,
        WsEvent::NodeChanged(node) => can_see(&node.id),
        WsEvent::EdgeChanged(edge) => can_see(&edge.source_id) || can_see(&edge.target_id),
    }
}
/// Load the IDs of all nodes the user has access to.
///
/// The set is the union of nodes created by the user, nodes listed for the
/// user in `node_access`, and nodes whose visibility is `readable` or `open`.
///
/// # Errors
/// Returns the `sqlx::Error` if the query fails.
async fn load_visible_nodes(
    db: &PgPool,
    user_id: Uuid,
) -> Result<std::collections::HashSet<Uuid>, sqlx::Error> {
    let query = sqlx::query_scalar::<_, Uuid>(
        r#"
SELECT id FROM nodes WHERE created_by = $1
UNION
SELECT object_id FROM node_access WHERE subject_id = $1
UNION
SELECT id FROM nodes WHERE visibility IN ('readable', 'open')
"#,
    )
    .bind(user_id);

    let ids = query.fetch_all(db).await?;

    let mut visible = std::collections::HashSet::with_capacity(ids.len());
    visible.extend(ids);
    Ok(visible)
}
/// Bygg initial_sync-melding med alle data brukeren kan se.
async fn build_initial_sync(
db: &PgPool,
user_id: Uuid,
) -> Result<WsEvent, sqlx::Error> {
// Noder: egne + tilgang + offentlige
let nodes = sqlx::query_as::<_, NodeRow>(
r#"
SELECT n.id, n.node_kind, n.title, n.content, n.visibility::text,
n.metadata, n.created_at, n.created_by
FROM nodes n
WHERE n.created_by = $1
OR n.id IN (SELECT object_id FROM node_access WHERE subject_id = $1)
OR n.visibility IN ('readable', 'open')
"#,
)
.bind(user_id)
.fetch_all(db)
.await?;
let node_ids: Vec<Uuid> = nodes.iter().map(|n| n.id).collect();
// Edges der minst én side er synlig
let edges = sqlx::query_as::<_, EdgeRow>(
r#"
SELECT e.id, e.source_id, e.target_id, e.edge_type,
e.metadata, e.system, e.created_at, e.created_by
FROM edges e
WHERE e.source_id = ANY($1) OR e.target_id = ANY($1)
"#,
)
.bind(&node_ids)
.fetch_all(db)
.await?;
// Access-entries for denne brukeren
let access = sqlx::query_as::<_, AccessRow>(
r#"
SELECT subject_id, object_id, access::text, via_edge
FROM node_access
WHERE subject_id = $1
"#,
)
.bind(user_id)
.fetch_all(db)
.await?;
Ok(WsEvent::InitialSync {
nodes: nodes.into_iter().map(|n| n.to_json()).collect(),
edges: edges.into_iter().map(|e| e.to_json()).collect(),
access: access.into_iter().map(|a| a.to_json()).collect(),
})
}
// ---------------------------------------------------------------------------
// DB-rader for initial_sync
// ---------------------------------------------------------------------------
/// One row of the node query used for `initial_sync`.
#[derive(sqlx::FromRow)]
struct NodeRow {
    id: Uuid,
    node_kind: String,
    title: Option<String>,
    content: Option<String>,
    visibility: String,
    metadata: serde_json::Value,
    created_at: chrono::DateTime<chrono::Utc>,
    created_by: Option<Uuid>,
}

impl NodeRow {
    /// Convert the row to the camelCase JSON object sent to clients.
    ///
    /// Missing `title`, `content` and `createdBy` become empty strings,
    /// `metadata` is embedded as a JSON-encoded string, and `createdAt` is the
    /// timestamp in microseconds.
    fn to_json(&self) -> serde_json::Value {
        let title = self.title.as_deref().unwrap_or_default();
        let content = self.content.as_deref().unwrap_or_default();
        let metadata_text = self.metadata.to_string();
        let created_by = match self.created_by {
            Some(author) => author.to_string(),
            None => String::new(),
        };

        serde_json::json!({
            "id": self.id.to_string(),
            "nodeKind": self.node_kind,
            "title": title,
            "content": content,
            "visibility": self.visibility,
            "metadata": metadata_text,
            "createdAt": self.created_at.timestamp_micros(),
            "createdBy": created_by,
        })
    }
}
/// One row of the edge query used for `initial_sync`.
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: Uuid,
    source_id: Uuid,
    target_id: Uuid,
    edge_type: String,
    metadata: serde_json::Value,
    system: bool,
    created_at: chrono::DateTime<chrono::Utc>,
    created_by: Option<Uuid>,
}

impl EdgeRow {
    /// Convert the row to the camelCase JSON object sent to clients.
    ///
    /// IDs are rendered as strings, `metadata` is embedded as a JSON-encoded
    /// string, `createdAt` is the timestamp in microseconds, and a missing
    /// `createdBy` becomes an empty string.
    fn to_json(&self) -> serde_json::Value {
        let metadata_text = self.metadata.to_string();
        let created_by = match self.created_by {
            Some(author) => author.to_string(),
            None => String::new(),
        };

        serde_json::json!({
            "id": self.id.to_string(),
            "sourceId": self.source_id.to_string(),
            "targetId": self.target_id.to_string(),
            "edgeType": self.edge_type,
            "metadata": metadata_text,
            "system": self.system,
            "createdAt": self.created_at.timestamp_micros(),
            "createdBy": created_by,
        })
    }
}
/// One row of the `node_access` query used for `initial_sync`.
#[derive(sqlx::FromRow)]
struct AccessRow {
    subject_id: Uuid,
    object_id: Uuid,
    access: String,
    via_edge: Option<Uuid>,
}

impl AccessRow {
    /// Convert the row to the camelCase JSON object sent to clients.
    ///
    /// The `id` is the composite key `"subject_id:object_id"`, the form STDB
    /// uses. A missing `viaEdge` becomes an empty string.
    fn to_json(&self) -> serde_json::Value {
        let composite_id = format!("{}:{}", self.subject_id, self.object_id);
        let via_edge = match self.via_edge {
            Some(edge) => edge.to_string(),
            None => String::new(),
        };

        serde_json::json!({
            "id": composite_id,
            "subjectId": self.subject_id.to_string(),
            "objectId": self.object_id.to_string(),
            "access": self.access,
            "viaEdge": via_edge,
        })
    }
}

View file

@ -0,0 +1,110 @@
-- 018_pg_notify_triggers.sql
-- PG LISTEN/NOTIFY triggers for sanntid: nodes, edges og node_access.
--
-- Portvokteren (maskinrommet) lytter på disse kanalene og videresender
-- endringer via WebSocket til tilkoblede klienter.
-- Ref: docs/retninger/datalaget.md (Fase M1)
BEGIN;
-- =============================================================================
-- notify_node_change: Sender NOTIFY ved INSERT, UPDATE eller DELETE på nodes.
-- Payload er JSON med operasjon, node-ID og node_kind.
-- =============================================================================
-- Trigger function: publishes {"op", "id", "kind"} as JSON text on the
-- node_changed channel for the affected nodes row.
CREATE OR REPLACE FUNCTION notify_node_change()
RETURNS trigger AS $$
DECLARE
    changed nodes%ROWTYPE;
BEGIN
    -- A DELETE reports the removed row; INSERT and UPDATE report the new row state.
    CASE TG_OP
        WHEN 'DELETE' THEN changed := OLD;
        ELSE changed := NEW;
    END CASE;

    PERFORM pg_notify(
        'node_changed',
        json_build_object(
            'op', TG_OP,
            'id', changed.id,
            'kind', changed.node_kind
        )::text
    );

    RETURN changed;
END;
$$ LANGUAGE plpgsql;
-- Row-level AFTER trigger: runs notify_node_change() for every inserted,
-- updated or deleted row in nodes. The trigger is dropped first so that
-- re-running this script does not fail with "trigger already exists"; the
-- function it calls is already re-runnable through CREATE OR REPLACE.
DROP TRIGGER IF EXISTS nodes_notify ON nodes;
CREATE TRIGGER nodes_notify
    AFTER INSERT OR UPDATE OR DELETE ON nodes
    FOR EACH ROW EXECUTE FUNCTION notify_node_change();
-- =============================================================================
-- notify_edge_change: Sender NOTIFY ved INSERT, UPDATE eller DELETE på edges.
-- Payload er JSON med operasjon, edge-ID, source, target og type.
-- =============================================================================
-- Trigger function: publishes {"op", "id", "source_id", "target_id", "edge_type"}
-- as JSON text on the edge_changed channel for the affected edges row.
CREATE OR REPLACE FUNCTION notify_edge_change()
RETURNS trigger AS $$
DECLARE
    changed edges%ROWTYPE;
BEGIN
    -- A DELETE reports the removed row; INSERT and UPDATE report the new row state.
    CASE TG_OP
        WHEN 'DELETE' THEN changed := OLD;
        ELSE changed := NEW;
    END CASE;

    PERFORM pg_notify(
        'edge_changed',
        json_build_object(
            'op', TG_OP,
            'id', changed.id,
            'source_id', changed.source_id,
            'target_id', changed.target_id,
            'edge_type', changed.edge_type
        )::text
    );

    RETURN changed;
END;
$$ LANGUAGE plpgsql;
-- Row-level AFTER trigger: runs notify_edge_change() for every inserted,
-- updated or deleted row in edges. The trigger is dropped first so that
-- re-running this script does not fail with "trigger already exists"; the
-- function it calls is already re-runnable through CREATE OR REPLACE.
DROP TRIGGER IF EXISTS edges_notify ON edges;
CREATE TRIGGER edges_notify
    AFTER INSERT OR UPDATE OR DELETE ON edges
    FOR EACH ROW EXECUTE FUNCTION notify_edge_change();
-- =============================================================================
-- notify_access_change: Sender NOTIFY ved endringer i node_access.
-- Brukes av portvokteren for å oppdatere klientenes tilgangsmatrise.
-- =============================================================================
-- Trigger function: publishes the affected node_access row as JSON text on
-- the access_changed channel. The 'access' key is only present for INSERT
-- and UPDATE; a DELETE carries just the operation and the two IDs.
CREATE OR REPLACE FUNCTION notify_access_change()
RETURNS trigger AS $$
DECLARE
    msg json;
BEGIN
    CASE TG_OP
        WHEN 'DELETE' THEN
            msg := json_build_object(
                'op', TG_OP,
                'subject_id', OLD.subject_id,
                'object_id', OLD.object_id
            );
        ELSE
            msg := json_build_object(
                'op', TG_OP,
                'subject_id', NEW.subject_id,
                'object_id', NEW.object_id,
                'access', NEW.access
            );
    END CASE;

    PERFORM pg_notify('access_changed', msg::text);

    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Row-level AFTER trigger: runs notify_access_change() for every inserted,
-- updated or deleted row in node_access. The trigger is dropped first so that
-- re-running this script does not fail with "trigger already exists"; the
-- function it calls is already re-runnable through CREATE OR REPLACE.
DROP TRIGGER IF EXISTS node_access_notify ON node_access;
CREATE TRIGGER node_access_notify
    AFTER INSERT OR UPDATE OR DELETE ON node_access
    FOR EACH ROW EXECUTE FUNCTION notify_access_change();
COMMIT;