diff --git a/docs/infra/api_grensesnitt.md b/docs/infra/api_grensesnitt.md index 29e5f1e..0a5311e 100644 --- a/docs/infra/api_grensesnitt.md +++ b/docs/infra/api_grensesnitt.md @@ -3,10 +3,12 @@ ## 1. Konsept Frontend sender **intensjoner** til maskinrommet (Rust/axum HTTP API). -Maskinrommet eier alle skrivinger: det validerer, skriver til SpacetimeDB -(instant), persisterer til PG (asynk), og orkestrerer konsekvenser. +Maskinrommet eier alle skrivinger: det validerer, skriver til PG, +og orkestrerer konsekvenser. -Lesestien for sanntid går direkte fra SpacetimeDB til frontend via WebSocket. +Sanntid: PG LISTEN/NOTIFY → maskinrommet → WebSocket `/ws` → frontend. +SpacetimeDB er under utfasing — frontend kobler til begge i parallell +under verifiseringsfasen (Fase M1). Tunge spørringer (søk, statistikk, graftraversering) går via maskinrommet → PG. ## 2. Kommunikasjonskart @@ -60,6 +62,15 @@ Tunge spørringer (søk, statistikk, graftraversering) går via maskinrommet → ### Offentlige - `GET /health` — Helsesjekk. Verifiserer PG- og STDB-tilkobling. +### WebSocket (sanntid, oppgave 22.1) +- `GET /ws?token=` — WebSocket-oppgradering for sanntidsstrøm. + Token sendes som query-parameter (nettlesere støtter ikke custom headers ved WS upgrade). + - Ved tilkobling: sender `initial_sync` med alle noder, edges og access brukeren kan se. + - Deretter: strømmer `node_changed`, `edge_changed` og `access_changed` events, + filtrert på brukerens tilgangsmatrise (node_access). + - Kilder: PG LISTEN/NOTIFY-triggere på nodes, edges og node_access-tabellene. + - Events er JSON med `{ "type": "node_changed"|"edge_changed"|"access_changed", ... }`. + ### Autentiserte (krever `Authorization: Bearer `) - `GET /me` — Returnerer autentisert brukers `node_id` og `authentik_sub`. - `POST /intentions/create_node` — Opprett node. Skriv til STDB (instant), diff --git a/frontend/src/lib/spacetime/index.ts b/frontend/src/lib/spacetime/index.ts index 3a1c890..2c4f7fa 100644 --- a/frontend/src/lib/spacetime/index.ts +++ b/frontend/src/lib/spacetime/index.ts @@ -7,4 +7,5 @@ export { stdb, connectionState } from './connection.svelte'; export { nodeStore, edgeStore, nodeAccessStore, mixerChannelStore, nodeVisibility } from './stores.svelte'; +export { pgWsConnect, pgWsDisconnect, pgWsState } from './pg-ws.svelte'; export type { Node, Edge, NodeAccess, MixerChannel } from './module_bindings/types'; diff --git a/frontend/src/lib/spacetime/pg-ws.svelte.ts b/frontend/src/lib/spacetime/pg-ws.svelte.ts new file mode 100644 index 0000000..1dfc1fa --- /dev/null +++ b/frontend/src/lib/spacetime/pg-ws.svelte.ts @@ -0,0 +1,119 @@ +/** + * PG WebSocket verification connection (Fase M1). + * + * Connects to portvokterens /ws endpoint in parallel with SpacetimeDB + * for verification. Logs received events to console for comparison. + * + * This module will replace SpacetimeDB entirely in Fase M2. + */ + +import { page } from '$app/stores'; +import { get } from 'svelte/store'; + +export type PgWsState = 'disconnected' | 'connecting' | 'connected' | 'error'; + +let ws: WebSocket | null = null; +let _state = $state('disconnected'); +let _eventCount = $state(0); +let reconnectTimer: ReturnType | null = null; + +/** Reactive connection state for the PG WebSocket. */ +export const pgWsState = { + get current() { + return _state; + }, + get eventCount() { + return _eventCount; + }, +}; + +/** + * Connect to portvokterens WebSocket endpoint. + * Uses the same access token as maskinrommet API calls. + */ +export function pgWsConnect(accessToken: string) { + if (ws) return; + + // Build WebSocket URL from maskinrommet base URL + const apiBase = import.meta.env.VITE_API_URL ?? '/api'; + let wsUrl: string; + + if (apiBase.startsWith('http')) { + // Absolute URL: convert http(s) → ws(s) + wsUrl = apiBase.replace(/^http/, 'ws') + '/ws'; + } else { + // Relative URL: build from current page origin + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + wsUrl = `${proto}//${window.location.host}${apiBase}/ws`; + } + + // Add token as query parameter (browsers can't send headers on WS upgrade) + wsUrl += `?token=${encodeURIComponent(accessToken)}`; + + _state = 'connecting'; + console.log('[pg-ws] Connecting to', wsUrl.replace(/token=.*/, 'token=***')); + + ws = new WebSocket(wsUrl); + + ws.onopen = () => { + console.log('[pg-ws] Connected'); + _state = 'connected'; + }; + + ws.onmessage = (event) => { + _eventCount++; + try { + const data = JSON.parse(event.data); + const type = data.type; + + if (type === 'initial_sync') { + console.log( + `[pg-ws] Initial sync: ${data.nodes?.length ?? 0} nodes, ` + + `${data.edges?.length ?? 0} edges, ${data.access?.length ?? 0} access`, + ); + } else { + console.log(`[pg-ws] Event #${_eventCount}:`, type, data); + } + } catch { + console.warn('[pg-ws] Non-JSON message:', event.data); + } + }; + + ws.onerror = (event) => { + console.error('[pg-ws] Error:', event); + _state = 'error'; + }; + + ws.onclose = (event) => { + console.log('[pg-ws] Disconnected:', event.code, event.reason); + ws = null; + _state = 'disconnected'; + + // Auto-reconnect after 5 seconds (for verification phase) + if (!reconnectTimer) { + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + // Re-read token from session + const session = get(page)?.data?.session as Record | undefined; + const token = session?.accessToken as string | undefined; + if (token) { + console.log('[pg-ws] Auto-reconnecting...'); + pgWsConnect(token); + } + }, 5000); + } + }; +} + +/** Disconnect the PG WebSocket. */ +export function pgWsDisconnect() { + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + if (ws) { + ws.close(); + ws = null; + _state = 'disconnected'; + } +} diff --git a/frontend/src/routes/+layout.svelte b/frontend/src/routes/+layout.svelte index 694b6ae..b977154 100644 --- a/frontend/src/routes/+layout.svelte +++ b/frontend/src/routes/+layout.svelte @@ -1,7 +1,7 @@ diff --git a/maskinrommet/src/auth.rs b/maskinrommet/src/auth.rs index 5800f6f..ffc9053 100644 --- a/maskinrommet/src/auth.rs +++ b/maskinrommet/src/auth.rs @@ -64,7 +64,7 @@ impl JwksKeys { } /// Find decoding key by kid, or use the first key if no kid in header. - fn decoding_key(&self, kid: Option<&str>) -> Result { + pub fn decoding_key(&self, kid: Option<&str>) -> Result { let key = match kid { Some(kid) => self .keys diff --git a/maskinrommet/src/ws.rs b/maskinrommet/src/ws.rs index 4431dca..a681202 100644 --- a/maskinrommet/src/ws.rs +++ b/maskinrommet/src/ws.rs @@ -22,7 +22,6 @@ use sqlx::PgPool; use tokio::sync::broadcast; use uuid::Uuid; -use crate::auth::AuthUser; // --------------------------------------------------------------------------- // Typer for NOTIFY-payloads @@ -154,23 +153,79 @@ async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error // WebSocket-endepunkt // --------------------------------------------------------------------------- -/// GET /ws — WebSocket-oppgradering for sanntidsstrøm. -/// Krever gyldig JWT (AuthUser-ekstraktor). +/// Query-parameter for WebSocket-autentisering. +/// Nettlesere kan ikke sende Authorization-header ved WS-oppgradering, +/// så token sendes som ?token= i URL-en. +#[derive(Deserialize)] +pub struct WsQuery { + token: String, +} + +/// GET /ws?token= — WebSocket-oppgradering for sanntidsstrøm. pub async fn ws_handler( ws: WebSocketUpgrade, - user: AuthUser, + axum::extract::Query(query): axum::extract::Query, State(state): State, -) -> Response { +) -> Result { + // Valider JWT manuelt (kan ikke bruke AuthUser-ekstraktor for WS) + let user = validate_ws_token(&query.token, &state) + .await + .map_err(|e| (axum::http::StatusCode::UNAUTHORIZED, e))?; + tracing::info!( node_id = %user.node_id, "WebSocket-tilkobling fra bruker" ); - ws.on_upgrade(move |socket| handle_socket(socket, user, state)) + Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, state))) +} + +/// Valider JWT fra query-parameter — samme logikk som AuthUser-ekstraktor. +async fn validate_ws_token( + token: &str, + state: &crate::AppState, +) -> Result { + use jsonwebtoken::{decode, decode_header, Algorithm, Validation}; + + let header = decode_header(token) + .map_err(|e| format!("Ugyldig token-header: {e}"))?; + + let decoding_key = state.jwks.decoding_key(header.kid.as_deref()) + .map_err(|e| format!("JWKS-nøkkel ikke funnet: {e}"))?; + + let mut validation = Validation::new(Algorithm::RS256); + validation.set_issuer(&[&state.jwks.issuer]); + validation.set_audience(&[&state.jwks.audience]); + + let token_data = decode::(token, &decoding_key, &validation) + .map_err(|e| format!("JWT-validering feilet: {e}"))?; + + let authentik_sub = &token_data.claims.sub; + + let node_id = sqlx::query_scalar::<_, Uuid>( + "SELECT node_id FROM auth_identities WHERE authentik_sub = $1", + ) + .bind(authentik_sub) + .fetch_optional(&state.db) + .await + .map_err(|e| format!("DB-feil: {e}"))? + .ok_or_else(|| format!("Ukjent brukeridentitet: {authentik_sub}"))?; + + Ok(WsUser { node_id }) +} + +#[derive(Deserialize)] +struct WsClaims { + sub: String, +} + +#[derive(Debug, Clone)] +struct WsUser { + node_id: Uuid, } /// Håndter en individuell WebSocket-tilkobling. -async fn handle_socket(mut socket: WebSocket, user: AuthUser, state: crate::AppState) { +async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppState) { let user_id = user.node_id; // Last brukerens tilgangsmatrise fra PG diff --git a/tasks.md b/tasks.md index eeae68b..7335497 100644 --- a/tasks.md +++ b/tasks.md @@ -278,8 +278,7 @@ Ref: `docs/retninger/datalaget.md` (revidert mars 2026). Faser ut SpacetimeDB til fordel for PG LISTEN/NOTIFY + WebSocket i portvokteren. Én datakilde, ingen synk-kompleksitet. -- [~] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering. - > Påbegynt: 2026-03-18T11:47 +- [x] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering. - [ ] 22.2 Frontend-migrering: erstatt SpacetimeDB-klient med vanlig WebSocket til portvokteren. Erstatt STDB-stores med reaktive stores som lytter på WebSocket. Verifiser all sanntidsfunksjonalitet (chat, kanban, kalender, mixer, canvas). - [ ] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data. - [ ] 22.4 Fjern SpacetimeDB: stopp Docker-container, fjern STDB-modul, fjern STDB-klient fra portvokteren og frontend, fjern synkroniseringskode, oppdater docs og CLAUDE.md.