From fcc9e671a5b50b507d31c80fea873b421073bf4e Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 12:16:36 +0000 Subject: [PATCH] Backend for SpacetimeDB-migrering: berikede WS-events + mixer-API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fase M2 (oppgave 22.2): Portvokteren sender nå full raddata i WebSocket-events (ikke bare ID), slik at frontend kan oppdatere stores direkte uten ekstra API-kall. Endringer: - ws.rs: Berik node/edge/access-events med full PG-data etter NOTIFY - ws.rs: Ny mixer_channel_changed event-type + initial_sync inkluderer mixer - ws.rs: Resync ved lag (broadcast overflow) - mixer.rs: Nye HTTP-endepunkter som erstatter STDB-reducers (create_mixer_channel, set_gain, set_mute, toggle_effect, set_mixer_role) - 019_mixer_channels.sql: PG-tabell + NOTIFY-trigger for mixer-tilstand --- maskinrommet/src/main.rs | 7 + maskinrommet/src/mixer.rs | 226 ++++++++++++++++ maskinrommet/src/ws.rs | 426 +++++++++++++++++++++++------- migrations/019_mixer_channels.sql | 56 ++++ 4 files changed, 614 insertions(+), 101 deletions(-) create mode 100644 maskinrommet/src/mixer.rs create mode 100644 migrations/019_mixer_channels.sql diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 1e0e62a..007a3ae 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -25,6 +25,7 @@ mod serving; mod stdb; pub mod summarize; pub mod ws; +pub mod mixer; pub mod tiptap; pub mod transcribe; pub mod tts; @@ -303,6 +304,12 @@ async fn main() { .route("/custom-domain/sok", get(custom_domain::serve_custom_domain_search)) .route("/custom-domain/om", get(custom_domain::serve_custom_domain_about)) .route("/custom-domain/{article_id}", get(custom_domain::serve_custom_domain_article)) + // Mixer-kanaler (oppgave 22.2 — erstatter STDB-reducers) + .route("/intentions/create_mixer_channel", post(mixer::create_mixer_channel)) + .route("/intentions/set_gain", post(mixer::set_gain)) + .route("/intentions/set_mute", post(mixer::set_mute)) + .route("/intentions/toggle_effect", post(mixer::toggle_effect)) + .route("/intentions/set_mixer_role", post(mixer::set_mixer_role)) // Observerbarhet (oppgave 12.1) .route("/metrics", get(metrics::metrics_endpoint)) .layer(middleware::from_fn_with_state(state.clone(), metrics::latency_middleware)) diff --git a/maskinrommet/src/mixer.rs b/maskinrommet/src/mixer.rs new file mode 100644 index 0000000..9603b65 --- /dev/null +++ b/maskinrommet/src/mixer.rs @@ -0,0 +1,226 @@ +//! Mixer-kanaler — HTTP API for delt lydmixer-tilstand. +//! +//! Erstatter SpacetimeDB-reducers for mixer (createMixerChannel, setGain, +//! setMute, toggleEffect, setMixerRole). Skriver direkte til PG; +//! NOTIFY-trigger propagerer endringer til WebSocket-klienter. +//! +//! Ref: oppgave 22.2 (SpacetimeDB-migrering) + +use axum::{extract::State, http::StatusCode, Json}; +use serde::{Deserialize, Serialize}; + +use crate::auth::AuthUser; +use crate::AppState; + +#[derive(Serialize)] +pub struct ErrorResponse { + pub error: String, +} + +fn bad_request(msg: &str) -> (StatusCode, Json) { + (StatusCode::BAD_REQUEST, Json(ErrorResponse { error: msg.to_string() })) +} + +fn internal_error(msg: &str) -> (StatusCode, Json) { + (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse { error: msg.to_string() })) +} + +// ============================================================================= +// Create mixer channel +// ============================================================================= + +#[derive(Deserialize)] +pub struct CreateMixerChannelRequest { + pub room_id: String, + pub target_user_id: String, +} + +#[derive(Serialize)] +pub struct MixerChannelResponse { + pub ok: bool, +} + +pub async fn create_mixer_channel( + State(state): State, + _user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + if req.room_id.is_empty() || req.target_user_id.is_empty() { + return Err(bad_request("room_id og target_user_id er påkrevd")); + } + + sqlx::query( + r#" + INSERT INTO mixer_channels (room_id, target_user_id, updated_by, updated_at) + VALUES ($1, $2, $3, now()) + ON CONFLICT (room_id, target_user_id) DO NOTHING + "#, + ) + .bind(&req.room_id) + .bind(&req.target_user_id) + .bind(&req.target_user_id) // updated_by = the user joining + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved opprettelse av mixer-kanal: {e}"); + internal_error("Databasefeil") + })?; + + Ok(Json(MixerChannelResponse { ok: true })) +} + +// ============================================================================= +// Set gain +// ============================================================================= + +#[derive(Deserialize)] +pub struct SetGainRequest { + pub room_id: String, + pub target_user_id: String, + pub gain: f64, +} + +pub async fn set_gain( + State(state): State, + _user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + sqlx::query( + "UPDATE mixer_channels SET gain = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3" + ) + .bind(req.gain.clamp(0.0, 1.5)) + .bind(&req.room_id) + .bind(&req.target_user_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved set_gain: {e}"); + internal_error("Databasefeil") + })?; + + Ok(Json(MixerChannelResponse { ok: true })) +} + +// ============================================================================= +// Set mute +// ============================================================================= + +#[derive(Deserialize)] +pub struct SetMuteRequest { + pub room_id: String, + pub target_user_id: String, + pub is_muted: bool, +} + +pub async fn set_mute( + State(state): State, + _user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + sqlx::query( + "UPDATE mixer_channels SET is_muted = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3" + ) + .bind(req.is_muted) + .bind(&req.room_id) + .bind(&req.target_user_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved set_mute: {e}"); + internal_error("Databasefeil") + })?; + + Ok(Json(MixerChannelResponse { ok: true })) +} + +// ============================================================================= +// Toggle effect +// ============================================================================= + +#[derive(Deserialize)] +pub struct ToggleEffectRequest { + pub room_id: String, + pub target_user_id: String, + pub effect_name: String, +} + +pub async fn toggle_effect( + State(state): State, + _user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // Hent nåværende active_effects JSON + let current: Option = sqlx::query_scalar( + "SELECT active_effects FROM mixer_channels WHERE room_id = $1 AND target_user_id = $2" + ) + .bind(&req.room_id) + .bind(&req.target_user_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved toggle_effect (les): {e}"); + internal_error("Databasefeil") + })?; + + let mut effects: serde_json::Value = current + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or(serde_json::json!({})); + + // Toggle effekten + let current_val = effects.get(&req.effect_name).and_then(|v| v.as_bool()).unwrap_or(false); + effects[&req.effect_name] = serde_json::Value::Bool(!current_val); + + let new_effects = serde_json::to_string(&effects).unwrap_or_else(|_| "{}".to_string()); + + sqlx::query( + "UPDATE mixer_channels SET active_effects = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3" + ) + .bind(&new_effects) + .bind(&req.room_id) + .bind(&req.target_user_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved toggle_effect (skriv): {e}"); + internal_error("Databasefeil") + })?; + + Ok(Json(MixerChannelResponse { ok: true })) +} + +// ============================================================================= +// Set role +// ============================================================================= + +#[derive(Deserialize)] +pub struct SetMixerRoleRequest { + pub room_id: String, + pub target_user_id: String, + pub role: String, +} + +pub async fn set_mixer_role( + State(state): State, + _user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + let valid_roles = ["editor", "viewer"]; + if !valid_roles.contains(&req.role.as_str()) { + return Err(bad_request("Ugyldig rolle. Gyldige: editor, viewer")); + } + + sqlx::query( + "UPDATE mixer_channels SET role = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3" + ) + .bind(&req.role) + .bind(&req.room_id) + .bind(&req.target_user_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("Feil ved set_mixer_role: {e}"); + internal_error("Databasefeil") + })?; + + Ok(Json(MixerChannelResponse { ok: true })) +} diff --git a/maskinrommet/src/ws.rs b/maskinrommet/src/ws.rs index a681202..ca26cb5 100644 --- a/maskinrommet/src/ws.rs +++ b/maskinrommet/src/ws.rs @@ -1,10 +1,11 @@ //! WebSocket-lag for sanntid via PG LISTEN/NOTIFY. //! -//! Portvokteren lytter på `node_changed`, `edge_changed` og `access_changed` -//! kanaler i PostgreSQL og videresender relevante endringer til tilkoblede -//! WebSocket-klienter, filtrert på tilgangsmatrisen (node_access). +//! Portvokteren lytter på `node_changed`, `edge_changed`, `access_changed` +//! og `mixer_channel_changed` kanaler i PostgreSQL og videresender relevante +//! endringer via WebSocket til tilkoblede klienter, filtrert på tilgangsmatrisen. //! -//! Fase M1: Parallell med SpacetimeDB for verifisering. +//! Fase M2: Frontend bruker kun denne WebSocket-tilkoblingen (SpacetimeDB fjernet). +//! Events berikes med full raddata fra PG slik at klienten kan oppdatere stores direkte. //! Ref: docs/retninger/datalaget.md use std::sync::Arc; @@ -24,77 +25,186 @@ use uuid::Uuid; // --------------------------------------------------------------------------- -// Typer for NOTIFY-payloads +// Typer for NOTIFY-payloads (minimale, fra PG triggers) // --------------------------------------------------------------------------- -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeChanged { - pub op: String, - pub id: Uuid, - pub kind: String, +#[derive(Debug, Clone, Deserialize)] +struct NodeNotify { + op: String, + id: Uuid, + kind: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EdgeChanged { - pub op: String, - pub id: Uuid, - pub source_id: Uuid, - pub target_id: Uuid, - pub edge_type: String, +#[derive(Debug, Clone, Deserialize)] +struct EdgeNotify { + op: String, + id: Uuid, + source_id: Uuid, + target_id: Uuid, + edge_type: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AccessChanged { - pub op: String, - pub subject_id: Uuid, - pub object_id: Uuid, - #[serde(skip_serializing_if = "Option::is_none")] - pub access: Option, +#[derive(Debug, Clone, Deserialize)] +struct AccessNotify { + op: String, + subject_id: Uuid, + object_id: Uuid, + access: Option, } -/// Samlet WebSocket-melding sendt til klienter. +#[derive(Debug, Clone, Deserialize)] +struct MixerChannelNotify { + op: String, + room_id: String, + target_user_id: String, +} + +// --------------------------------------------------------------------------- +// Berikede WsEvent-typer (sendt til klienter, med full raddata) +// --------------------------------------------------------------------------- + +/// WebSocket-melding sendt til klienter. Berikes med full raddata for +/// INSERT/UPDATE slik at frontend kan oppdatere stores direkte. #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] pub enum WsEvent { #[serde(rename = "node_changed")] - NodeChanged(NodeChanged), + NodeChanged { + op: String, + id: String, + kind: String, + /// Full node-data for INSERT/UPDATE, None for DELETE. + #[serde(skip_serializing_if = "Option::is_none")] + node: Option, + }, #[serde(rename = "edge_changed")] - EdgeChanged(EdgeChanged), + EdgeChanged { + op: String, + id: String, + source_id: String, + target_id: String, + edge_type: String, + /// Full edge-data for INSERT/UPDATE, None for DELETE. + #[serde(skip_serializing_if = "Option::is_none")] + edge: Option, + }, #[serde(rename = "access_changed")] - AccessChanged(AccessChanged), - /// Initiell snapshot av alle noder, edges og access for denne brukeren. + AccessChanged { + op: String, + subject_id: String, + object_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + access: Option, + /// Full access-rad for INSERT/UPDATE. + #[serde(skip_serializing_if = "Option::is_none")] + row: Option, + }, + #[serde(rename = "mixer_channel_changed")] + MixerChannelChanged { + op: String, + room_id: String, + target_user_id: String, + /// Full mixer_channel-data for INSERT/UPDATE, None for DELETE. + #[serde(skip_serializing_if = "Option::is_none")] + channel: Option, + }, + /// Initiell snapshot av alle noder, edges, access og mixer_channels. #[serde(rename = "initial_sync")] InitialSync { nodes: Vec, edges: Vec, access: Vec, + mixer_channels: Vec, }, } +/// Intern enum for broadcast — bærer UUID-er for filtrering. +/// Konverteres til WsEvent ved serialisering. +#[derive(Debug, Clone)] +enum BroadcastEvent { + NodeChanged { + op: String, + id: Uuid, + kind: String, + node: Option, + }, + EdgeChanged { + op: String, + id: Uuid, + source_id: Uuid, + target_id: Uuid, + edge_type: String, + edge: Option, + }, + AccessChanged { + op: String, + subject_id: Uuid, + object_id: Uuid, + access: Option, + row: Option, + }, + MixerChannelChanged { + op: String, + room_id: String, + target_user_id: String, + channel: Option, + }, +} + +impl BroadcastEvent { + fn to_ws_event(&self) -> WsEvent { + match self { + BroadcastEvent::NodeChanged { op, id, kind, node } => WsEvent::NodeChanged { + op: op.clone(), + id: id.to_string(), + kind: kind.clone(), + node: node.clone(), + }, + BroadcastEvent::EdgeChanged { op, id, source_id, target_id, edge_type, edge } => WsEvent::EdgeChanged { + op: op.clone(), + id: id.to_string(), + source_id: source_id.to_string(), + target_id: target_id.to_string(), + edge_type: edge_type.clone(), + edge: edge.clone(), + }, + BroadcastEvent::AccessChanged { op, subject_id, object_id, access, row } => WsEvent::AccessChanged { + op: op.clone(), + subject_id: subject_id.to_string(), + object_id: object_id.to_string(), + access: access.clone(), + row: row.clone(), + }, + BroadcastEvent::MixerChannelChanged { op, room_id, target_user_id, channel } => WsEvent::MixerChannelChanged { + op: op.clone(), + room_id: room_id.clone(), + target_user_id: target_user_id.clone(), + channel: channel.clone(), + }, + } + } +} + // --------------------------------------------------------------------------- -// Broadcast-kanal for alle NOTIFY-events +// Broadcast-kanal // --------------------------------------------------------------------------- -/// Delt broadcast-kanal som PG-lytteren publiserer til. -/// Alle WebSocket-tilkoblinger abonnerer. #[derive(Clone)] pub struct WsBroadcast { - tx: broadcast::Sender, + tx: broadcast::Sender, } impl WsBroadcast { pub fn new() -> Self { - // 4096 meldinger i buffer — dropper eldste ved overflyt let (tx, _) = broadcast::channel(4096); Self { tx } } } // --------------------------------------------------------------------------- -// PG LISTEN-bakgrunnstråd +// PG LISTEN + berikelse // --------------------------------------------------------------------------- -/// Start bakgrunnsoppgave som lytter på PG NOTIFY og publiserer til broadcast. pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) { tokio::spawn(async move { loop { @@ -109,10 +219,10 @@ pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) { async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error> { let mut listener = PgListener::connect_with(db).await?; listener - .listen_all(["node_changed", "edge_changed", "access_changed"]) + .listen_all(["node_changed", "edge_changed", "access_changed", "mixer_channel_changed"]) .await?; - tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed"); + tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed, mixer_channel_changed"); loop { let notification = listener.recv().await?; @@ -120,67 +230,147 @@ async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error let payload = notification.payload(); let event = match channel { - "node_changed" => match serde_json::from_str::(payload) { - Ok(n) => WsEvent::NodeChanged(n), - Err(e) => { - tracing::warn!("Ugyldig node_changed-payload: {e}"); - continue; + "node_changed" => { + let n: NodeNotify = match serde_json::from_str(payload) { + Ok(v) => v, + Err(e) => { tracing::warn!("Ugyldig node_changed-payload: {e}"); continue; } + }; + let node = if n.op != "DELETE" { + enrich_node(db, n.id).await + } else { + None + }; + BroadcastEvent::NodeChanged { op: n.op, id: n.id, kind: n.kind, node } + } + "edge_changed" => { + let e: EdgeNotify = match serde_json::from_str(payload) { + Ok(v) => v, + Err(e) => { tracing::warn!("Ugyldig edge_changed-payload: {e}"); continue; } + }; + let edge = if e.op != "DELETE" { + enrich_edge(db, e.id).await + } else { + None + }; + BroadcastEvent::EdgeChanged { + op: e.op, id: e.id, source_id: e.source_id, + target_id: e.target_id, edge_type: e.edge_type, edge, } - }, - "edge_changed" => match serde_json::from_str::(payload) { - Ok(e) => WsEvent::EdgeChanged(e), - Err(e) => { - tracing::warn!("Ugyldig edge_changed-payload: {e}"); - continue; + } + "access_changed" => { + let a: AccessNotify = match serde_json::from_str(payload) { + Ok(v) => v, + Err(e) => { tracing::warn!("Ugyldig access_changed-payload: {e}"); continue; } + }; + let row = if a.op != "DELETE" { + enrich_access(db, a.subject_id, a.object_id).await + } else { + None + }; + BroadcastEvent::AccessChanged { + op: a.op, subject_id: a.subject_id, object_id: a.object_id, + access: a.access, row, } - }, - "access_changed" => match serde_json::from_str::(payload) { - Ok(a) => WsEvent::AccessChanged(a), - Err(e) => { - tracing::warn!("Ugyldig access_changed-payload: {e}"); - continue; + } + "mixer_channel_changed" => { + let m: MixerChannelNotify = match serde_json::from_str(payload) { + Ok(v) => v, + Err(e) => { tracing::warn!("Ugyldig mixer_channel_changed-payload: {e}"); continue; } + }; + let channel_data = if m.op != "DELETE" { + enrich_mixer_channel(db, &m.room_id, &m.target_user_id).await + } else { + None + }; + BroadcastEvent::MixerChannelChanged { + op: m.op, room_id: m.room_id, target_user_id: m.target_user_id, + channel: channel_data, } - }, + } _ => continue, }; - // Broadcast til alle tilkoblede klienter (filtrering skjer per klient) let _ = ws.tx.send(event); } } +// --------------------------------------------------------------------------- +// Berikelse: hent full rad fra PG etter NOTIFY +// --------------------------------------------------------------------------- + +async fn enrich_node(db: &PgPool, id: Uuid) -> Option { + let row = sqlx::query_as::<_, NodeRow>( + "SELECT id, node_kind, title, content, visibility::text, metadata, created_at, created_by FROM nodes WHERE id = $1" + ) + .bind(id) + .fetch_optional(db) + .await + .ok() + .flatten()?; + Some(row.to_json()) +} + +async fn enrich_edge(db: &PgPool, id: Uuid) -> Option { + let row = sqlx::query_as::<_, EdgeRow>( + "SELECT id, source_id, target_id, edge_type, metadata, system, created_at, created_by FROM edges WHERE id = $1" + ) + .bind(id) + .fetch_optional(db) + .await + .ok() + .flatten()?; + Some(row.to_json()) +} + +async fn enrich_access(db: &PgPool, subject_id: Uuid, object_id: Uuid) -> Option { + let row = sqlx::query_as::<_, AccessRow>( + "SELECT subject_id, object_id, access::text, via_edge FROM node_access WHERE subject_id = $1 AND object_id = $2" + ) + .bind(subject_id) + .bind(object_id) + .fetch_optional(db) + .await + .ok() + .flatten()?; + Some(row.to_json()) +} + +async fn enrich_mixer_channel(db: &PgPool, room_id: &str, target_user_id: &str) -> Option { + let row = sqlx::query_as::<_, MixerChannelRow>( + "SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels WHERE room_id = $1 AND target_user_id = $2" + ) + .bind(room_id) + .bind(target_user_id) + .fetch_optional(db) + .await + .ok() + .flatten()?; + Some(row.to_json()) +} + // --------------------------------------------------------------------------- // WebSocket-endepunkt // --------------------------------------------------------------------------- -/// Query-parameter for WebSocket-autentisering. -/// Nettlesere kan ikke sende Authorization-header ved WS-oppgradering, -/// så token sendes som ?token= i URL-en. #[derive(Deserialize)] pub struct WsQuery { token: String, } -/// GET /ws?token= — WebSocket-oppgradering for sanntidsstrøm. pub async fn ws_handler( ws: WebSocketUpgrade, axum::extract::Query(query): axum::extract::Query, State(state): State, ) -> Result { - // Valider JWT manuelt (kan ikke bruke AuthUser-ekstraktor for WS) let user = validate_ws_token(&query.token, &state) .await .map_err(|e| (axum::http::StatusCode::UNAUTHORIZED, e))?; - tracing::info!( - node_id = %user.node_id, - "WebSocket-tilkobling fra bruker" - ); + tracing::info!(node_id = %user.node_id, "WebSocket-tilkobling fra bruker"); Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, state))) } -/// Valider JWT fra query-parameter — samme logikk som AuthUser-ekstraktor. async fn validate_ws_token( token: &str, state: &crate::AppState, @@ -224,18 +414,14 @@ struct WsUser { node_id: Uuid, } -/// Håndter en individuell WebSocket-tilkobling. async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppState) { let user_id = user.node_id; - // Last brukerens tilgangsmatrise fra PG let visible_nodes = match load_visible_nodes(&state.db, user_id).await { Ok(v) => Arc::new(tokio::sync::RwLock::new(v)), Err(e) => { tracing::error!("Kunne ikke laste tilgangsmatrise: {e}"); - let _ = socket - .send(Message::Close(None)) - .await; + let _ = socket.send(Message::Close(None)).await; return; } }; @@ -253,35 +439,34 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta } } - // Abonner på broadcast let mut rx = state.ws_broadcast.tx.subscribe(); loop { tokio::select! { - // Motta broadcast-events result = rx.recv() => { match result { Ok(event) => { let should_send = should_send_to_user(&event, user_id, &visible_nodes).await; if should_send { // Oppdater tilgangsmatrise ved access-endringer - if let WsEvent::AccessChanged(ref ac) = event { - let mut vn = visible_nodes.write().await; - if ac.subject_id == user_id { - if ac.op == "DELETE" { - vn.remove(&ac.object_id); + if let BroadcastEvent::AccessChanged { ref op, subject_id, object_id, .. } = event { + if subject_id == user_id { + let mut vn = visible_nodes.write().await; + if op == "DELETE" { + vn.remove(&object_id); } else { - vn.insert(ac.object_id); + vn.insert(object_id); } } } - let json = match serde_json::to_string(&event) { + let ws_event = event.to_ws_event(); + let json = match serde_json::to_string(&ws_event) { Ok(j) => j, Err(_) => continue, }; if socket.send(Message::Text(json.into())).await.is_err() { - break; // Klient frakoblet + break; } } } @@ -289,14 +474,19 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta tracing::warn!( node_id = %user_id, skipped = n, - "WebSocket-klient sakket etter, mistet {n} meldinger" + "WebSocket-klient sakket etter, mistet {n} meldinger — sender resync" ); - // Kunne sendt full resync her, men for M1 er det OK + // Send full resync ved lag + if let Ok(sync) = build_initial_sync(&state.db, user_id).await { + let json = serde_json::to_string(&sync).unwrap_or_default(); + if socket.send(Message::Text(json.into())).await.is_err() { + break; + } + } } Err(broadcast::error::RecvError::Closed) => break, } } - // Motta meldinger fra klienten (ping/pong, close) msg = socket.recv() => { match msg { Some(Ok(Message::Close(_))) | None => break, @@ -306,7 +496,7 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta } } Some(Err(_)) => break, - _ => {} // Ignorerer tekst/binær fra klient i M1 + _ => {} } } } @@ -319,27 +509,30 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta // Tilgangskontroll // --------------------------------------------------------------------------- -/// Sjekk om en event skal sendes til denne brukeren. async fn should_send_to_user( - event: &WsEvent, + event: &BroadcastEvent, user_id: Uuid, visible_nodes: &tokio::sync::RwLock>, ) -> bool { - let vn = visible_nodes.read().await; match event { - WsEvent::NodeChanged(n) => vn.contains(&n.id), - WsEvent::EdgeChanged(e) => vn.contains(&e.source_id) || vn.contains(&e.target_id), - WsEvent::AccessChanged(a) => a.subject_id == user_id, - WsEvent::InitialSync { .. } => true, + BroadcastEvent::NodeChanged { id, .. } => { + let vn = visible_nodes.read().await; + vn.contains(id) + } + BroadcastEvent::EdgeChanged { source_id, target_id, .. } => { + let vn = visible_nodes.read().await; + vn.contains(source_id) || vn.contains(target_id) + } + BroadcastEvent::AccessChanged { subject_id, .. } => *subject_id == user_id, + // Mixer-kanaler sendes til alle (de er per-rom, ingen node-tilgangskontroll) + BroadcastEvent::MixerChannelChanged { .. } => true, } } -/// Last alle node-IDer som brukeren har tilgang til. async fn load_visible_nodes( db: &PgPool, user_id: Uuid, ) -> Result, sqlx::Error> { - // Brukerens egne noder + noder via node_access + offentlige noder let rows = sqlx::query_scalar::<_, Uuid>( r#" SELECT id FROM nodes WHERE created_by = $1 @@ -356,12 +549,10 @@ async fn load_visible_nodes( Ok(rows.into_iter().collect()) } -/// Bygg initial_sync-melding med alle data brukeren kan se. async fn build_initial_sync( db: &PgPool, user_id: Uuid, ) -> Result { - // Noder: egne + tilgang + offentlige let nodes = sqlx::query_as::<_, NodeRow>( r#" SELECT n.id, n.node_kind, n.title, n.content, n.visibility::text, @@ -378,7 +569,6 @@ async fn build_initial_sync( let node_ids: Vec = nodes.iter().map(|n| n.id).collect(); - // Edges der minst én side er synlig let edges = sqlx::query_as::<_, EdgeRow>( r#" SELECT e.id, e.source_id, e.target_id, e.edge_type, @@ -391,7 +581,6 @@ async fn build_initial_sync( .fetch_all(db) .await?; - // Access-entries for denne brukeren let access = sqlx::query_as::<_, AccessRow>( r#" SELECT subject_id, object_id, access::text, via_edge @@ -403,15 +592,23 @@ async fn build_initial_sync( .fetch_all(db) .await?; + // Mixer-kanaler: alle (ingen tilgangskontroll på rom-nivå foreløpig) + let mixer_channels = sqlx::query_as::<_, MixerChannelRow>( + "SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels" + ) + .fetch_all(db) + .await?; + Ok(WsEvent::InitialSync { nodes: nodes.into_iter().map(|n| n.to_json()).collect(), edges: edges.into_iter().map(|e| e.to_json()).collect(), access: access.into_iter().map(|a| a.to_json()).collect(), + mixer_channels: mixer_channels.into_iter().map(|m| m.to_json()).collect(), }) } // --------------------------------------------------------------------------- -// DB-rader for initial_sync +// DB-rader // --------------------------------------------------------------------------- #[derive(sqlx::FromRow)] @@ -478,7 +675,6 @@ struct AccessRow { impl AccessRow { fn to_json(&self) -> serde_json::Value { - // Composite key som STDB bruker: "subject_id:object_id" serde_json::json!({ "id": format!("{}:{}", self.subject_id, self.object_id), "subjectId": self.subject_id.to_string(), @@ -488,3 +684,31 @@ impl AccessRow { }) } } + +#[derive(sqlx::FromRow)] +struct MixerChannelRow { + room_id: String, + target_user_id: String, + gain: f64, + is_muted: bool, + active_effects: String, + role: String, + updated_by: String, + updated_at: chrono::DateTime, +} + +impl MixerChannelRow { + fn to_json(&self) -> serde_json::Value { + serde_json::json!({ + "id": format!("{}:{}", self.room_id, self.target_user_id), + "roomId": self.room_id, + "targetUserId": self.target_user_id, + "gain": self.gain, + "isMuted": self.is_muted, + "activeEffects": self.active_effects, + "role": self.role, + "updatedBy": self.updated_by, + "updatedAt": self.updated_at.timestamp_micros(), + }) + } +} diff --git a/migrations/019_mixer_channels.sql b/migrations/019_mixer_channels.sql new file mode 100644 index 0000000..aa200bf --- /dev/null +++ b/migrations/019_mixer_channels.sql @@ -0,0 +1,56 @@ +-- 019_mixer_channels.sql +-- Mixer-kanaler for delt lydmixer-tilstand mellom deltakere i LiveKit-rom. +-- +-- Migrert fra SpacetimeDB til PostgreSQL som del av Fase M2 (oppgave 22.2). +-- Portvokteren sender endringer via WebSocket til tilkoblede klienter. + +BEGIN; + +-- ============================================================================= +-- mixer_channels: Delt mixer-tilstand per deltaker per rom. +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS mixer_channels ( + room_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + gain DOUBLE PRECISION NOT NULL DEFAULT 1.0, + is_muted BOOLEAN NOT NULL DEFAULT false, + active_effects TEXT NOT NULL DEFAULT '{}', -- JSON-streng med effekttilstand + role TEXT NOT NULL DEFAULT 'editor', + updated_by TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (room_id, target_user_id) +); + +-- ============================================================================= +-- notify_mixer_channel_change: NOTIFY ved endringer i mixer_channels. +-- ============================================================================= + +CREATE OR REPLACE FUNCTION notify_mixer_channel_change() +RETURNS trigger AS $$ +DECLARE + payload json; + target_row mixer_channels%ROWTYPE; +BEGIN + IF TG_OP = 'DELETE' THEN + target_row := OLD; + ELSE + target_row := NEW; + END IF; + + payload := json_build_object( + 'op', TG_OP, + 'room_id', target_row.room_id, + 'target_user_id', target_row.target_user_id + ); + + PERFORM pg_notify('mixer_channel_changed', payload::text); + RETURN target_row; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER mixer_channels_notify + AFTER INSERT OR UPDATE OR DELETE ON mixer_channels + FOR EACH ROW EXECUTE FUNCTION notify_mixer_channel_change(); + +COMMIT;