Backend for SpacetimeDB-migrering: berikede WS-events + mixer-API

Fase M2 (oppgave 22.2): Portvokteren sender nå full raddata i
WebSocket-events (ikke bare ID), slik at frontend kan oppdatere
stores direkte uten ekstra API-kall.

Endringer:
- ws.rs: Berik node/edge/access-events med full PG-data etter NOTIFY
- ws.rs: Ny mixer_channel_changed event-type + initial_sync inkluderer mixer
- ws.rs: Resync ved lag (broadcast overflow)
- mixer.rs: Nye HTTP-endepunkter som erstatter STDB-reducers
  (create_mixer_channel, set_gain, set_mute, toggle_effect, set_mixer_role)
- 019_mixer_channels.sql: PG-tabell + NOTIFY-trigger for mixer-tilstand
This commit is contained in:
vegard 2026-03-18 12:16:36 +00:00
parent ce1c06d794
commit fcc9e671a5
4 changed files with 614 additions and 101 deletions

View file

@ -25,6 +25,7 @@ mod serving;
mod stdb;
pub mod summarize;
pub mod ws;
pub mod mixer;
pub mod tiptap;
pub mod transcribe;
pub mod tts;
@ -303,6 +304,12 @@ async fn main() {
.route("/custom-domain/sok", get(custom_domain::serve_custom_domain_search))
.route("/custom-domain/om", get(custom_domain::serve_custom_domain_about))
.route("/custom-domain/{article_id}", get(custom_domain::serve_custom_domain_article))
// Mixer-kanaler (oppgave 22.2 — erstatter STDB-reducers)
.route("/intentions/create_mixer_channel", post(mixer::create_mixer_channel))
.route("/intentions/set_gain", post(mixer::set_gain))
.route("/intentions/set_mute", post(mixer::set_mute))
.route("/intentions/toggle_effect", post(mixer::toggle_effect))
.route("/intentions/set_mixer_role", post(mixer::set_mixer_role))
// Observerbarhet (oppgave 12.1)
.route("/metrics", get(metrics::metrics_endpoint))
.layer(middleware::from_fn_with_state(state.clone(), metrics::latency_middleware))

226
maskinrommet/src/mixer.rs Normal file
View file

@ -0,0 +1,226 @@
//! Mixer-kanaler — HTTP API for delt lydmixer-tilstand.
//!
//! Erstatter SpacetimeDB-reducers for mixer (createMixerChannel, setGain,
//! setMute, toggleEffect, setMixerRole). Skriver direkte til PG;
//! NOTIFY-trigger propagerer endringer til WebSocket-klienter.
//!
//! Ref: oppgave 22.2 (SpacetimeDB-migrering)
use axum::{extract::State, http::StatusCode, Json};
use serde::{Deserialize, Serialize};
use crate::auth::AuthUser;
use crate::AppState;
#[derive(Serialize)]
pub struct ErrorResponse {
pub error: String,
}
fn bad_request(msg: &str) -> (StatusCode, Json<ErrorResponse>) {
(StatusCode::BAD_REQUEST, Json(ErrorResponse { error: msg.to_string() }))
}
fn internal_error(msg: &str) -> (StatusCode, Json<ErrorResponse>) {
(StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse { error: msg.to_string() }))
}
// =============================================================================
// Create mixer channel
// =============================================================================
#[derive(Deserialize)]
pub struct CreateMixerChannelRequest {
pub room_id: String,
pub target_user_id: String,
}
#[derive(Serialize)]
pub struct MixerChannelResponse {
pub ok: bool,
}
pub async fn create_mixer_channel(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<CreateMixerChannelRequest>,
) -> Result<Json<MixerChannelResponse>, (StatusCode, Json<ErrorResponse>)> {
if req.room_id.is_empty() || req.target_user_id.is_empty() {
return Err(bad_request("room_id og target_user_id er påkrevd"));
}
sqlx::query(
r#"
INSERT INTO mixer_channels (room_id, target_user_id, updated_by, updated_at)
VALUES ($1, $2, $3, now())
ON CONFLICT (room_id, target_user_id) DO NOTHING
"#,
)
.bind(&req.room_id)
.bind(&req.target_user_id)
.bind(&req.target_user_id) // updated_by = the user joining
.execute(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved opprettelse av mixer-kanal: {e}");
internal_error("Databasefeil")
})?;
Ok(Json(MixerChannelResponse { ok: true }))
}
// =============================================================================
// Set gain
// =============================================================================
#[derive(Deserialize)]
pub struct SetGainRequest {
pub room_id: String,
pub target_user_id: String,
pub gain: f64,
}
pub async fn set_gain(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<SetGainRequest>,
) -> Result<Json<MixerChannelResponse>, (StatusCode, Json<ErrorResponse>)> {
sqlx::query(
"UPDATE mixer_channels SET gain = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3"
)
.bind(req.gain.clamp(0.0, 1.5))
.bind(&req.room_id)
.bind(&req.target_user_id)
.execute(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved set_gain: {e}");
internal_error("Databasefeil")
})?;
Ok(Json(MixerChannelResponse { ok: true }))
}
// =============================================================================
// Set mute
// =============================================================================
#[derive(Deserialize)]
pub struct SetMuteRequest {
pub room_id: String,
pub target_user_id: String,
pub is_muted: bool,
}
pub async fn set_mute(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<SetMuteRequest>,
) -> Result<Json<MixerChannelResponse>, (StatusCode, Json<ErrorResponse>)> {
sqlx::query(
"UPDATE mixer_channels SET is_muted = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3"
)
.bind(req.is_muted)
.bind(&req.room_id)
.bind(&req.target_user_id)
.execute(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved set_mute: {e}");
internal_error("Databasefeil")
})?;
Ok(Json(MixerChannelResponse { ok: true }))
}
// =============================================================================
// Toggle effect
// =============================================================================
#[derive(Deserialize)]
pub struct ToggleEffectRequest {
pub room_id: String,
pub target_user_id: String,
pub effect_name: String,
}
pub async fn toggle_effect(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<ToggleEffectRequest>,
) -> Result<Json<MixerChannelResponse>, (StatusCode, Json<ErrorResponse>)> {
// Hent nåværende active_effects JSON
let current: Option<String> = sqlx::query_scalar(
"SELECT active_effects FROM mixer_channels WHERE room_id = $1 AND target_user_id = $2"
)
.bind(&req.room_id)
.bind(&req.target_user_id)
.fetch_optional(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved toggle_effect (les): {e}");
internal_error("Databasefeil")
})?;
let mut effects: serde_json::Value = current
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(serde_json::json!({}));
// Toggle effekten
let current_val = effects.get(&req.effect_name).and_then(|v| v.as_bool()).unwrap_or(false);
effects[&req.effect_name] = serde_json::Value::Bool(!current_val);
let new_effects = serde_json::to_string(&effects).unwrap_or_else(|_| "{}".to_string());
sqlx::query(
"UPDATE mixer_channels SET active_effects = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3"
)
.bind(&new_effects)
.bind(&req.room_id)
.bind(&req.target_user_id)
.execute(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved toggle_effect (skriv): {e}");
internal_error("Databasefeil")
})?;
Ok(Json(MixerChannelResponse { ok: true }))
}
// =============================================================================
// Set role
// =============================================================================
#[derive(Deserialize)]
pub struct SetMixerRoleRequest {
pub room_id: String,
pub target_user_id: String,
pub role: String,
}
pub async fn set_mixer_role(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<SetMixerRoleRequest>,
) -> Result<Json<MixerChannelResponse>, (StatusCode, Json<ErrorResponse>)> {
let valid_roles = ["editor", "viewer"];
if !valid_roles.contains(&req.role.as_str()) {
return Err(bad_request("Ugyldig rolle. Gyldige: editor, viewer"));
}
sqlx::query(
"UPDATE mixer_channels SET role = $1, updated_at = now() WHERE room_id = $2 AND target_user_id = $3"
)
.bind(&req.role)
.bind(&req.room_id)
.bind(&req.target_user_id)
.execute(&state.db)
.await
.map_err(|e| {
tracing::error!("Feil ved set_mixer_role: {e}");
internal_error("Databasefeil")
})?;
Ok(Json(MixerChannelResponse { ok: true }))
}

