Fullfør oppgave 22.1: WebSocket-lag med PG LISTEN/NOTIFY og frontend dual-tilkobling

Backend:
- ws.rs: Fikset WS auth via query-param (browser kan ikke sende headers ved WS upgrade)
- auth.rs: Gjort decoding_key pub for gjenbruk i ws-modulen

Frontend:
- pg-ws.svelte.ts: Ny PG WebSocket-klient med auto-reconnect og event-logging
- index.ts: Eksporterer pgWsConnect/pgWsDisconnect/pgWsState
- +layout.svelte: Kobler til PG WS i parallell med STDB for verifisering

Docs:
- api_grensesnitt.md: Dokumentert /ws endepunkt og sanntidsarkitektur
- tasks.md: Merket 22.1 som ferdig

Deploy: Krever restart av maskinrommet + rebuild av frontend.
This commit is contained in:
vegard 2026-03-18 12:01:10 +00:00
parent 0ecb7104c0
commit d7a9f3816d
7 changed files with 211 additions and 14 deletions

View file

@ -3,10 +3,12 @@
## 1. Konsept ## 1. Konsept
Frontend sender **intensjoner** til maskinrommet (Rust/axum HTTP API). Frontend sender **intensjoner** til maskinrommet (Rust/axum HTTP API).
Maskinrommet eier alle skrivinger: det validerer, skriver til SpacetimeDB Maskinrommet eier alle skrivinger: det validerer, skriver til PG,
(instant), persisterer til PG (asynk), og orkestrerer konsekvenser. og orkestrerer konsekvenser.
Lesestien for sanntid går direkte fra SpacetimeDB til frontend via WebSocket. Sanntid: PG LISTEN/NOTIFY → maskinrommet → WebSocket `/ws` → frontend.
SpacetimeDB er under utfasing — frontend kobler til begge i parallell
under verifiseringsfasen (Fase M1).
Tunge spørringer (søk, statistikk, graftraversering) går via maskinrommet → PG. Tunge spørringer (søk, statistikk, graftraversering) går via maskinrommet → PG.
## 2. Kommunikasjonskart ## 2. Kommunikasjonskart
@ -60,6 +62,15 @@ Tunge spørringer (søk, statistikk, graftraversering) går via maskinrommet →
### Offentlige ### Offentlige
- `GET /health` — Helsesjekk. Verifiserer PG- og STDB-tilkobling. - `GET /health` — Helsesjekk. Verifiserer PG- og STDB-tilkobling.
### WebSocket (sanntid, oppgave 22.1)
- `GET /ws?token=<JWT>` — WebSocket-oppgradering for sanntidsstrøm.
Token sendes som query-parameter (nettlesere støtter ikke custom headers ved WS upgrade).
- Ved tilkobling: sender `initial_sync` med alle noder, edges og access brukeren kan se.
- Deretter: strømmer `node_changed`, `edge_changed` og `access_changed` events,
filtrert på brukerens tilgangsmatrise (node_access).
- Kilder: PG LISTEN/NOTIFY-triggere på nodes, edges og node_access-tabellene.
- Events er JSON med `{ "type": "node_changed"|"edge_changed"|"access_changed", ... }`.
### Autentiserte (krever `Authorization: Bearer <JWT>`) ### Autentiserte (krever `Authorization: Bearer <JWT>`)
- `GET /me` — Returnerer autentisert brukers `node_id` og `authentik_sub`. - `GET /me` — Returnerer autentisert brukers `node_id` og `authentik_sub`.
- `POST /intentions/create_node` — Opprett node. Skriv til STDB (instant), - `POST /intentions/create_node` — Opprett node. Skriv til STDB (instant),

View file

@ -7,4 +7,5 @@
export { stdb, connectionState } from './connection.svelte'; export { stdb, connectionState } from './connection.svelte';
export { nodeStore, edgeStore, nodeAccessStore, mixerChannelStore, nodeVisibility } from './stores.svelte'; export { nodeStore, edgeStore, nodeAccessStore, mixerChannelStore, nodeVisibility } from './stores.svelte';
export { pgWsConnect, pgWsDisconnect, pgWsState } from './pg-ws.svelte';
export type { Node, Edge, NodeAccess, MixerChannel } from './module_bindings/types'; export type { Node, Edge, NodeAccess, MixerChannel } from './module_bindings/types';

View file

