From 0ecb7104c0a307f9cf5ea130b51ccc04cc17e554 Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 11:54:34 +0000 Subject: [PATCH] WebSocket-lag i portvokteren: PG LISTEN/NOTIFY + WS-endepunkt (oppgave 22.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementerer Fase M1 av SpacetimeDB-migrasjonen: - SQL-migrasjon 018: Triggers for notify_node_change, notify_edge_change og notify_access_change på nodes, edges og node_access-tabellene - Ny ws.rs-modul i maskinrommet med: - PG LISTEN bakgrunnsoppgave som lytter på tre kanaler - Broadcast-kanal for å videresende events til alle WS-klienter - WebSocket-endepunkt (/ws) med JWT-autentisering - Initiell snapshot (initial_sync) ved tilkobling - Tilgangskontrollfiltrering per klient via node_access-matrisen - Oppdatert AppState med WsBroadcast og /ws-rute Frontend dual-tilkobling (STDB + nytt WS) kommer i neste commit. --- maskinrommet/Cargo.lock | 44 +++ maskinrommet/Cargo.toml | 2 +- maskinrommet/src/main.rs | 12 +- maskinrommet/src/ws.rs | 435 ++++++++++++++++++++++++++ migrations/018_pg_notify_triggers.sql | 110 +++++++ 5 files changed, 601 insertions(+), 2 deletions(-) create mode 100644 maskinrommet/src/ws.rs create mode 100644 migrations/018_pg_notify_triggers.sql diff --git a/maskinrommet/Cargo.lock b/maskinrommet/Cargo.lock index 9ce5fac..182f407 100644 --- a/maskinrommet/Cargo.lock +++ b/maskinrommet/Cargo.lock @@ -60,6 +60,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -79,8 +80,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -311,6 +314,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -2418,6 +2427,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2559,6 +2580,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -2634,6 +2672,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/maskinrommet/Cargo.toml b/maskinrommet/Cargo.toml index 05cbde8..0b74707 100644 --- a/maskinrommet/Cargo.toml +++ b/maskinrommet/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -axum = { version = "0.8", features = ["multipart"] } +axum = { version = "0.8", features = ["multipart", "ws"] } tokio = { version = "1", features = ["full"] } sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] } serde = { version = "1", features = ["derive"] } diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 3564026..1e0e62a 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -24,6 +24,7 @@ mod rss; mod serving; mod stdb; pub mod summarize; +pub mod ws; pub mod tiptap; pub mod transcribe; pub mod tts; @@ -42,6 +43,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte use auth::{AuthUser, JwksKeys}; use cas::CasStore; use stdb::StdbClient; +use ws::WsBroadcast; #[derive(Clone)] pub struct AppState { @@ -54,6 +56,7 @@ pub struct AppState { pub maintenance: maintenance::MaintenanceState, pub priority_rules: resources::PriorityRules, pub metrics: metrics::MetricsCollector, + pub ws_broadcast: WsBroadcast, } #[derive(Serialize)] @@ -194,12 +197,19 @@ async fn main() { cas::start_tmp_cleanup_loop(cas.clone()); let dynamic_page_cache = publishing::new_dynamic_page_cache(); let metrics = metrics::MetricsCollector::new(); - let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics }; + + // WebSocket broadcast-kanal og PG LISTEN/NOTIFY-lytter (oppgave 22.1) + let ws_broadcast = WsBroadcast::new(); + ws::start_pg_listener(db.clone(), ws_broadcast.clone()); + + let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast }; // Ruter: /health er offentlig, /me krever gyldig JWT let app = Router::new() .route("/health", get(health)) .route("/me", get(me)) + // WebSocket-endepunkt for sanntid via PG LISTEN/NOTIFY (oppgave 22.1) + .route("/ws", get(ws::ws_handler)) .route("/intentions/create_node", post(intentions::create_node)) .route("/intentions/create_edge", post(intentions::create_edge)) .route("/intentions/update_node", post(intentions::update_node)) diff --git a/maskinrommet/src/ws.rs b/maskinrommet/src/ws.rs new file mode 100644 index 0000000..4431dca --- /dev/null +++ b/maskinrommet/src/ws.rs @@ -0,0 +1,435 @@ +//! WebSocket-lag for sanntid via PG LISTEN/NOTIFY. +//! +//! Portvokteren lytter på `node_changed`, `edge_changed` og `access_changed` +//! kanaler i PostgreSQL og videresender relevante endringer til tilkoblede +//! WebSocket-klienter, filtrert på tilgangsmatrisen (node_access). +//! +//! Fase M1: Parallell med SpacetimeDB for verifisering. +//! Ref: docs/retninger/datalaget.md + +use std::sync::Arc; + +use axum::{ + extract::{ + ws::{Message, WebSocket}, + State, WebSocketUpgrade, + }, + response::Response, +}; +use serde::{Deserialize, Serialize}; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use tokio::sync::broadcast; +use uuid::Uuid; + +use crate::auth::AuthUser; + +// --------------------------------------------------------------------------- +// Typer for NOTIFY-payloads +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeChanged { + pub op: String, + pub id: Uuid, + pub kind: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EdgeChanged { + pub op: String, + pub id: Uuid, + pub source_id: Uuid, + pub target_id: Uuid, + pub edge_type: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AccessChanged { + pub op: String, + pub subject_id: Uuid, + pub object_id: Uuid, + #[serde(skip_serializing_if = "Option::is_none")] + pub access: Option, +} + +/// Samlet WebSocket-melding sendt til klienter. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type")] +pub enum WsEvent { + #[serde(rename = "node_changed")] + NodeChanged(NodeChanged), + #[serde(rename = "edge_changed")] + EdgeChanged(EdgeChanged), + #[serde(rename = "access_changed")] + AccessChanged(AccessChanged), + /// Initiell snapshot av alle noder, edges og access for denne brukeren. + #[serde(rename = "initial_sync")] + InitialSync { + nodes: Vec, + edges: Vec, + access: Vec, + }, +} + +// --------------------------------------------------------------------------- +// Broadcast-kanal for alle NOTIFY-events +// --------------------------------------------------------------------------- + +/// Delt broadcast-kanal som PG-lytteren publiserer til. +/// Alle WebSocket-tilkoblinger abonnerer. +#[derive(Clone)] +pub struct WsBroadcast { + tx: broadcast::Sender, +} + +impl WsBroadcast { + pub fn new() -> Self { + // 4096 meldinger i buffer — dropper eldste ved overflyt + let (tx, _) = broadcast::channel(4096); + Self { tx } + } +} + +// --------------------------------------------------------------------------- +// PG LISTEN-bakgrunnstråd +// --------------------------------------------------------------------------- + +/// Start bakgrunnsoppgave som lytter på PG NOTIFY og publiserer til broadcast. +pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) { + tokio::spawn(async move { + loop { + if let Err(e) = pg_listen_loop(&db, &ws).await { + tracing::error!("PG LISTEN feilet: {e}, prøver igjen om 2s"); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + }); +} + +async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error> { + let mut listener = PgListener::connect_with(db).await?; + listener + .listen_all(["node_changed", "edge_changed", "access_changed"]) + .await?; + + tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed"); + + loop { + let notification = listener.recv().await?; + let channel = notification.channel(); + let payload = notification.payload(); + + let event = match channel { + "node_changed" => match serde_json::from_str::(payload) { + Ok(n) => WsEvent::NodeChanged(n), + Err(e) => { + tracing::warn!("Ugyldig node_changed-payload: {e}"); + continue; + } + }, + "edge_changed" => match serde_json::from_str::(payload) { + Ok(e) => WsEvent::EdgeChanged(e), + Err(e) => { + tracing::warn!("Ugyldig edge_changed-payload: {e}"); + continue; + } + }, + "access_changed" => match serde_json::from_str::(payload) { + Ok(a) => WsEvent::AccessChanged(a), + Err(e) => { + tracing::warn!("Ugyldig access_changed-payload: {e}"); + continue; + } + }, + _ => continue, + }; + + // Broadcast til alle tilkoblede klienter (filtrering skjer per klient) + let _ = ws.tx.send(event); + } +} + +// --------------------------------------------------------------------------- +// WebSocket-endepunkt +// --------------------------------------------------------------------------- + +/// GET /ws — WebSocket-oppgradering for sanntidsstrøm. +/// Krever gyldig JWT (AuthUser-ekstraktor). +pub async fn ws_handler( + ws: WebSocketUpgrade, + user: AuthUser, + State(state): State, +) -> Response { + tracing::info!( + node_id = %user.node_id, + "WebSocket-tilkobling fra bruker" + ); + + ws.on_upgrade(move |socket| handle_socket(socket, user, state)) +} + +/// Håndter en individuell WebSocket-tilkobling. +async fn handle_socket(mut socket: WebSocket, user: AuthUser, state: crate::AppState) { + let user_id = user.node_id; + + // Last brukerens tilgangsmatrise fra PG + let visible_nodes = match load_visible_nodes(&state.db, user_id).await { + Ok(v) => Arc::new(tokio::sync::RwLock::new(v)), + Err(e) => { + tracing::error!("Kunne ikke laste tilgangsmatrise: {e}"); + let _ = socket + .send(Message::Close(None)) + .await; + return; + } + }; + + // Send initiell snapshot + match build_initial_sync(&state.db, user_id).await { + Ok(sync) => { + let json = serde_json::to_string(&sync).unwrap_or_default(); + if socket.send(Message::Text(json.into())).await.is_err() { + return; + } + } + Err(e) => { + tracing::error!("Feil ved bygging av initial_sync: {e}"); + } + } + + // Abonner på broadcast + let mut rx = state.ws_broadcast.tx.subscribe(); + + loop { + tokio::select! { + // Motta broadcast-events + result = rx.recv() => { + match result { + Ok(event) => { + let should_send = should_send_to_user(&event, user_id, &visible_nodes).await; + if should_send { + // Oppdater tilgangsmatrise ved access-endringer + if let WsEvent::AccessChanged(ref ac) = event { + let mut vn = visible_nodes.write().await; + if ac.subject_id == user_id { + if ac.op == "DELETE" { + vn.remove(&ac.object_id); + } else { + vn.insert(ac.object_id); + } + } + } + + let json = match serde_json::to_string(&event) { + Ok(j) => j, + Err(_) => continue, + }; + if socket.send(Message::Text(json.into())).await.is_err() { + break; // Klient frakoblet + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + node_id = %user_id, + skipped = n, + "WebSocket-klient sakket etter, mistet {n} meldinger" + ); + // Kunne sendt full resync her, men for M1 er det OK + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + // Motta meldinger fra klienten (ping/pong, close) + msg = socket.recv() => { + match msg { + Some(Ok(Message::Close(_))) | None => break, + Some(Ok(Message::Ping(data))) => { + if socket.send(Message::Pong(data)).await.is_err() { + break; + } + } + Some(Err(_)) => break, + _ => {} // Ignorerer tekst/binær fra klient i M1 + } + } + } + } + + tracing::info!(node_id = %user_id, "WebSocket-tilkobling lukket"); +} + +// --------------------------------------------------------------------------- +// Tilgangskontroll +// --------------------------------------------------------------------------- + +/// Sjekk om en event skal sendes til denne brukeren. +async fn should_send_to_user( + event: &WsEvent, + user_id: Uuid, + visible_nodes: &tokio::sync::RwLock>, +) -> bool { + let vn = visible_nodes.read().await; + match event { + WsEvent::NodeChanged(n) => vn.contains(&n.id), + WsEvent::EdgeChanged(e) => vn.contains(&e.source_id) || vn.contains(&e.target_id), + WsEvent::AccessChanged(a) => a.subject_id == user_id, + WsEvent::InitialSync { .. } => true, + } +} + +/// Last alle node-IDer som brukeren har tilgang til. +async fn load_visible_nodes( + db: &PgPool, + user_id: Uuid, +) -> Result, sqlx::Error> { + // Brukerens egne noder + noder via node_access + offentlige noder + let rows = sqlx::query_scalar::<_, Uuid>( + r#" + SELECT id FROM nodes WHERE created_by = $1 + UNION + SELECT object_id FROM node_access WHERE subject_id = $1 + UNION + SELECT id FROM nodes WHERE visibility IN ('readable', 'open') + "#, + ) + .bind(user_id) + .fetch_all(db) + .await?; + + Ok(rows.into_iter().collect()) +} + +/// Bygg initial_sync-melding med alle data brukeren kan se. +async fn build_initial_sync( + db: &PgPool, + user_id: Uuid, +) -> Result { + // Noder: egne + tilgang + offentlige + let nodes = sqlx::query_as::<_, NodeRow>( + r#" + SELECT n.id, n.node_kind, n.title, n.content, n.visibility::text, + n.metadata, n.created_at, n.created_by + FROM nodes n + WHERE n.created_by = $1 + OR n.id IN (SELECT object_id FROM node_access WHERE subject_id = $1) + OR n.visibility IN ('readable', 'open') + "#, + ) + .bind(user_id) + .fetch_all(db) + .await?; + + let node_ids: Vec = nodes.iter().map(|n| n.id).collect(); + + // Edges der minst én side er synlig + let edges = sqlx::query_as::<_, EdgeRow>( + r#" + SELECT e.id, e.source_id, e.target_id, e.edge_type, + e.metadata, e.system, e.created_at, e.created_by + FROM edges e + WHERE e.source_id = ANY($1) OR e.target_id = ANY($1) + "#, + ) + .bind(&node_ids) + .fetch_all(db) + .await?; + + // Access-entries for denne brukeren + let access = sqlx::query_as::<_, AccessRow>( + r#" + SELECT subject_id, object_id, access::text, via_edge + FROM node_access + WHERE subject_id = $1 + "#, + ) + .bind(user_id) + .fetch_all(db) + .await?; + + Ok(WsEvent::InitialSync { + nodes: nodes.into_iter().map(|n| n.to_json()).collect(), + edges: edges.into_iter().map(|e| e.to_json()).collect(), + access: access.into_iter().map(|a| a.to_json()).collect(), + }) +} + +// --------------------------------------------------------------------------- +// DB-rader for initial_sync +// --------------------------------------------------------------------------- + +#[derive(sqlx::FromRow)] +struct NodeRow { + id: Uuid, + node_kind: String, + title: Option, + content: Option, + visibility: String, + metadata: serde_json::Value, + created_at: chrono::DateTime, + created_by: Option, +} + +impl NodeRow { + fn to_json(&self) -> serde_json::Value { + serde_json::json!({ + "id": self.id.to_string(), + "nodeKind": self.node_kind, + "title": self.title.as_deref().unwrap_or(""), + "content": self.content.as_deref().unwrap_or(""), + "visibility": self.visibility, + "metadata": self.metadata.to_string(), + "createdAt": self.created_at.timestamp_micros(), + "createdBy": self.created_by.map(|u| u.to_string()).unwrap_or_default(), + }) + } +} + +#[derive(sqlx::FromRow)] +struct EdgeRow { + id: Uuid, + source_id: Uuid, + target_id: Uuid, + edge_type: String, + metadata: serde_json::Value, + system: bool, + created_at: chrono::DateTime, + created_by: Option, +} + +impl EdgeRow { + fn to_json(&self) -> serde_json::Value { + serde_json::json!({ + "id": self.id.to_string(), + "sourceId": self.source_id.to_string(), + "targetId": self.target_id.to_string(), + "edgeType": self.edge_type, + "metadata": self.metadata.to_string(), + "system": self.system, + "createdAt": self.created_at.timestamp_micros(), + "createdBy": self.created_by.map(|u| u.to_string()).unwrap_or_default(), + }) + } +} + +#[derive(sqlx::FromRow)] +struct AccessRow { + subject_id: Uuid, + object_id: Uuid, + access: String, + via_edge: Option, +} + +impl AccessRow { + fn to_json(&self) -> serde_json::Value { + // Composite key som STDB bruker: "subject_id:object_id" + serde_json::json!({ + "id": format!("{}:{}", self.subject_id, self.object_id), + "subjectId": self.subject_id.to_string(), + "objectId": self.object_id.to_string(), + "access": self.access, + "viaEdge": self.via_edge.map(|u| u.to_string()).unwrap_or_default(), + }) + } +} diff --git a/migrations/018_pg_notify_triggers.sql b/migrations/018_pg_notify_triggers.sql new file mode 100644 index 0000000..ea90ac1 --- /dev/null +++ b/migrations/018_pg_notify_triggers.sql @@ -0,0 +1,110 @@ +-- 018_pg_notify_triggers.sql +-- PG LISTEN/NOTIFY triggers for sanntid: nodes, edges og node_access. +-- +-- Portvokteren (maskinrommet) lytter på disse kanalene og videresender +-- endringer via WebSocket til tilkoblede klienter. +-- Ref: docs/retninger/datalaget.md (Fase M1) + +BEGIN; + +-- ============================================================================= +-- notify_node_change: Sender NOTIFY ved INSERT, UPDATE eller DELETE på nodes. +-- Payload er JSON med operasjon, node-ID og node_kind. +-- ============================================================================= + +CREATE OR REPLACE FUNCTION notify_node_change() +RETURNS trigger AS $$ +DECLARE + payload json; + target_row nodes%ROWTYPE; +BEGIN + IF TG_OP = 'DELETE' THEN + target_row := OLD; + ELSE + target_row := NEW; + END IF; + + payload := json_build_object( + 'op', TG_OP, + 'id', target_row.id, + 'kind', target_row.node_kind + ); + + PERFORM pg_notify('node_changed', payload::text); + RETURN target_row; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER nodes_notify + AFTER INSERT OR UPDATE OR DELETE ON nodes + FOR EACH ROW EXECUTE FUNCTION notify_node_change(); + +-- ============================================================================= +-- notify_edge_change: Sender NOTIFY ved INSERT, UPDATE eller DELETE på edges. +-- Payload er JSON med operasjon, edge-ID, source, target og type. +-- ============================================================================= + +CREATE OR REPLACE FUNCTION notify_edge_change() +RETURNS trigger AS $$ +DECLARE + payload json; + target_row edges%ROWTYPE; +BEGIN + IF TG_OP = 'DELETE' THEN + target_row := OLD; + ELSE + target_row := NEW; + END IF; + + payload := json_build_object( + 'op', TG_OP, + 'id', target_row.id, + 'source_id', target_row.source_id, + 'target_id', target_row.target_id, + 'edge_type', target_row.edge_type + ); + + PERFORM pg_notify('edge_changed', payload::text); + RETURN target_row; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER edges_notify + AFTER INSERT OR UPDATE OR DELETE ON edges + FOR EACH ROW EXECUTE FUNCTION notify_edge_change(); + +-- ============================================================================= +-- notify_access_change: Sender NOTIFY ved endringer i node_access. +-- Brukes av portvokteren for å oppdatere klientenes tilgangsmatrise. +-- ============================================================================= + +CREATE OR REPLACE FUNCTION notify_access_change() +RETURNS trigger AS $$ +DECLARE + payload json; +BEGIN + IF TG_OP = 'DELETE' THEN + payload := json_build_object( + 'op', TG_OP, + 'subject_id', OLD.subject_id, + 'object_id', OLD.object_id + ); + ELSE + payload := json_build_object( + 'op', TG_OP, + 'subject_id', NEW.subject_id, + 'object_id', NEW.object_id, + 'access', NEW.access + ); + END IF; + + PERFORM pg_notify('access_changed', payload::text); + RETURN COALESCE(NEW, OLD); +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER node_access_notify + AFTER INSERT OR UPDATE OR DELETE ON node_access + FOR EACH ROW EXECUTE FUNCTION notify_access_change(); + +COMMIT;