View file

@ -1,10 +1,11 @@
//! WebSocket-lag for sanntid via PG LISTEN/NOTIFY.
//!
//! Portvokteren lytter på `node_changed`, `edge_changed` og `access_changed`
//! kanaler i PostgreSQL og videresender relevante endringer til tilkoblede
//! WebSocket-klienter, filtrert på tilgangsmatrisen (node_access).
//! Portvokteren lytter på `node_changed`, `edge_changed`, `access_changed`
//! og `mixer_channel_changed` kanaler i PostgreSQL og videresender relevante
//! endringer via WebSocket til tilkoblede klienter, filtrert på tilgangsmatrisen.
//!
//! Fase M1: Parallell med SpacetimeDB for verifisering.
//! Fase M2: Frontend bruker kun denne WebSocket-tilkoblingen (SpacetimeDB fjernet).
//! Events berikes med full raddata fra PG slik at klienten kan oppdatere stores direkte.
//! Ref: docs/retninger/datalaget.md
use std::sync::Arc;
@ -24,77 +25,186 @@ use uuid::Uuid;
// ---------------------------------------------------------------------------
// Typer for NOTIFY-payloads
// Typer for NOTIFY-payloads (minimale, fra PG triggers)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeChanged {
pub op: String,
pub id: Uuid,
pub kind: String,
#[derive(Debug, Clone, Deserialize)]
struct NodeNotify {
op: String,
id: Uuid,
kind: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeChanged {
pub op: String,
pub id: Uuid,
pub source_id: Uuid,
pub target_id: Uuid,
pub edge_type: String,
#[derive(Debug, Clone, Deserialize)]
struct EdgeNotify {
op: String,
id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessChanged {
pub op: String,
pub subject_id: Uuid,
pub object_id: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub access: Option<String>,
#[derive(Debug, Clone, Deserialize)]
struct AccessNotify {
op: String,
subject_id: Uuid,
object_id: Uuid,
access: Option<String>,
}
/// Samlet WebSocket-melding sendt til klienter.
#[derive(Debug, Clone, Deserialize)]
struct MixerChannelNotify {
op: String,
room_id: String,
target_user_id: String,
}
// ---------------------------------------------------------------------------
// Berikede WsEvent-typer (sendt til klienter, med full raddata)
// ---------------------------------------------------------------------------
/// WebSocket-melding sendt til klienter. Berikes med full raddata for
/// INSERT/UPDATE slik at frontend kan oppdatere stores direkte.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum WsEvent {
#[serde(rename = "node_changed")]
NodeChanged(NodeChanged),
NodeChanged {
op: String,
id: String,
kind: String,
/// Full node-data for INSERT/UPDATE, None for DELETE.
#[serde(skip_serializing_if = "Option::is_none")]
node: Option<serde_json::Value>,
},
#[serde(rename = "edge_changed")]
EdgeChanged(EdgeChanged),
EdgeChanged {
op: String,
id: String,
source_id: String,
target_id: String,
edge_type: String,
/// Full edge-data for INSERT/UPDATE, None for DELETE.
#[serde(skip_serializing_if = "Option::is_none")]
edge: Option<serde_json::Value>,
},
#[serde(rename = "access_changed")]
AccessChanged(AccessChanged),
/// Initiell snapshot av alle noder, edges og access for denne brukeren.
AccessChanged {
op: String,
subject_id: String,
object_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
access: Option<String>,
/// Full access-rad for INSERT/UPDATE.
#[serde(skip_serializing_if = "Option::is_none")]
row: Option<serde_json::Value>,
},
#[serde(rename = "mixer_channel_changed")]
MixerChannelChanged {
op: String,
room_id: String,
target_user_id: String,
/// Full mixer_channel-data for INSERT/UPDATE, None for DELETE.
#[serde(skip_serializing_if = "Option::is_none")]
channel: Option<serde_json::Value>,
},
/// Initiell snapshot av alle noder, edges, access og mixer_channels.
#[serde(rename = "initial_sync")]
InitialSync {
nodes: Vec<serde_json::Value>,
edges: Vec<serde_json::Value>,
access: Vec<serde_json::Value>,
mixer_channels: Vec<serde_json::Value>,
},
}
/// Intern enum for broadcast — bærer UUID-er for filtrering.
/// Konverteres til WsEvent ved serialisering.
#[derive(Debug, Clone)]
enum BroadcastEvent {
NodeChanged {
op: String,
id: Uuid,
kind: String,
node: Option<serde_json::Value>,
},
EdgeChanged {
op: String,
id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
edge: Option<serde_json::Value>,
},
AccessChanged {
op: String,
subject_id: Uuid,
object_id: Uuid,
access: Option<String>,
row: Option<serde_json::Value>,
},
MixerChannelChanged {
op: String,
room_id: String,
target_user_id: String,
channel: Option<serde_json::Value>,
},
}
impl BroadcastEvent {
fn to_ws_event(&self) -> WsEvent {
match self {
BroadcastEvent::NodeChanged { op, id, kind, node } => WsEvent::NodeChanged {
op: op.clone(),
id: id.to_string(),
kind: kind.clone(),
node: node.clone(),
},
BroadcastEvent::EdgeChanged { op, id, source_id, target_id, edge_type, edge } => WsEvent::EdgeChanged {
op: op.clone(),
id: id.to_string(),
source_id: source_id.to_string(),
target_id: target_id.to_string(),
edge_type: edge_type.clone(),
edge: edge.clone(),
},
BroadcastEvent::AccessChanged { op, subject_id, object_id, access, row } => WsEvent::AccessChanged {
op: op.clone(),
subject_id: subject_id.to_string(),
object_id: object_id.to_string(),
access: access.clone(),
row: row.clone(),
},
BroadcastEvent::MixerChannelChanged { op, room_id, target_user_id, channel } => WsEvent::MixerChannelChanged {
op: op.clone(),
room_id: room_id.clone(),
target_user_id: target_user_id.clone(),
channel: channel.clone(),
},
}
}
}
// ---------------------------------------------------------------------------
// Broadcast-kanal for alle NOTIFY-events
// Broadcast-kanal
// ---------------------------------------------------------------------------
/// Delt broadcast-kanal som PG-lytteren publiserer til.
/// Alle WebSocket-tilkoblinger abonnerer.
#[derive(Clone)]
pub struct WsBroadcast {
tx: broadcast::Sender<WsEvent>,
tx: broadcast::Sender<BroadcastEvent>,
}
impl WsBroadcast {
pub fn new() -> Self {
// 4096 meldinger i buffer — dropper eldste ved overflyt
let (tx, _) = broadcast::channel(4096);
Self { tx }
}
}
// ---------------------------------------------------------------------------
// PG LISTEN-bakgrunnstråd
// PG LISTEN + berikelse
// ---------------------------------------------------------------------------
/// Start bakgrunnsoppgave som lytter på PG NOTIFY og publiserer til broadcast.
pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) {
tokio::spawn(async move {
loop {
@ -109,10 +219,10 @@ pub fn start_pg_listener(db: PgPool, ws: WsBroadcast) {
async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error> {
let mut listener = PgListener::connect_with(db).await?;
listener
.listen_all(["node_changed", "edge_changed", "access_changed"])
.listen_all(["node_changed", "edge_changed", "access_changed", "mixer_channel_changed"])
.await?;
tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed");
tracing::info!("PG LISTEN startet — lytter på node_changed, edge_changed, access_changed, mixer_channel_changed");
loop {
let notification = listener.recv().await?;
@ -120,67 +230,147 @@ async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error
let payload = notification.payload();
let event = match channel {
"node_changed" => match serde_json::from_str::<NodeChanged>(payload) {
Ok(n) => WsEvent::NodeChanged(n),
Err(e) => {
tracing::warn!("Ugyldig node_changed-payload: {e}");
continue;
"node_changed" => {
let n: NodeNotify = match serde_json::from_str(payload) {
Ok(v) => v,
Err(e) => { tracing::warn!("Ugyldig node_changed-payload: {e}"); continue; }
};
let node = if n.op != "DELETE" {
enrich_node(db, n.id).await
} else {
None
};
BroadcastEvent::NodeChanged { op: n.op, id: n.id, kind: n.kind, node }
}
},
"edge_changed" => match serde_json::from_str::<EdgeChanged>(payload) {
Ok(e) => WsEvent::EdgeChanged(e),
Err(e) => {
tracing::warn!("Ugyldig edge_changed-payload: {e}");
continue;
"edge_changed" => {
let e: EdgeNotify = match serde_json::from_str(payload) {
Ok(v) => v,
Err(e) => { tracing::warn!("Ugyldig edge_changed-payload: {e}"); continue; }
};
let edge = if e.op != "DELETE" {
enrich_edge(db, e.id).await
} else {
None
};
BroadcastEvent::EdgeChanged {
op: e.op, id: e.id, source_id: e.source_id,
target_id: e.target_id, edge_type: e.edge_type, edge,
}
}
"access_changed" => {
let a: AccessNotify = match serde_json::from_str(payload) {
Ok(v) => v,
Err(e) => { tracing::warn!("Ugyldig access_changed-payload: {e}"); continue; }
};
let row = if a.op != "DELETE" {
enrich_access(db, a.subject_id, a.object_id).await
} else {
None
};
BroadcastEvent::AccessChanged {
op: a.op, subject_id: a.subject_id, object_id: a.object_id,
access: a.access, row,
}
}
"mixer_channel_changed" => {
let m: MixerChannelNotify = match serde_json::from_str(payload) {
Ok(v) => v,
Err(e) => { tracing::warn!("Ugyldig mixer_channel_changed-payload: {e}"); continue; }
};
let channel_data = if m.op != "DELETE" {
enrich_mixer_channel(db, &m.room_id, &m.target_user_id).await
} else {
None
};
BroadcastEvent::MixerChannelChanged {
op: m.op, room_id: m.room_id, target_user_id: m.target_user_id,
channel: channel_data,
}
},
"access_changed" => match serde_json::from_str::<AccessChanged>(payload) {
Ok(a) => WsEvent::AccessChanged(a),
Err(e) => {
tracing::warn!("Ugyldig access_changed-payload: {e}");
continue;
}
},
_ => continue,
};
// Broadcast til alle tilkoblede klienter (filtrering skjer per klient)
let _ = ws.tx.send(event);
}
}
// ---------------------------------------------------------------------------
// Berikelse: hent full rad fra PG etter NOTIFY
// ---------------------------------------------------------------------------
async fn enrich_node(db: &PgPool, id: Uuid) -> Option<serde_json::Value> {
let row = sqlx::query_as::<_, NodeRow>(
"SELECT id, node_kind, title, content, visibility::text, metadata, created_at, created_by FROM nodes WHERE id = $1"
)
.bind(id)
.fetch_optional(db)
.await
.ok()
.flatten()?;
Some(row.to_json())
}
async fn enrich_edge(db: &PgPool, id: Uuid) -> Option<serde_json::Value> {
let row = sqlx::query_as::<_, EdgeRow>(
"SELECT id, source_id, target_id, edge_type, metadata, system, created_at, created_by FROM edges WHERE id = $1"
)
.bind(id)
.fetch_optional(db)
.await
.ok()
.flatten()?;
Some(row.to_json())
}
async fn enrich_access(db: &PgPool, subject_id: Uuid, object_id: Uuid) -> Option<serde_json::Value> {
let row = sqlx::query_as::<_, AccessRow>(
"SELECT subject_id, object_id, access::text, via_edge FROM node_access WHERE subject_id = $1 AND object_id = $2"
)
.bind(subject_id)
.bind(object_id)
.fetch_optional(db)
.await
.ok()
.flatten()?;
Some(row.to_json())
}
async fn enrich_mixer_channel(db: &PgPool, room_id: &str, target_user_id: &str) -> Option<serde_json::Value> {
let row = sqlx::query_as::<_, MixerChannelRow>(
"SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels WHERE room_id = $1 AND target_user_id = $2"
)
.bind(room_id)
.bind(target_user_id)
.fetch_optional(db)
.await
.ok()
.flatten()?;
Some(row.to_json())
}
// ---------------------------------------------------------------------------
// WebSocket-endepunkt
// ---------------------------------------------------------------------------
/// Query-parameter for WebSocket-autentisering.
/// Nettlesere kan ikke sende Authorization-header ved WS-oppgradering,
/// så token sendes som ?token=<jwt> i URL-en.
#[derive(Deserialize)]
pub struct WsQuery {
token: String,
}
/// GET /ws?token=<jwt> — WebSocket-oppgradering for sanntidsstrøm.
pub async fn ws_handler(
ws: WebSocketUpgrade,
axum::extract::Query(query): axum::extract::Query<WsQuery>,
State(state): State<crate::AppState>,
) -> Result<Response, (axum::http::StatusCode, String)> {
// Valider JWT manuelt (kan ikke bruke AuthUser-ekstraktor for WS)
let user = validate_ws_token(&query.token, &state)
.await
.map_err(|e| (axum::http::StatusCode::UNAUTHORIZED, e))?;
tracing::info!(
node_id = %user.node_id,
"WebSocket-tilkobling fra bruker"
);
tracing::info!(node_id = %user.node_id, "WebSocket-tilkobling fra bruker");
Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, state)))
}
/// Valider JWT fra query-parameter — samme logikk som AuthUser-ekstraktor.
async fn validate_ws_token(
token: &str,
state: &crate::AppState,
@ -224,18 +414,14 @@ struct WsUser {
node_id: Uuid,
}
/// Håndter en individuell WebSocket-tilkobling.
async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppState) {
let user_id = user.node_id;
// Last brukerens tilgangsmatrise fra PG
let visible_nodes = match load_visible_nodes(&state.db, user_id).await {
Ok(v) => Arc::new(tokio::sync::RwLock::new(v)),
Err(e) => {
tracing::error!("Kunne ikke laste tilgangsmatrise: {e}");
let _ = socket
.send(Message::Close(None))
.await;
let _ = socket.send(Message::Close(None)).await;
return;
}
};
@ -253,35 +439,34 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta
}
}
// Abonner på broadcast
let mut rx = state.ws_broadcast.tx.subscribe();
loop {
tokio::select! {
// Motta broadcast-events
result = rx.recv() => {
match result {
Ok(event) => {
let should_send = should_send_to_user(&event, user_id, &visible_nodes).await;
if should_send {
// Oppdater tilgangsmatrise ved access-endringer
if let WsEvent::AccessChanged(ref ac) = event {
if let BroadcastEvent::AccessChanged { ref op, subject_id, object_id, .. } = event {
if subject_id == user_id {
let mut vn = visible_nodes.write().await;
if ac.subject_id == user_id {
if ac.op == "DELETE" {
vn.remove(&ac.object_id);
if op == "DELETE" {
vn.remove(&object_id);
} else {
vn.insert(ac.object_id);
vn.insert(object_id);
}
}
}
let json = match serde_json::to_string(&event) {
let ws_event = event.to_ws_event();
let json = match serde_json::to_string(&ws_event) {
Ok(j) => j,
Err(_) => continue,
};
if socket.send(Message::Text(json.into())).await.is_err() {
break; // Klient frakoblet
break;
}
}
}
@ -289,14 +474,19 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta
tracing::warn!(
node_id = %user_id,
skipped = n,
"WebSocket-klient sakket etter, mistet {n} meldinger"
"WebSocket-klient sakket etter, mistet {n} meldinger — sender resync"
);
// Kunne sendt full resync her, men for M1 er det OK
// Send full resync ved lag
if let Ok(sync) = build_initial_sync(&state.db, user_id).await {
let json = serde_json::to_string(&sync).unwrap_or_default();
if socket.send(Message::Text(json.into())).await.is_err() {
break;
}
}
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
// Motta meldinger fra klienten (ping/pong, close)
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
@ -306,7 +496,7 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta
}
}
Some(Err(_)) => break,
_ => {} // Ignorerer tekst/binær fra klient i M1
_ => {}
}
}
}
@ -319,27 +509,30 @@ async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppSta
// Tilgangskontroll
// ---------------------------------------------------------------------------
/// Sjekk om en event skal sendes til denne brukeren.
async fn should_send_to_user(
event: &WsEvent,
event: &BroadcastEvent,
user_id: Uuid,
visible_nodes: &tokio::sync::RwLock<std::collections::HashSet<Uuid>>,
) -> bool {
let vn = visible_nodes.read().await;
match event {
WsEvent::NodeChanged(n) => vn.contains(&n.id),
WsEvent::EdgeChanged(e) => vn.contains(&e.source_id) || vn.contains(&e.target_id),
WsEvent::AccessChanged(a) => a.subject_id == user_id,
WsEvent::InitialSync { .. } => true,
BroadcastEvent::NodeChanged { id, .. } => {
let vn = visible_nodes.read().await;
vn.contains(id)
}
BroadcastEvent::EdgeChanged { source_id, target_id, .. } => {
let vn = visible_nodes.read().await;
vn.contains(source_id) || vn.contains(target_id)
}
BroadcastEvent::AccessChanged { subject_id, .. } => *subject_id == user_id,
// Mixer-kanaler sendes til alle (de er per-rom, ingen node-tilgangskontroll)
BroadcastEvent::MixerChannelChanged { .. } => true,
}
}
/// Last alle node-IDer som brukeren har tilgang til.
async fn load_visible_nodes(
db: &PgPool,
user_id: Uuid,
) -> Result<std::collections::HashSet<Uuid>, sqlx::Error> {
// Brukerens egne noder + noder via node_access + offentlige noder
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT id FROM nodes WHERE created_by = $1
@ -356,12 +549,10 @@ async fn load_visible_nodes(
Ok(rows.into_iter().collect())
}
/// Bygg initial_sync-melding med alle data brukeren kan se.
async fn build_initial_sync(
db: &PgPool,
user_id: Uuid,
) -> Result<WsEvent, sqlx::Error> {
// Noder: egne + tilgang + offentlige
let nodes = sqlx::query_as::<_, NodeRow>(
r#"
SELECT n.id, n.node_kind, n.title, n.content, n.visibility::text,
@ -378,7 +569,6 @@ async fn build_initial_sync(
let node_ids: Vec<Uuid> = nodes.iter().map(|n| n.id).collect();
// Edges der minst én side er synlig
let edges = sqlx::query_as::<_, EdgeRow>(
r#"
SELECT e.id, e.source_id, e.target_id, e.edge_type,
@ -391,7 +581,6 @@ async fn build_initial_sync(
.fetch_all(db)
.await?;
// Access-entries for denne brukeren
let access = sqlx::query_as::<_, AccessRow>(
r#"
SELECT subject_id, object_id, access::text, via_edge
@ -403,15 +592,23 @@ async fn build_initial_sync(
.fetch_all(db)
.await?;
// Mixer-kanaler: alle (ingen tilgangskontroll på rom-nivå foreløpig)
let mixer_channels = sqlx::query_as::<_, MixerChannelRow>(
"SELECT room_id, target_user_id, gain, is_muted, active_effects, role, updated_by, updated_at FROM mixer_channels"
)
.fetch_all(db)
.await?;
Ok(WsEvent::InitialSync {
nodes: nodes.into_iter().map(|n| n.to_json()).collect(),
edges: edges.into_iter().map(|e| e.to_json()).collect(),
access: access.into_iter().map(|a| a.to_json()).collect(),
mixer_channels: mixer_channels.into_iter().map(|m| m.to_json()).collect(),
})
}
// ---------------------------------------------------------------------------
// DB-rader for initial_sync
// DB-rader
// ---------------------------------------------------------------------------
#[derive(sqlx::FromRow)]
@ -478,7 +675,6 @@ struct AccessRow {
impl AccessRow {
fn to_json(&self) -> serde_json::Value {
// Composite key som STDB bruker: "subject_id:object_id"
serde_json::json!({
"id": format!("{}:{}", self.subject_id, self.object_id),
"subjectId": self.subject_id.to_string(),
@ -488,3 +684,31 @@ impl AccessRow {
})
}
}
#[derive(sqlx::FromRow)]
struct MixerChannelRow {
room_id: String,
target_user_id: String,
gain: f64,
is_muted: bool,
active_effects: String,
role: String,
updated_by: String,
updated_at: chrono::DateTime<chrono::Utc>,
}
impl MixerChannelRow {
fn to_json(&self) -> serde_json::Value {
serde_json::json!({
"id": format!("{}:{}", self.room_id, self.target_user_id),
"roomId": self.room_id,
"targetUserId": self.target_user_id,
"gain": self.gain,
"isMuted": self.is_muted,
"activeEffects": self.active_effects,
"role": self.role,
"updatedBy": self.updated_by,
"updatedAt": self.updated_at.timestamp_micros(),
})
}
}

View file

@ -0,0 +1,56 @@
-- 019_mixer_channels.sql
-- Mixer-kanaler for delt lydmixer-tilstand mellom deltakere i LiveKit-rom.
--
-- Migrert fra SpacetimeDB til PostgreSQL som del av Fase M2 (oppgave 22.2).
-- Portvokteren sender endringer via WebSocket til tilkoblede klienter.
BEGIN;
-- =============================================================================
-- mixer_channels: Delt mixer-tilstand per deltaker per rom.
-- =============================================================================
CREATE TABLE IF NOT EXISTS mixer_channels (
room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
gain DOUBLE PRECISION NOT NULL DEFAULT 1.0,
is_muted BOOLEAN NOT NULL DEFAULT false,
active_effects TEXT NOT NULL DEFAULT '{}', -- JSON-streng med effekttilstand
role TEXT NOT NULL DEFAULT 'editor',
updated_by TEXT NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (room_id, target_user_id)
);
-- =============================================================================
-- notify_mixer_channel_change: NOTIFY ved endringer i mixer_channels.
-- =============================================================================
CREATE OR REPLACE FUNCTION notify_mixer_channel_change()
RETURNS trigger AS $$
DECLARE
payload json;
target_row mixer_channels%ROWTYPE;
BEGIN
IF TG_OP = 'DELETE' THEN
target_row := OLD;
ELSE
target_row := NEW;
END IF;
payload := json_build_object(
'op', TG_OP,
'room_id', target_row.room_id,
'target_user_id', target_row.target_user_id
);
PERFORM pg_notify('mixer_channel_changed', payload::text);
RETURN target_row;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER mixer_channels_notify
AFTER INSERT OR UPDATE OR DELETE ON mixer_channels
FOR EACH ROW EXECUTE FUNCTION notify_mixer_channel_change();
COMMIT;