@ -0,0 +1,119 @@
/**
* PG WebSocket verification connection (Fase M1).
*
* Connects to portvokterens /ws endpoint in parallel with SpacetimeDB
* for verification. Logs received events to console for comparison.
*
* This module will replace SpacetimeDB entirely in Fase M2.
*/
import { page } from '$app/stores';
import { get } from 'svelte/store';
export type PgWsState = 'disconnected' | 'connecting' | 'connected' | 'error';
let ws: WebSocket | null = null;
let _state = $state<PgWsState>('disconnected');
let _eventCount = $state(0);
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
/** Reactive connection state for the PG WebSocket. */
export const pgWsState = {
get current() {
return _state;
},
get eventCount() {
return _eventCount;
},
};
/**
* Connect to portvokterens WebSocket endpoint.
* Uses the same access token as maskinrommet API calls.
*/
export function pgWsConnect(accessToken: string) {
if (ws) return;
// Build WebSocket URL from maskinrommet base URL
const apiBase = import.meta.env.VITE_API_URL ?? '/api';
let wsUrl: string;
if (apiBase.startsWith('http')) {
// Absolute URL: convert http(s) → ws(s)
wsUrl = apiBase.replace(/^http/, 'ws') + '/ws';
} else {
// Relative URL: build from current page origin
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
wsUrl = `${proto}//${window.location.host}${apiBase}/ws`;
}
// Add token as query parameter (browsers can't send headers on WS upgrade)
wsUrl += `?token=${encodeURIComponent(accessToken)}`;
_state = 'connecting';
console.log('[pg-ws] Connecting to', wsUrl.replace(/token=.*/, 'token=***'));
ws = new WebSocket(wsUrl);
ws.onopen = () => {
console.log('[pg-ws] Connected');
_state = 'connected';
};
ws.onmessage = (event) => {
_eventCount++;
try {
const data = JSON.parse(event.data);
const type = data.type;
if (type === 'initial_sync') {
console.log(
`[pg-ws] Initial sync: ${data.nodes?.length ?? 0} nodes, ` +
`${data.edges?.length ?? 0} edges, ${data.access?.length ?? 0} access`,
);
} else {
console.log(`[pg-ws] Event #${_eventCount}:`, type, data);
}
} catch {
console.warn('[pg-ws] Non-JSON message:', event.data);
}
};
ws.onerror = (event) => {
console.error('[pg-ws] Error:', event);
_state = 'error';
};
ws.onclose = (event) => {
console.log('[pg-ws] Disconnected:', event.code, event.reason);
ws = null;
_state = 'disconnected';
// Auto-reconnect after 5 seconds (for verification phase)
if (!reconnectTimer) {
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
// Re-read token from session
const session = get(page)?.data?.session as Record<string, unknown> | undefined;
const token = session?.accessToken as string | undefined;
if (token) {
console.log('[pg-ws] Auto-reconnecting...');
pgWsConnect(token);
}
}, 5000);
}
};
}
/** Disconnect the PG WebSocket. */
export function pgWsDisconnect() {
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (ws) {
ws.close();
ws = null;
_state = 'disconnected';
}
}

View file

@ -1,7 +1,7 @@
<script lang="ts"> <script lang="ts">
import '../app.css'; import '../app.css';
import { page } from '$app/stores'; import { page } from '$app/stores';
import { stdb } from '$lib/spacetime'; import { stdb, pgWsConnect, pgWsDisconnect } from '$lib/spacetime';
import { browser } from '$app/environment'; import { browser } from '$app/environment';
import SystemAnnouncements from '$lib/components/SystemAnnouncements.svelte'; import SystemAnnouncements from '$lib/components/SystemAnnouncements.svelte';
@ -16,6 +16,18 @@
if (browser) stdb.disconnect(); if (browser) stdb.disconnect();
}; };
}); });
// Connect PG WebSocket in parallel for verification (Fase M1, oppgave 22.1)
$effect(() => {
const session = $page.data.session as Record<string, unknown> | undefined;
const accessToken = session?.accessToken as string | undefined;
if (browser && accessToken) {
pgWsConnect(accessToken);
}
return () => {
if (browser) pgWsDisconnect();
};
});
</script> </script>
<SystemAnnouncements /> <SystemAnnouncements />

View file

