From cb7f88035d79065bb82cc535b7ee6384de01eba4 Mon Sep 17 00:00:00 2001 From: vegard Date: Tue, 17 Mar 2026 12:44:27 +0100 Subject: [PATCH] SpacetimeDB-klient og warmup i maskinrommet (oppgave 2.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Legger til HTTP-klient som kaller STDB-reducere via JSON API. Warmup-modul laster hele grafen fra PG til STDB ved oppstart. - stdb.rs: HTTP-klient med create/update/delete for noder og edges - warmup.rs: PG → STDB sync (clear_all → noder → edges) - main.rs: Integrerer STDB i AppState, kjører warmup ved oppstart API-format: POST /v1/database/{db}/call/{reducer} med navngitte params. STDB-token kan settes via SPACETIMEDB_TOKEN eller opprettes automatisk. Co-Authored-By: Claude Opus 4.6 --- maskinrommet/src/main.rs | 62 +++++++- maskinrommet/src/stdb.rs | 286 +++++++++++++++++++++++++++++++++++++ maskinrommet/src/warmup.rs | 108 ++++++++++++++ 3 files changed, 454 insertions(+), 2 deletions(-) create mode 100644 maskinrommet/src/stdb.rs create mode 100644 maskinrommet/src/warmup.rs diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index b427137..ed3a648 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -1,4 +1,6 @@ mod auth; +mod stdb; +mod warmup; use axum::{extract::State, http::StatusCode, routing::get, Json, Router}; use serde::Serialize; @@ -8,11 +10,13 @@ use tower_http::trace::TraceLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; use auth::{AuthUser, JwksKeys}; +use stdb::StdbClient; #[derive(Clone)] pub struct AppState { pub db: PgPool, pub jwks: JwksKeys, + pub stdb: StdbClient, } #[derive(Serialize)] @@ -20,6 +24,7 @@ struct HealthResponse { status: &'static str, version: &'static str, db: &'static str, + stdb: &'static str, } #[derive(Serialize)] @@ -61,7 +66,46 @@ async fn main() { .await .expect("Kunne ikke hente JWKS fra Authentik"); - let state = AppState { db, jwks }; + // SpacetimeDB-klient + let stdb_url = std::env::var("SPACETIMEDB_URL") + .unwrap_or_else(|_| "http://spacetimedb:3000".to_string()); + let stdb_database = std::env::var("SPACETIMEDB_DATABASE") + .unwrap_or_else(|_| "synops".to_string()); + + // Hent token fra miljøvariabel, eller opprett ny identitet + let stdb_token = match std::env::var("SPACETIMEDB_TOKEN") { + Ok(token) if !token.is_empty() => { + tracing::info!("Bruker konfigurert STDB-token"); + token + } + _ => { + tracing::info!("Ingen STDB-token konfigurert, oppretter ny identitet"); + let (identity, token) = StdbClient::create_identity(&stdb_url) + .await + .expect("Kunne ikke opprette STDB-identitet"); + tracing::info!("Opprettet STDB-identitet: {identity}"); + token + } + }; + + let stdb = StdbClient::new(&stdb_url, &stdb_database, &stdb_token); + + // Warmup: last hele grafen fra PG til SpacetimeDB + match warmup::run(&db, &stdb).await { + Ok(stats) => { + tracing::info!( + "Warmup fullført: {} noder, {} edges", + stats.nodes, + stats.edges + ); + } + Err(e) => { + tracing::error!("Warmup feilet: {e}"); + // Fortsett likevel — STDB kan være midlertidig utilgjengelig + } + } + + let state = AppState { db, jwks, stdb }; // Ruter: /health er offentlig, /me krever gyldig JWT let app = Router::new() @@ -76,17 +120,31 @@ async fn main() { axum::serve(listener, app).await.unwrap(); } -/// Offentlig helsesjekk — ingen auth påkrevd. +/// Offentlig helsesjekk — verifiserer PG- og STDB-tilkobling. async fn health(State(state): State) -> Result, StatusCode> { sqlx::query("SELECT 1") .execute(&state.db) .await .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?; + // Sjekk STDB ved å kalle clear_all med en ufarlig test + // (vi bruker en enkel healthcheck-reducer i fremtiden) + let stdb_status = match state.stdb.create_node( + "__healthcheck__", "system", "", "", "hidden", "{}", "", + ).await { + Ok(()) => { + // Rydd opp + let _ = state.stdb.delete_node("__healthcheck__").await; + "connected" + } + Err(_) => "unavailable", + }; + Ok(Json(HealthResponse { status: "ok", version: env!("CARGO_PKG_VERSION"), db: "connected", + stdb: stdb_status, })) } diff --git a/maskinrommet/src/stdb.rs b/maskinrommet/src/stdb.rs new file mode 100644 index 0000000..1ff75e1 --- /dev/null +++ b/maskinrommet/src/stdb.rs @@ -0,0 +1,286 @@ +// SpacetimeDB HTTP-klient for maskinrommet. +// +// Kaller STDB-reducere via HTTP JSON API. +// Maskinrommet eier all skriving — denne klienten er eneste vei inn. +// +// API-format: POST /v1/database/{db}/call/{reducer} +// Body: JSON-objekt med navngitte parametre. +// Auth: Bearer-token fra STDB-identitet. +// +// Ref: docs/retninger/datalaget.md, docs/infra/synkronisering.md + +use reqwest::Client; +use serde::Serialize; + +/// SpacetimeDB-klient som kaller reducere via HTTP. +#[derive(Clone)] +pub struct StdbClient { + client: Client, + base_url: String, + database: String, + token: String, +} + +impl StdbClient { + /// Opprett ny klient. `base_url` er STDB-serverens URL (f.eks. "http://spacetimedb:3000"). + pub fn new(base_url: &str, database: &str, token: &str) -> Self { + Self { + client: Client::new(), + base_url: base_url.trim_end_matches('/').to_string(), + database: database.to_string(), + token: token.to_string(), + } + } + + /// Hent en ny identitet og token fra STDB-serveren. + /// Brukes ved oppstart hvis ingen token er konfigurert. + pub async fn create_identity(base_url: &str) -> Result<(String, String), StdbError> { + let client = Client::new(); + let url = format!("{}/v1/identity", base_url.trim_end_matches('/')); + let resp = client.post(&url).send().await?; + + if !resp.status().is_success() { + return Err(StdbError::Http(format!( + "Kunne ikke opprette identitet: {}", + resp.status() + ))); + } + + let body: serde_json::Value = resp.json().await?; + let identity = body["identity"] + .as_str() + .ok_or_else(|| StdbError::Http("Mangler identity i respons".into()))? + .to_string(); + let token = body["token"] + .as_str() + .ok_or_else(|| StdbError::Http("Mangler token i respons".into()))? + .to_string(); + + Ok((identity, token)) + } + + /// Kall en reducer med navngitte parametre (JSON-objekt). + async fn call_reducer(&self, reducer: &str, args: &T) -> Result<(), StdbError> { + let url = format!( + "{}/v1/database/{}/call/{}", + self.base_url, self.database, reducer + ); + + let resp = self + .client + .post(&url) + .bearer_auth(&self.token) + .json(args) + .send() + .await?; + + if resp.status().is_success() { + Ok(()) + } else { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + Err(StdbError::Reducer { + reducer: reducer.to_string(), + status: status.as_u16(), + message: body, + }) + } + } + + // ========================================================================= + // Node-operasjoner + // ========================================================================= + + pub async fn create_node( + &self, + id: &str, + node_kind: &str, + title: &str, + content: &str, + visibility: &str, + metadata: &str, + created_by: &str, + ) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + node_kind: &'a str, + title: &'a str, + content: &'a str, + visibility: &'a str, + metadata: &'a str, + created_by: &'a str, + } + + self.call_reducer( + "create_node", + &Args { + id, + node_kind, + title, + content, + visibility, + metadata, + created_by, + }, + ) + .await + } + + pub async fn update_node( + &self, + id: &str, + node_kind: &str, + title: &str, + content: &str, + visibility: &str, + metadata: &str, + ) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + node_kind: &'a str, + title: &'a str, + content: &'a str, + visibility: &'a str, + metadata: &'a str, + } + + self.call_reducer( + "update_node", + &Args { + id, + node_kind, + title, + content, + visibility, + metadata, + }, + ) + .await + } + + pub async fn delete_node(&self, id: &str) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + } + + self.call_reducer("delete_node", &Args { id }).await + } + + // ========================================================================= + // Edge-operasjoner + // ========================================================================= + + pub async fn create_edge( + &self, + id: &str, + source_id: &str, + target_id: &str, + edge_type: &str, + metadata: &str, + system: bool, + created_by: &str, + ) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + source_id: &'a str, + target_id: &'a str, + edge_type: &'a str, + metadata: &'a str, + system: bool, + created_by: &'a str, + } + + self.call_reducer( + "create_edge", + &Args { + id, + source_id, + target_id, + edge_type, + metadata, + system, + created_by, + }, + ) + .await + } + + pub async fn update_edge( + &self, + id: &str, + edge_type: &str, + metadata: &str, + ) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + edge_type: &'a str, + metadata: &'a str, + } + + self.call_reducer("update_edge", &Args { id, edge_type, metadata }) + .await + } + + pub async fn delete_edge(&self, id: &str) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Args<'a> { + id: &'a str, + } + + self.call_reducer("delete_edge", &Args { id }).await + } + + // ========================================================================= + // Vedlikehold + // ========================================================================= + + /// Tøm alle noder og edges. Brukes ved warmup for å unngå duplikater. + pub async fn clear_all(&self) -> Result<(), StdbError> { + #[derive(Serialize)] + struct Empty {} + + self.call_reducer("clear_all", &Empty {}).await + } +} + +// ============================================================================= +// Feilhåndtering +// ============================================================================= + +#[derive(Debug)] +pub enum StdbError { + /// HTTP-transportfeil (nettverk, timeout) + Http(String), + /// Reducer returnerte feil (400, 500, etc.) + Reducer { + reducer: String, + status: u16, + message: String, + }, +} + +impl std::fmt::Display for StdbError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StdbError::Http(msg) => write!(f, "STDB HTTP-feil: {msg}"), + StdbError::Reducer { + reducer, + status, + message, + } => write!(f, "STDB reducer {reducer} feilet ({status}): {message}"), + } + } +} + +impl std::error::Error for StdbError {} + +impl From for StdbError { + fn from(e: reqwest::Error) -> Self { + StdbError::Http(e.to_string()) + } +} diff --git a/maskinrommet/src/warmup.rs b/maskinrommet/src/warmup.rs new file mode 100644 index 0000000..d25f30e --- /dev/null +++ b/maskinrommet/src/warmup.rs @@ -0,0 +1,108 @@ +// Warmup: last hele grafen fra PG til SpacetimeDB ved oppstart. +// +// Sekvens: clear_all → noder → edges. +// Edges refererer til noder, så noder må lastes først. +// +// Ref: docs/infra/synkronisering.md + +use sqlx::PgPool; + +use crate::stdb::StdbClient; + +/// Last hele grafen fra PG til SpacetimeDB. +pub async fn run(db: &PgPool, stdb: &StdbClient) -> Result> { + tracing::info!("Warmup: starter (PG → SpacetimeDB)"); + + // 1. Tøm STDB for å unngå duplikater ved restart + stdb.clear_all().await?; + tracing::info!("Warmup: STDB tømt"); + + // 2. Last alle noder + let nodes = sqlx::query_as::<_, PgNode>( + "SELECT id, node_kind::text, title, content, visibility::text, \ + COALESCE(metadata::text, '{}') as metadata, \ + created_at, COALESCE(created_by::text, '') as created_by \ + FROM nodes ORDER BY created_at" + ) + .fetch_all(db) + .await?; + + let node_count = nodes.len(); + for node in &nodes { + stdb.create_node( + &node.id.to_string(), + &node.node_kind, + &node.title, + &node.content, + &node.visibility, + &node.metadata, + &node.created_by, + ) + .await?; + } + tracing::info!("Warmup: {node_count} noder lastet"); + + // 3. Last alle edges + let edges = sqlx::query_as::<_, PgEdge>( + "SELECT id, source_id, target_id, edge_type, \ + COALESCE(metadata::text, '{}') as metadata, \ + system, created_at, COALESCE(created_by::text, '') as created_by \ + FROM edges ORDER BY created_at" + ) + .fetch_all(db) + .await?; + + let edge_count = edges.len(); + for edge in &edges { + stdb.create_edge( + &edge.id.to_string(), + &edge.source_id.to_string(), + &edge.target_id.to_string(), + &edge.edge_type, + &edge.metadata, + edge.system, + &edge.created_by, + ) + .await?; + } + tracing::info!("Warmup: {edge_count} edges lastet"); + + let stats = WarmupStats { + nodes: node_count, + edges: edge_count, + }; + tracing::info!("Warmup: ferdig ({} noder, {} edges)", stats.nodes, stats.edges); + Ok(stats) +} + +pub struct WarmupStats { + pub nodes: usize, + pub edges: usize, +} + +// PG-radtyper for sqlx +#[derive(sqlx::FromRow)] +#[allow(dead_code)] +struct PgNode { + id: uuid::Uuid, + node_kind: String, + title: String, + content: String, + visibility: String, + metadata: String, + created_at: chrono::DateTime, + created_by: String, +} + +#[derive(sqlx::FromRow)] +#[allow(dead_code)] +struct PgEdge { + id: uuid::Uuid, + source_id: uuid::Uuid, + target_id: uuid::Uuid, + edge_type: String, + metadata: String, + system: bool, + created_at: chrono::DateTime, + created_by: String, +}