//! WebSocket-lag for sanntid via PG LISTEN/NOTIFY. //! //! Portvokteren lytter på `node_changed`, `edge_changed`, `access_changed` //! og `mixer_channel_changed` kanaler i PostgreSQL og videresender relevante //! endringer via WebSocket til tilkoblede klienter, filtrert på tilgangsmatrisen. //! //! Events berikes med full raddata fra PG slik at klienten kan oppdatere stores direkte. //! Ref: docs/retninger/datalaget.md use std::sync::Arc; use axum::{ extract::{ ws::{Message, WebSocket}, State, WebSocketUpgrade, }, response::Response, }; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgListener; use sqlx::PgPool; use tokio::sync::broadcast; use uuid::Uuid; // --------------------------------------------------------------------------- // Typer for NOTIFY-payloads (minimale, fra PG triggers) // --------------------------------------------------------------------------- #[derive(Debug, Clone, Deserialize)] struct NodeNotify { op: String, id: Uuid, kind: String, } #[derive(Debug, Clone, Deserialize)] struct EdgeNotify { op: String, id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, } #[derive(Debug, Clone, Deserialize)] struct AccessNotify { op: String, subject_id: Uuid, object_id: Uuid, access: Option, } #[derive(Debug, Clone, Deserialize)] struct MixerChannelNotify { op: String, room_id: String, target_user_id: String, } // --------------------------------------------------------------------------- // Berikede WsEvent-typer (sendt til klienter, med full raddata) // --------------------------------------------------------------------------- /// WebSocket-melding sendt til klienter. Berikes med full raddata for /// INSERT/UPDATE slik at frontend kan oppdatere stores direkte. #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] pub enum WsEvent { #[serde(rename = "node_changed")] NodeChanged { op: String, id: String, kind: String, /// Full node-data for INSERT/UPDATE, None for DELETE. #[serde(skip_serializing_if = "Option::is_none")] node: Option, }, #[serde(rename = "edge_changed")] EdgeChanged { op: String, id: String, source_id: String, target_id: String, edge_type: String, /// Full edge-data for INSERT/UPDATE, None for DELETE. #[serde(skip_serializing_if = "Option::is_none")] edge: Option, }, #[serde(rename = "access_changed")] AccessChanged { op: String, subject_id: String, object_id: String, #[serde(skip_serializing_if = "Option::is_none")] access: Option, /// Full access-rad for INSERT/UPDATE. #[serde(skip_serializing_if = "Option::is_none")] row: Option, }, #[serde(rename = "mixer_channel_changed")] MixerChannelChanged { op: String, room_id: String, target_user_id: String, /// Full mixer_channel-data for INSERT/UPDATE, None for DELETE. #[serde(skip_serializing_if = "Option::is_none")] channel: Option, }, /// Initiell snapshot av alle noder, edges, access og mixer_channels. #[serde(rename = "initial_sync")] InitialSync { nodes: Vec, edges: Vec, access: Vec, mixer_channels: Vec, }, } /// Intern enum for broadcast — bærer UUID-er for filtrering. /// Konverteres til WsEvent ved serialisering. #[derive(Debug, Clone)] enum BroadcastEvent { NodeChanged { op: String, id: Uuid, kind: String, node: Option, }, EdgeChanged { op: String, id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, edge: Option, }, AccessChanged { op: String, subject_id: Uuid, object_id: Uuid, access: Option, row: Option, }, MixerChannelChanged { op: String, room_id: String, target_user_id: String, channel: Option, }, } impl BroadcastEvent { fn to_ws_event(&self) -> WsEvent { match self { BroadcastEvent::NodeChanged { op, id, kind, node } => WsEvent::NodeChanged { op: op.clone(), id: id.to_string(), kind: kind.clone(), node: node.clone(), }, BroadcastEvent::EdgeChanged { op, id, source_id, target_id, edge_type, edge } => WsEvent::EdgeChanged { op: op.clone(), id: id.to_string(), source_id: source_id.to_string(), target_id: target_id.to_string(), edge_type: edge_type.clone(), edge: edge.clone(), }, BroadcastEvent::AccessChanged { op, subject_id, object_id, access, row } => WsEvent::AccessChanged { op: op.clone(), subject_id: subject_id.to_string(), object_id: object_id.to_string(), access: access.clone(), row: row.clone(), }, BroadcastEvent::MixerChannelChanged { op, room_id, target_user_id, channel } => WsEvent::MixerChannelChanged { op: op.clone(), room_id: room_id.clone(), target_user_id: target_user_id.clone(), channel: channel.clone(), }, } } } // --------------------------------------------------------------------------- // Broadcast-kanal // --------------------------------------------------------------------------- #[derive(Clone)] pub struct WsBroadcast { tx: broadcast::Sender, } impl WsBroadcast { pub fn new() -> Self { let (tx, _) = broadcast::channel(4096); Self { tx } } } // --------------------------------------------------------------------------- // PG LISTEN + berikelse // --------------------------------------------------------------------------- pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) { tokio::spawn(async move { loop { if let Err(e) = pg_listen_loop(&db, &ws).await { tracing::error!("PG LISTEN feilet: {e}, prøver igjen om 2s"); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } }); } async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error> { let mut listener = PgListener::connect_with(db).await?; listener .listen_all(["node_changed", "edge_changed", "access_changed", "mixer_channel_changed"]) .await?; tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed, mixer_channel_changed"); loop { let notification = listener.recv().await?; let channel = notification.channel(); let payload = notification.payload(); let event = match channel { "node_changed" => { let n: NodeNotify = match serde_json::from_str(payload) { Ok(v) => v, Err(e) => { tracing::warn!("Ugyldig node_changed-payload: {e}"); continue; } }; let node = if n.op != "DELETE" { enrich_node(db, n.id).await } else { None }; BroadcastEvent::NodeChanged { op: n.op, id: n.id, kind: n.kind, node } } "edge_changed" => { let e: EdgeNotify = match serde_json::from_str(payload) { Ok(v) => v, Err(e) => { tracing::warn!("Ugyldig edge_changed-payload: {e}"); continue; } }; let edge = if e.op != "DELETE" { enrich_edge(db, e.id).await } else { None }; BroadcastEvent::EdgeChanged { op: e.op, id: e.id, source_id: e.source_id, target_id: e.target_id, edge_type: e.edge_type, edge, } } "access_changed" => { let a: AccessNotify = match serde_json::from_str(payload) { Ok(v) => v, Err(e) => { tracing::warn!("Ugyldig access_changed-payload: {e}"); continue; } }; let row = if a.op != "DELETE" { enrich_access(db, a.subject_id, a.object_id).await } else { None }; BroadcastEvent::AccessChanged { op: a.op, subject_id: a.subject_id, object_id: a.object_id, access: a.access, row, } } "mixer_channel_changed" => { let m: MixerChannelNotify = match serde_json::from_str(payload) { Ok(v) => v, Err(e) => { tracing::warn!("Ugyldig mixer_channel_changed-payload: {e}"); continue; } }; let channel_data = if m.op != "DELETE" { enrich_mixer_channel(db, &m.room_id, &m.target_user_id).await } else { None }; BroadcastEvent::MixerChannelChanged { op: m.op, room_id: m.room_id, target_user_id: m.target_user_id, channel: channel_data, } } _ => continue, }; let _ = ws.tx.send(event); } } // --------------------------------------------------------------------------- // Berikelse: hent full rad fra PG etter NOTIFY // --------------------------------------------------------------------------- async fn enrich_node(db: &PgPool, id: Uuid) -> Option { let row = sqlx::query_as::<_, NodeRow>( "SELECT id, node_kind, title, content, visibility::text, metadata, created_at, created_by FROM nodes WHERE id = $1" ) .bind(id) .fetch_optional(db) .await .ok() .flatten()?; Some(row.to_json()) } async fn enrich_edge(db: &PgPool, id: Uuid) -> Option { let row = sqlx::query_as::<_, EdgeRow>( "SELECT id, source_id, target_id, edge_type, metadata, system, created_at, created_by FROM edges WHERE id = $1" ) .bind(id) .fetch_optional(db) .await .ok() .flatten()?; Some(row.to_json()) } async fn enrich_access(db: &PgPool, subject_id: Uuid, object_id: Uuid) -> Option { let row = sqlx::query_as::<_, AccessRow>( "SELECT subject_id, object_id, access::text, via_edge FROM node_access WHERE subject_id = $1 AND object_id = $2" ) .bind(subject_id) .bind(object_id) .fetch_optional(db) .await .ok() .flatten()?; Some(row.to_json()) } async fn enrich_mixer_channel(db: &PgPool, room_id: &str, target_user_id: &str) -> Option { let row = sqlx::query_as::<_, MixerChannelRow>( "SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels WHERE room_id = $1 AND target_user_id = $2" ) .bind(room_id) .bind(target_user_id) .fetch_optional(db) .await .ok() .flatten()?; Some(row.to_json()) } // --------------------------------------------------------------------------- // WebSocket-endepunkt // --------------------------------------------------------------------------- #[derive(Deserialize)] pub struct WsQuery { token: String, } pub async fn ws_handler( ws: WebSocketUpgrade, axum::extract::Query(query): axum::extract::Query, State(state): State, ) -> Result { let user = validate_ws_token(&query.token, &state) .await .map_err(|e| (axum::http::StatusCode::UNAUTHORIZED, e))?; tracing::info!(node_id = %user.node_id, "WebSocket-tilkobling fra bruker"); Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, state))) } async fn validate_ws_token( token: &str, state: &crate::AppState, ) -> Result { use jsonwebtoken::{decode, decode_header, Algorithm, Validation}; let header = decode_header(token) .map_err(|e| format!("Ugyldig token-header: {e}"))?; let decoding_key = state.jwks.decoding_key(header.kid.as_deref()) .map_err(|e| format!("JWKS-nøkkel ikke funnet: {e}"))?; let mut validation = Validation::new(Algorithm::RS256); validation.set_issuer(&[&state.jwks.issuer]); validation.set_audience(&[&state.jwks.audience]); let token_data = decode::(token, &decoding_key, &validation) .map_err(|e| format!("JWT-validering feilet: {e}"))?; let authentik_sub = &token_data.claims.sub; let node_id = sqlx::query_scalar::<_, Uuid>( "SELECT node_id FROM auth_identities WHERE authentik_sub = $1", ) .bind(authentik_sub) .fetch_optional(&state.db) .await .map_err(|e| format!("DB-feil: {e}"))? .ok_or_else(|| format!("Ukjent brukeridentitet: {authentik_sub}"))?; Ok(WsUser { node_id }) } #[derive(Deserialize)] struct WsClaims { sub: String, } #[derive(Debug, Clone)] struct WsUser { node_id: Uuid, } async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppState) { let user_id = user.node_id; let visible_nodes = match load_visible_nodes(&state.db, user_id).await { Ok(v) => Arc::new(tokio::sync::RwLock::new(v)), Err(e) => { tracing::error!("Kunne ikke laste tilgangsmatrise: {e}"); let _ = socket.send(Message::Close(None)).await; return; } }; // Send initiell snapshot match build_initial_sync(&state.db, user_id).await { Ok(sync) => { let json = serde_json::to_string(&sync).unwrap_or_default(); if socket.send(Message::Text(json.into())).await.is_err() { return; } } Err(e) => { tracing::error!("Feil ved bygging av initial_sync: {e}"); } } let mut rx = state.ws_broadcast.tx.subscribe(); loop { tokio::select! { result = rx.recv() => { match result { Ok(event) => { let should_send = should_send_to_user(&event, user_id, &visible_nodes).await; if should_send { // Oppdater tilgangsmatrise ved access-endringer if let BroadcastEvent::AccessChanged { ref op, subject_id, object_id, .. } = event { if subject_id == user_id { let mut vn = visible_nodes.write().await; if op == "DELETE" { vn.remove(&object_id); } else { vn.insert(object_id); } } } let ws_event = event.to_ws_event(); let json = match serde_json::to_string(&ws_event) { Ok(j) => j, Err(_) => continue, }; if socket.send(Message::Text(json.into())).await.is_err() { break; } } } Err(broadcast::error::RecvError::Lagged(n)) => { tracing::warn!( node_id = %user_id, skipped = n, "WebSocket-klient sakket etter, mistet {n} meldinger — sender resync" ); // Send full resync ved lag if let Ok(sync) = build_initial_sync(&state.db, user_id).await { let json = serde_json::to_string(&sync).unwrap_or_default(); if socket.send(Message::Text(json.into())).await.is_err() { break; } } } Err(broadcast::error::RecvError::Closed) => break, } } msg = socket.recv() => { match msg { Some(Ok(Message::Close(_))) | None => break, Some(Ok(Message::Ping(data))) => { if socket.send(Message::Pong(data)).await.is_err() { break; } } Some(Err(_)) => break, _ => {} } } } } tracing::info!(node_id = %user_id, "WebSocket-tilkobling lukket"); } // --------------------------------------------------------------------------- // Tilgangskontroll // --------------------------------------------------------------------------- async fn should_send_to_user( event: &BroadcastEvent, user_id: Uuid, visible_nodes: &tokio::sync::RwLock>, ) -> bool { match event { BroadcastEvent::NodeChanged { id, .. } => { let vn = visible_nodes.read().await; vn.contains(id) } BroadcastEvent::EdgeChanged { source_id, target_id, .. } => { let vn = visible_nodes.read().await; vn.contains(source_id) || vn.contains(target_id) } BroadcastEvent::AccessChanged { subject_id, .. } => *subject_id == user_id, // Mixer-kanaler sendes til alle (de er per-rom, ingen node-tilgangskontroll) BroadcastEvent::MixerChannelChanged { .. } => true, } } async fn load_visible_nodes( db: &PgPool, user_id: Uuid, ) -> Result, sqlx::Error> { let rows = sqlx::query_scalar::<_, Uuid>( r#" SELECT id FROM nodes WHERE created_by = $1 UNION SELECT object_id FROM node_access WHERE subject_id = $1 UNION SELECT id FROM nodes WHERE visibility IN ('readable', 'open') "#, ) .bind(user_id) .fetch_all(db) .await?; Ok(rows.into_iter().collect()) } async fn build_initial_sync( db: &PgPool, user_id: Uuid, ) -> Result { let nodes = sqlx::query_as::<_, NodeRow>( r#" SELECT n.id, n.node_kind, n.title, n.content, n.visibility::text, n.metadata, n.created_at, n.created_by FROM nodes n WHERE n.created_by = $1 OR n.id IN (SELECT object_id FROM node_access WHERE subject_id = $1) OR n.visibility IN ('readable', 'open') "#, ) .bind(user_id) .fetch_all(db) .await?; let node_ids: Vec = nodes.iter().map(|n| n.id).collect(); let edges = sqlx::query_as::<_, EdgeRow>( r#" SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.system, e.created_at, e.created_by FROM edges e WHERE e.source_id = ANY($1) OR e.target_id = ANY($1) "#, ) .bind(&node_ids) .fetch_all(db) .await?; let access = sqlx::query_as::<_, AccessRow>( r#" SELECT subject_id, object_id, access::text, via_edge FROM node_access WHERE subject_id = $1 "#, ) .bind(user_id) .fetch_all(db) .await?; // Mixer-kanaler: alle (ingen tilgangskontroll på rom-nivå foreløpig) let mixer_channels = sqlx::query_as::<_, MixerChannelRow>( "SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels" ) .fetch_all(db) .await?; Ok(WsEvent::InitialSync { nodes: nodes.into_iter().map(|n| n.to_json()).collect(), edges: edges.into_iter().map(|e| e.to_json()).collect(), access: access.into_iter().map(|a| a.to_json()).collect(), mixer_channels: mixer_channels.into_iter().map(|m| m.to_json()).collect(), }) } // --------------------------------------------------------------------------- // DB-rader // --------------------------------------------------------------------------- #[derive(sqlx::FromRow)] struct NodeRow { id: Uuid, node_kind: String, title: Option, content: Option, visibility: String, metadata: serde_json::Value, created_at: chrono::DateTime, created_by: Option, } impl NodeRow { fn to_json(&self) -> serde_json::Value { serde_json::json!({ "id": self.id.to_string(), "nodeKind": self.node_kind, "title": self.title.as_deref().unwrap_or(""), "content": self.content.as_deref().unwrap_or(""), "visibility": self.visibility, "metadata": self.metadata.to_string(), "createdAt": self.created_at.timestamp_micros(), "createdBy": self.created_by.map(|u| u.to_string()).unwrap_or_default(), }) } } #[derive(sqlx::FromRow)] struct EdgeRow { id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, metadata: serde_json::Value, system: bool, created_at: chrono::DateTime, created_by: Option, } impl EdgeRow { fn to_json(&self) -> serde_json::Value { serde_json::json!({ "id": self.id.to_string(), "sourceId": self.source_id.to_string(), "targetId": self.target_id.to_string(), "edgeType": self.edge_type, "metadata": self.metadata.to_string(), "system": self.system, "createdAt": self.created_at.timestamp_micros(), "createdBy": self.created_by.map(|u| u.to_string()).unwrap_or_default(), }) } } #[derive(sqlx::FromRow)] struct AccessRow { subject_id: Uuid, object_id: Uuid, access: String, via_edge: Option, } impl AccessRow { fn to_json(&self) -> serde_json::Value { serde_json::json!({ "id": format!("{}:{}", self.subject_id, self.object_id), "subjectId": self.subject_id.to_string(), "objectId": self.object_id.to_string(), "access": self.access, "viaEdge": self.via_edge.map(|u| u.to_string()).unwrap_or_default(), }) } } #[derive(sqlx::FromRow)] struct MixerChannelRow { room_id: String, target_user_id: String, gain: f64, is_muted: bool, active_effects: String, role: String, updated_by: String, updated_at: chrono::DateTime, } impl MixerChannelRow { fn to_json(&self) -> serde_json::Value { serde_json::json!({ "id": format!("{}:{}", self.room_id, self.target_user_id), "roomId": self.room_id, "targetUserId": self.target_user_id, "gain": self.gain, "isMuted": self.is_muted, "activeEffects": self.active_effects, "role": self.role, "updatedBy": self.updated_by, "updatedAt": self.updated_at.timestamp_micros(), }) } }