@ -64,7 +64,7 @@ impl JwksKeys {
} }
/// Find decoding key by kid, or use the first key if no kid in header. /// Find decoding key by kid, or use the first key if no kid in header.
fn decoding_key(&self, kid: Option<&str>) -> Result<DecodingKey, String> { pub fn decoding_key(&self, kid: Option<&str>) -> Result<DecodingKey, String> {
let key = match kid { let key = match kid {
Some(kid) => self Some(kid) => self
.keys .keys

View file

@ -22,7 +22,6 @@ use sqlx::PgPool;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use uuid::Uuid; use uuid::Uuid;
use crate::auth::AuthUser;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Typer for NOTIFY-payloads // Typer for NOTIFY-payloads
@ -154,23 +153,79 @@ async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error
// WebSocket-endepunkt // WebSocket-endepunkt
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// GET /ws — WebSocket-oppgradering for sanntidsstrøm. /// Query-parameter for WebSocket-autentisering.
/// Krever gyldig JWT (AuthUser-ekstraktor). /// Nettlesere kan ikke sende Authorization-header ved WS-oppgradering,
/// så token sendes som ?token=<jwt> i URL-en.
#[derive(Deserialize)]
pub struct WsQuery {
token: String,
}
/// GET /ws?token=<jwt> — WebSocket-oppgradering for sanntidsstrøm.
pub async fn ws_handler( pub async fn ws_handler(
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
user: AuthUser, axum::extract::Query(query): axum::extract::Query<WsQuery>,
State(state): State<crate::AppState>, State(state): State<crate::AppState>,
) -> Response { ) -> Result<Response, (axum::http::StatusCode, String)> {
// Valider JWT manuelt (kan ikke bruke AuthUser-ekstraktor for WS)
let user = validate_ws_token(&query.token, &state)
.await
.map_err(|e| (axum::http::StatusCode::UNAUTHORIZED, e))?;
tracing::info!( tracing::info!(
node_id = %user.node_id, node_id = %user.node_id,
"WebSocket-tilkobling fra bruker" "WebSocket-tilkobling fra bruker"
); );
ws.on_upgrade(move |socket| handle_socket(socket, user, state)) Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, state)))
}
/// Valider JWT fra query-parameter — samme logikk som AuthUser-ekstraktor.
async fn validate_ws_token(
token: &str,
state: &crate::AppState,
) -> Result<WsUser, String> {
use jsonwebtoken::{decode, decode_header, Algorithm, Validation};
let header = decode_header(token)
.map_err(|e| format!("Ugyldig token-header: {e}"))?;
let decoding_key = state.jwks.decoding_key(header.kid.as_deref())
.map_err(|e| format!("JWKS-nøkkel ikke funnet: {e}"))?;
let mut validation = Validation::new(Algorithm::RS256);
validation.set_issuer(&[&state.jwks.issuer]);
validation.set_audience(&[&state.jwks.audience]);
let token_data = decode::<WsClaims>(token, &decoding_key, &validation)
.map_err(|e| format!("JWT-validering feilet: {e}"))?;
let authentik_sub = &token_data.claims.sub;
let node_id = sqlx::query_scalar::<_, Uuid>(
"SELECT node_id FROM auth_identities WHERE authentik_sub = $1",
)
.bind(authentik_sub)
.fetch_optional(&state.db)
.await
.map_err(|e| format!("DB-feil: {e}"))?
.ok_or_else(|| format!("Ukjent brukeridentitet: {authentik_sub}"))?;
Ok(WsUser { node_id })
}
#[derive(Deserialize)]
struct WsClaims {
sub: String,
}
#[derive(Debug, Clone)]
struct WsUser {
node_id: Uuid,
} }
/// Håndter en individuell WebSocket-tilkobling. /// Håndter en individuell WebSocket-tilkobling.
async fn handle_socket(mut socket: WebSocket, user: AuthUser, state: crate::AppState) { async fn handle_socket(mut socket: WebSocket, user: WsUser, state: crate::AppState) {
let user_id = user.node_id; let user_id = user.node_id;
// Last brukerens tilgangsmatrise fra PG // Last brukerens tilgangsmatrise fra PG

View file

@ -278,8 +278,7 @@ Ref: `docs/retninger/datalaget.md` (revidert mars 2026). Faser ut SpacetimeDB
til fordel for PG LISTEN/NOTIFY + WebSocket i portvokteren. Én datakilde, til fordel for PG LISTEN/NOTIFY + WebSocket i portvokteren. Én datakilde,
ingen synk-kompleksitet. ingen synk-kompleksitet.
- [~] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering. - [x] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering.
> Påbegynt: 2026-03-18T11:47
- [ ] 22.2 Frontend-migrering: erstatt SpacetimeDB-klient med vanlig WebSocket til portvokteren. Erstatt STDB-stores med reaktive stores som lytter på WebSocket. Verifiser all sanntidsfunksjonalitet (chat, kanban, kalender, mixer, canvas). - [ ] 22.2 Frontend-migrering: erstatt SpacetimeDB-klient med vanlig WebSocket til portvokteren. Erstatt STDB-stores med reaktive stores som lytter på WebSocket. Verifiser all sanntidsfunksjonalitet (chat, kanban, kalender, mixer, canvas).
- [ ] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data. - [ ] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data.
- [ ] 22.4 Fjern SpacetimeDB: stopp Docker-container, fjern STDB-modul, fjern STDB-klient fra portvokteren og frontend, fjern synkroniseringskode, oppdater docs og CLAUDE.md. - [ ] 22.4 Fjern SpacetimeDB: stopp Docker-container, fjern STDB-modul, fjern STDB-klient fra portvokteren og frontend, fjern synkroniseringskode, oppdater docs og CLAUDE.md.