From aee6adc4251b1f3f8ac921a2ecfed25ce43a5f9f Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 13:11:33 +0000 Subject: [PATCH] =?UTF-8?q?Fjern=20STDB-skrivestien:=20all=20skriving=20g?= =?UTF-8?q?=C3=A5r=20kun=20til=20PG=20(oppgave=2022.3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SpacetimeDB var brukt som «instant feedback»-lag mellom portvokteren og frontend. Nå som PG NOTIFY-triggere og WebSocket er på plass (oppgave 22.1–22.2), er STDB-skrivestien overflødig. Endringer: - intentions.rs: Alle CRUD-operasjoner (create/update/delete node/edge) skriver nå synkront til PG i stedet for STDB-først + async PG-jobbkø. PG NOTIFY-triggere gir umiddelbar sanntidsoppdatering til klienter. Tilgangsgivende edges (owner/admin/member_of/reader) bruker transaksjon med recompute_access direkte i handleren. - maintenance.rs: Fjernet StdbClient fra alle funksjoner. Varsler opprettes/oppdateres/slettes direkte i PG. - agent.rs, audio.rs, tts.rs, ai_process.rs: Fjernet STDB-synk etter CLI-verktøy-kjøring. PG NOTIFY dekker sanntidsvisning. - pg_writes.rs: Fjernet sync_node_access_to_stdb. access_changed NOTIFY-trigger håndterer dette. - workspace.rs: Synkrone PG-skrivinger med recompute_access. - summarize.rs, ai_edges.rs: Fjernet StdbClient fra signaturer. - jobs.rs: Fjernet StdbClient fra dispatch og start_worker. - main.rs: Fjernet STDB-initialisering, warmup, stdb_monitor. StdbClient fjernet fra AppState. stdb.rs beholdt som død kode (fjernes i oppgave 22.4). - health.rs: Fjernet STDB-helsesjekk fra dashboard. - Slettet warmup.rs og stdb_monitor.rs (PG→STDB-synk ikke lenger relevant). - docs/retninger/datalaget.md: Markert fase M3 som fullført. --- docs/retninger/datalaget.md | 10 +- maskinrommet/src/agent.rs | 33 +- maskinrommet/src/ai_edges.rs | 6 - maskinrommet/src/ai_process.rs | 70 +-- maskinrommet/src/audio.rs | 39 +- maskinrommet/src/health.rs | 23 +- maskinrommet/src/intentions.rs | 1009 ++++++++++++------------------ maskinrommet/src/jobs.rs | 22 +- maskinrommet/src/main.rs | 65 +- maskinrommet/src/maintenance.rs | 65 +- maskinrommet/src/pg_writes.rs | 56 +- maskinrommet/src/stdb_monitor.rs | 145 ----- maskinrommet/src/summarize.rs | 2 - maskinrommet/src/tts.rs | 45 +- maskinrommet/src/warmup.rs | 141 ----- maskinrommet/src/workspace.rs | 131 ++-- tasks.md | 3 +- 17 files changed, 488 insertions(+), 1377 deletions(-) delete mode 100644 maskinrommet/src/stdb_monitor.rs delete mode 100644 maskinrommet/src/warmup.rs diff --git a/docs/retninger/datalaget.md b/docs/retninger/datalaget.md index c0886cc..ea3c725 100644 --- a/docs/retninger/datalaget.md +++ b/docs/retninger/datalaget.md @@ -139,9 +139,13 @@ NOTIFY (ikke bare ID) slik at stores kan oppdateres uten ekstra API-kall. Mixer-kanaler migrert fra STDB til PG-tabell med tilhørende NOTIFY-trigger og HTTP API-endepunkter. -### Fase M3: Fjern skrivestien til STDB -Portvokteren slutter å skrive til SpacetimeDB. All skriving -går kun til PG. NOTIFY-triggere er eneste push-mekanisme. +### Fase M3: Fjern skrivestien til STDB ✅ +Portvokteren skriver kun til PG. STDB-skrivestien er fjernet. +Alle intensjoner (create/update/delete node/edge) skriver +synkront til PG. NOTIFY-triggere er eneste push-mekanisme. +Warmup (PG→STDB) og STDB-monitor er fjernet. StdbClient er +fjernet fra AppState. Job-handlere (agent, audio, tts, ai_process) +synker ikke lenger til STDB — PG NOTIFY dekker sanntid. ### Fase M4: Fjern STDB - Stopp SpacetimeDB Docker-container diff --git a/maskinrommet/src/agent.rs b/maskinrommet/src/agent.rs index 1cd26a0..612f512 100644 --- a/maskinrommet/src/agent.rs +++ b/maskinrommet/src/agent.rs @@ -1,8 +1,8 @@ // Agent-dispatcher — delegerer prosessering til synops-respond CLI. // -// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon, -// og STDB-skriving (sanntidsvisning). Alt annet (kontekst-henting, -// prompt-bygging, claude-kall, PG-skriving) gjøres av synops-respond. +// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon. +// Alt annet (kontekst-henting, prompt-bygging, claude-kall, PG-skriving) +// gjøres av synops-respond. PG NOTIFY-triggere sender sanntidsoppdateringer. // // Jobbtype: "agent_respond" // Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" } @@ -15,7 +15,6 @@ use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; -use crate::stdb::StdbClient; #[derive(Debug)] struct AgentConfig { @@ -32,7 +31,6 @@ fn respond_bin() -> String { pub async fn handle_agent_respond( job: &JobRow, db: &PgPool, - stdb: &StdbClient, ) -> Result { let communication_id: Uuid = job.payload["communication_id"] .as_str().and_then(|s| s.parse().ok()) @@ -136,29 +134,8 @@ pub async fn handle_agent_respond( let result: serde_json::Value = serde_json::from_str(&stdout) .map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?; - // --- STDB-skriving for sanntidsvisning (forblir i maskinrommet) --- - - if result["status"].as_str() == Some("completed") { - if let Some(reply_node_id) = result["reply_node_id"].as_str() { - let response_text = result["response_text"].as_str().unwrap_or(""); - let agent_str = agent_node_id.to_string(); - let comm_str = communication_id.to_string(); - let edge_id = Uuid::now_v7().to_string(); - let empty = serde_json::json!({}).to_string(); - - if let Err(e) = stdb.create_node( - reply_node_id, "content", "", response_text, "hidden", &empty, &agent_str, - ).await { - tracing::warn!(error = %e, "STDB create_node feilet (PG er allerede skrevet)"); - } - - if let Err(e) = stdb.create_edge( - &edge_id, reply_node_id, &comm_str, "belongs_to", &empty, false, &agent_str, - ).await { - tracing::warn!(error = %e, "STDB create_edge feilet (PG er allerede skrevet)"); - } - } - } + // PG-skriving gjøres av synops-respond med --write. + // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter. tracing::info!( status = result["status"].as_str().unwrap_or("unknown"), diff --git a/maskinrommet/src/ai_edges.rs b/maskinrommet/src/ai_edges.rs index f0a038a..1d88927 100644 --- a/maskinrommet/src/ai_edges.rs +++ b/maskinrommet/src/ai_edges.rs @@ -3,10 +3,6 @@ // Maskinrommet orkestrerer, CLI-verktøyet gjør jobben (LLM-kall, // topic-opprettelse, edge-skriving, ressurslogging). // -// STDB-synk for topic-noder og mentions-edges hoppes over her — -// topics er bakgrunnsdata i kunnskapsgrafen og trenger ikke -// sanntidsvisning. De synkes ved neste STDB-refresh. -// // Jobbtype: "suggest_edges" // Payload: { "node_id": "" } // @@ -17,7 +13,6 @@ use uuid::Uuid; use crate::cli_dispatch; use crate::jobs::JobRow; -use crate::stdb::StdbClient; /// Synops-suggest-edges binary path. fn suggest_edges_bin() -> String { @@ -32,7 +27,6 @@ fn suggest_edges_bin() -> String { pub async fn handle_suggest_edges( job: &JobRow, _db: &sqlx::PgPool, - _stdb: &StdbClient, ) -> Result { let node_id: Uuid = job .payload diff --git a/maskinrommet/src/ai_process.rs b/maskinrommet/src/ai_process.rs index 3b3ba96..6ab2493 100644 --- a/maskinrommet/src/ai_process.rs +++ b/maskinrommet/src/ai_process.rs @@ -27,7 +27,6 @@ use uuid::Uuid; use crate::jobs::JobRow; use crate::resource_usage; -use crate::stdb::StdbClient; #[derive(sqlx::FromRow)] struct SourceNodeRow { @@ -100,7 +99,6 @@ fn model_profile_to_alias(profile: &str) -> &'static str { pub async fn handle_ai_process( job: &JobRow, db: &PgPool, - stdb: &StdbClient, ) -> Result { let source_node_id: Uuid = job .payload @@ -251,7 +249,7 @@ pub async fn handle_ai_process( match direction { "tool_to_node" => { handle_tool_to_node( - db, stdb, job, source_node_id, ai_preset_id, requested_by, + db, job, source_node_id, ai_preset_id, requested_by, &source, &source_content, &ai_output, ).await?; @@ -273,7 +271,7 @@ pub async fn handle_ai_process( } "node_to_tool" => { let new_node_id = handle_node_to_tool( - db, stdb, source_node_id, ai_preset_id, requested_by, + db, source_node_id, ai_preset_id, requested_by, &source, &ai_output, preset.title.as_deref(), ).await?; @@ -304,7 +302,6 @@ pub async fn handle_ai_process( /// Ref: docs/features/ai_verktoy.md § 2.2 async fn handle_tool_to_node( db: &PgPool, - stdb: &StdbClient, job: &JobRow, source_node_id: Uuid, ai_preset_id: Uuid, @@ -331,20 +328,7 @@ async fn handle_tool_to_node( .await .map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?; - // 2. Oppdater node content i STDB (sanntid) - let metadata_str = source.metadata.to_string(); - stdb.update_node( - &source_node_id.to_string(), - &source.node_kind, - source.title.as_deref().unwrap_or(""), - ai_output, - &source.visibility, - &metadata_str, - ) - .await - .map_err(|e| format!("STDB update_node feilet: {e}"))?; - - // 3. Oppdater node content i PG (persistering) + // 2. Oppdater node content i PG (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query( "UPDATE nodes SET content = $2 WHERE id = $1", ) @@ -363,7 +347,6 @@ async fn handle_tool_to_node( /// Ref: docs/features/ai_verktoy.md § 2.2 async fn handle_node_to_tool( db: &PgPool, - stdb: &StdbClient, source_node_id: Uuid, ai_preset_id: Uuid, requested_by: Uuid, @@ -382,23 +365,8 @@ async fn handle_node_to_tool( "source_node_id": source_node_id.to_string(), "ai_preset_id": ai_preset_id.to_string() }); - let new_metadata_str = new_metadata.to_string(); - let empty_meta = serde_json::json!({}).to_string(); - // 1. Opprett ny node i STDB (sanntid) - stdb.create_node( - &new_node_id.to_string(), - "content", - &new_title, - ai_output, - &source.visibility, - &new_metadata_str, - &requested_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_node feilet: {e}"))?; - - // 2. Opprett ny node i PG (persistering) + // 1. Opprett ny node i PG (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) @@ -415,21 +383,8 @@ async fn handle_node_to_tool( .await .map_err(|e| format!("PG insert ny node feilet: {e}"))?; - // 3. Opprett derived_from-edge: ny node → kilde-node - // Sporbarhet: "denne noden er avledet fra kilden" + // 2. Opprett derived_from-edge: ny node → kilde-node let derived_edge_id = Uuid::now_v7(); - stdb.create_edge( - &derived_edge_id.to_string(), - &new_node_id.to_string(), - &source_node_id.to_string(), - "derived_from", - &empty_meta, - false, - &requested_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_edge (derived_from) feilet: {e}"))?; - sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) @@ -444,21 +399,8 @@ async fn handle_node_to_tool( .await .map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?; - // 4. Opprett processed_by-edge: ny node → AI-preset - // Sporbarhet: "denne noden ble prosessert av dette AI-verktøyet" + // 3. Opprett processed_by-edge: ny node → AI-preset let processed_edge_id = Uuid::now_v7(); - stdb.create_edge( - &processed_edge_id.to_string(), - &new_node_id.to_string(), - &ai_preset_id.to_string(), - "processed_by", - &empty_meta, - false, - &requested_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_edge (processed_by) feilet: {e}"))?; - sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) diff --git a/maskinrommet/src/audio.rs b/maskinrommet/src/audio.rs index 05d6657..bb91a8e 100644 --- a/maskinrommet/src/audio.rs +++ b/maskinrommet/src/audio.rs @@ -10,7 +10,6 @@ use uuid::Uuid; use crate::cas::CasStore; use crate::jobs::JobRow; -use crate::stdb::StdbClient; // ─── EDL-datastrukturer ─────────────────────────────────────────── @@ -786,7 +785,6 @@ fn audio_bin() -> String { pub async fn handle_audio_process_job( job: &JobRow, _db: &PgPool, - stdb: &StdbClient, cas: &CasStore, ) -> Result { let media_node_id: Uuid = job.payload["media_node_id"] @@ -837,41 +835,8 @@ pub async fn handle_audio_process_job( let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; - // --- STDB-synk for sanntidsvisning --- - if let Some(processed_node_id) = result["processed_node_id"].as_str() { - let result_hash = result["cas_hash"].as_str().unwrap_or(""); - let result_size = result["size_bytes"].as_u64().unwrap_or(0); - let mime = match output_format { - "mp3" => "audio/mpeg", - "wav" => "audio/wav", - "flac" => "audio/flac", - "ogg" => "audio/ogg", - _ => "audio/mpeg", - }; - - let metadata = serde_json::json!({ - "cas_hash": result_hash, - "mime": mime, - "size_bytes": result_size, - "source_hash": cas_hash, - }); - - if let Err(e) = stdb.create_node( - processed_node_id, "media", "Prosessert lyd", "", "hidden", - &metadata.to_string(), &requested_by.to_string(), - ).await { - tracing::warn!(error = %e, "STDB create_node (audio) feilet (PG er allerede skrevet)"); - } - - // derived_from edge: processed → original - let edge_id = Uuid::now_v7().to_string(); - if let Err(e) = stdb.create_edge( - &edge_id, processed_node_id, &media_node_id.to_string(), - "derived_from", "{}", true, &requested_by.to_string(), - ).await { - tracing::warn!(error = %e, "STDB create_edge (derived_from) feilet (PG er allerede skrevet)"); - } - } + // PG-skriving gjøres av synops-audio med --write. + // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter. tracing::info!( original = %media_node_id, diff --git a/maskinrommet/src/health.rs b/maskinrommet/src/health.rs index 3d9229b..923243b 100644 --- a/maskinrommet/src/health.rs +++ b/maskinrommet/src/health.rs @@ -102,24 +102,6 @@ async fn check_pg(db: &PgPool) -> ServiceStatus { } } -async fn check_stdb(stdb: &crate::stdb::StdbClient) -> ServiceStatus { - let start = std::time::Instant::now(); - match stdb.delete_node("__healthcheck_nonexistent__").await { - Ok(()) => ServiceStatus { - name: "SpacetimeDB".to_string(), - status: "up".to_string(), - latency_ms: Some(start.elapsed().as_millis() as u64), - details: None, - }, - Err(e) => ServiceStatus { - name: "SpacetimeDB".to_string(), - status: "down".to_string(), - latency_ms: None, - details: Some(format!("{e}")), - }, - } -} - /// Sjekk en HTTP-tjeneste med timeout. async fn check_http_service(name: &str, url: &str) -> ServiceStatus { let client = reqwest::Client::builder() @@ -458,9 +440,8 @@ pub async fn health_dashboard( _user: AuthUser, ) -> Result, (StatusCode, Json)> { // Kjør alle tjeneste-sjekker parallelt - let (pg, stdb, caddy, authentik, litellm, whisper, livekit) = tokio::join!( + let (pg, caddy, authentik, litellm, whisper, livekit) = tokio::join!( check_pg(&state.db), - check_stdb(&state.stdb), check_caddy(), check_authentik(), check_litellm(), @@ -468,7 +449,7 @@ pub async fn health_dashboard( check_livekit(), ); - let services = vec![pg, stdb, caddy, authentik, litellm, whisper, livekit]; + let services = vec![pg, caddy, authentik, litellm, whisper, livekit]; let metrics = collect_metrics(); let backups = check_backups(); let pg_stats = collect_pg_stats(&state.db).await; diff --git a/maskinrommet/src/intentions.rs b/maskinrommet/src/intentions.rs index 0a4df73..686c5fb 100644 --- a/maskinrommet/src/intentions.rs +++ b/maskinrommet/src/intentions.rs @@ -1,8 +1,8 @@ // Intensjoner — skrivestien i maskinrommet. // -// Frontend sender intensjoner (ikke data). Maskinrommet validerer, -// skriver til SpacetimeDB først (instant feedback via WebSocket), -// deretter persisterer til PostgreSQL asynkront. +// Frontend sender intensjoner (ikke data). Maskinrommet validerer og +// skriver til PostgreSQL. PG NOTIFY-triggere sender sanntidsoppdateringer +// til klienter via WebSocket. // // Tilgangskontroll: Muterende operasjoner (update, delete) krever at // brukeren er created_by på noden, eller har owner/admin-edge til den. @@ -220,10 +220,6 @@ fn internal_error(msg: &str) -> (StatusCode, Json) { ) } -fn stdb_error(op: &str, e: crate::stdb::StdbError) -> (StatusCode, Json) { - tracing::error!("STDB {op} feilet: {e}"); - internal_error(&format!("Kunne ikke skrive til SpacetimeDB: {e}")) -} // ============================================================================= // Tilgangskontroll og kontekstbasert identitet @@ -522,8 +518,6 @@ pub async fn create_node( // -- Valider metadata for AI-presets (oppgave 18.1) -- validate_ai_preset_metadata(&node_kind, &metadata).map_err(|e| bad_request(&e))?; - let metadata_str = metadata.to_string(); - // -- Kontekstbasert identitet (oppgave 8.2) -- // Hvis context_id er satt, sjekk om brukeren har et alias som er // deltaker i kommunikasjonsnoden. I så fall brukes aliaset som created_by. @@ -540,23 +534,29 @@ pub async fn create_node( // -- Generer UUIDv7 (tidssortert) -- let node_id = Uuid::now_v7(); - let node_id_str = node_id.to_string(); - let created_by_str = effective_identity.to_string(); - // -- Skriv til SpacetimeDB (instant) -- - state - .stdb - .create_node( - &node_id_str, - &node_kind, - &title, - &content, - &visibility, - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_node", e))?; + // Fang verdier for AI-trigger før de brukes i PG-insert + let is_content_node = node_kind == "content"; + let has_enough_text = content.len() >= 20 || title.len() >= 20; + + // -- Skriv til PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7)"#, + ) + .bind(node_id) + .bind(&node_kind) + .bind(&title) + .bind(&content) + .bind(&visibility) + .bind(&metadata) + .bind(effective_identity) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av node: {e}"); + internal_error("Databasefeil ved opprettelse av node") + })?; tracing::info!( node_id = %node_id, @@ -565,64 +565,35 @@ pub async fn create_node( auth_user = %user.node_id, context_id = ?req.context_id, alias_used = %(effective_identity != user.node_id), - "Node opprettet i STDB" - ); - - // Fang verdier for AI-trigger før de flyttes inn i enqueue_insert_node - let is_content_node = node_kind == "content"; - let has_enough_text = content.len() >= 20 || title.len() >= 20; - - // -- PG-skriving via jobbkø (retry + dead letter) -- - crate::pg_writes::enqueue_insert_node( - state.db.clone(), - node_id, - node_kind, - title, - content, - visibility, - metadata, - effective_identity, + "Node opprettet" ); // -- Kontekst-arv: automatisk belongs_to-edge -- let belongs_to_edge_id = if let Some(ctx_id) = req.context_id { let edge_id = Uuid::now_v7(); - let edge_id_str = edge_id.to_string(); - let ctx_id_str = ctx_id.to_string(); let bt_metadata = serde_json::json!({}); - let bt_metadata_str = bt_metadata.to_string(); - state - .stdb - .create_edge( - &edge_id_str, - &node_id_str, // source = ny node - &ctx_id_str, // target = kommunikasjonsnoden - "belongs_to", - &bt_metadata_str, - false, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_edge (belongs_to)", e))?; + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'belongs_to', $4, false, $5)"#, + ) + .bind(edge_id) + .bind(node_id) + .bind(ctx_id) + .bind(&bt_metadata) + .bind(effective_identity) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av belongs_to-edge: {e}"); + internal_error("Databasefeil ved opprettelse av edge") + })?; tracing::info!( edge_id = %edge_id, node_id = %node_id, context_id = %ctx_id, - "belongs_to-edge opprettet i STDB (kontekst-arv)" - ); - - // belongs_to er ikke tilgangsgivende — PG-insert via jobbkø - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - edge_id, - node_id, - ctx_id, - "belongs_to".to_string(), - bt_metadata, - false, - effective_identity, + "belongs_to-edge opprettet (kontekst-arv)" ); Some(edge_id) @@ -839,29 +810,102 @@ pub async fn create_edge( } } - let metadata_str = metadata.to_string(); - // -- Generer UUIDv7 -- let edge_id = Uuid::now_v7(); - let edge_id_str = edge_id.to_string(); - let source_id_str = req.source_id.to_string(); - let target_id_str = req.target_id.to_string(); - let created_by_str = user.node_id.to_string(); - // -- Skriv til SpacetimeDB (instant) -- - state - .stdb - .create_edge( - &edge_id_str, - &source_id_str, - &target_id_str, - &req.edge_type, - &metadata_str, - system, - &created_by_str, + // -- Skriv til PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + let access_level = match req.edge_type.as_str() { + "owner" => Some("owner"), + "admin" => Some("admin"), + "member_of" => Some("member"), + "reader" => Some("reader"), + _ => None, + }; + + if let Some(level) = access_level { + // Tilgangsgivende edge: transaksjon med recompute_access + let mut tx = state.db.begin().await.map_err(|e| { + tracing::error!("PG begin: {e}"); + internal_error("Databasefeil") + })?; + + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, $4, $5, $6, $7)"#, ) + .bind(edge_id) + .bind(req.source_id) + .bind(req.target_id) + .bind(&req.edge_type) + .bind(&metadata) + .bind(system) + .bind(user.node_id) + .execute(&mut *tx) .await - .map_err(|e| stdb_error("create_edge", e))?; + .map_err(|e| { + tracing::error!("PG insert edge: {e}"); + internal_error("Databasefeil ved opprettelse av edge") + })?; + + sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)") + .bind(req.source_id) + .bind(req.target_id) + .bind(level) + .bind(edge_id) + .execute(&mut *tx) + .await + .map_err(|e| { + tracing::error!("recompute_access: {e}"); + internal_error("Databasefeil ved tilgangsberegning") + })?; + + tx.commit().await.map_err(|e| { + tracing::error!("PG commit: {e}"); + internal_error("Databasefeil") + })?; + } else { + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, $4, $5, $6, $7)"#, + ) + .bind(edge_id) + .bind(req.source_id) + .bind(req.target_id) + .bind(&req.edge_type) + .bind(&metadata) + .bind(system) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG insert edge: {e}"); + internal_error("Databasefeil ved opprettelse av edge") + })?; + + // Trigger rendering ved belongs_to + if req.edge_type == "belongs_to" { + if let Ok(Some(config)) = crate::publishing::find_publishing_collection_by_id(&state.db, req.target_id).await { + let article_payload = serde_json::json!({ + "node_id": req.source_id.to_string(), + "collection_id": req.target_id.to_string(), + }); + let _ = crate::jobs::enqueue(&state.db, "render_article", article_payload, Some(req.target_id), 5).await; + + let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); + if index_mode == "static" { + let index_payload = serde_json::json!({ "collection_id": req.target_id.to_string() }); + let _ = crate::jobs::enqueue(&state.db, "render_index", index_payload, Some(req.target_id), 4).await; + } else { + crate::publishing::invalidate_index_cache(&state.index_cache, req.target_id).await; + } + } + } + + // A/B-test for presentasjonselement-edges + if matches!(req.edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") { + crate::publishing::maybe_start_ab_test(&state.db, req.target_id, &req.edge_type).await; + } + } tracing::info!( edge_id = %edge_id, @@ -869,20 +913,7 @@ pub async fn create_edge( target_id = %req.target_id, edge_type = %req.edge_type, created_by = %user.node_id, - "Edge opprettet i STDB" - ); - - // -- PG-skriving via jobbkø (retry + dead letter) -- - let edge_type = req.edge_type.clone(); - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - edge_id, - req.source_id, - req.target_id, - edge_type, - metadata, - system, - user.node_id, + "Edge opprettet" ); Ok(Json(CreateEdgeResponse { edge_id })) @@ -1064,39 +1095,28 @@ pub async fn update_node( false }; - let metadata_str = metadata.to_string(); - - let node_id_str = req.node_id.to_string(); - - // -- Skriv til SpacetimeDB (instant) -- - state - .stdb - .update_node( - &node_id_str, - &node_kind, - &title, - &content, - &visibility, - &metadata_str, - ) - .await - .map_err(|e| stdb_error("update_node", e))?; + // -- Skriv til PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query( + r#"UPDATE nodes SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''), + visibility = $5::visibility, metadata = $6 WHERE id = $1"#, + ) + .bind(req.node_id) + .bind(&node_kind) + .bind(&title) + .bind(&content) + .bind(&visibility) + .bind(&metadata) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved oppdatering av node: {e}"); + internal_error("Databasefeil ved oppdatering av node") + })?; tracing::info!( node_id = %req.node_id, updated_by = %user.node_id, - "Node oppdatert i STDB" - ); - - // -- PG-skriving via jobbkø (retry + dead letter) -- - crate::pg_writes::enqueue_update_node( - state.db.clone(), - req.node_id, - node_kind, - title, - content, - visibility, - metadata, + "Node oppdatert" ); // -- Re-render alle artikler hvis custom_domain endret (canonical URL) -- @@ -1190,24 +1210,22 @@ pub async fn delete_node( return Err(bad_request(&format!("Node {} finnes ikke", req.node_id))); } - let node_id_str = req.node_id.to_string(); - - // -- Slett fra SpacetimeDB (instant) -- - state - .stdb - .delete_node(&node_id_str) + // -- Slett fra PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query("DELETE FROM nodes WHERE id = $1") + .bind(req.node_id) + .execute(&state.db) .await - .map_err(|e| stdb_error("delete_node", e))?; + .map_err(|e| { + tracing::error!("PG-feil ved sletting av node: {e}"); + internal_error("Databasefeil ved sletting av node") + })?; tracing::info!( node_id = %req.node_id, deleted_by = %user.node_id, - "Node slettet fra STDB" + "Node slettet" ); - // -- PG-sletting via jobbkø (retry + dead letter) -- - crate::pg_writes::enqueue_delete_node(state.db.clone(), req.node_id); - Ok(Json(DeleteNodeResponse { deleted: true })) } @@ -1333,44 +1351,25 @@ pub async fn update_edge( } } - let metadata_str = metadata.to_string(); - let edge_id_str = req.edge_id.to_string(); - - // -- Skriv til SpacetimeDB (instant) -- - state - .stdb - .update_edge(&edge_id_str, &edge_type, &metadata_str) + // -- Skriv til PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query("UPDATE edges SET edge_type = $1, metadata = $2 WHERE id = $3") + .bind(&edge_type) + .bind(&metadata) + .bind(req.edge_id) + .execute(&state.db) .await - .map_err(|e| stdb_error("update_edge", e))?; + .map_err(|e| { + tracing::error!("PG-feil ved oppdatering av edge: {e}"); + internal_error("Databasefeil ved oppdatering av edge") + })?; tracing::info!( edge_id = %req.edge_id, edge_type = %edge_type, updated_by = %user.node_id, - "Edge oppdatert i STDB" + "Edge oppdatert" ); - // -- Spawn async PG-skriving -- - let db = state.db.clone(); - let eid = req.edge_id; - let et = edge_type.clone(); - let md = metadata.clone(); - tokio::spawn(async move { - let result = sqlx::query( - "UPDATE edges SET edge_type = $1, metadata = $2 WHERE id = $3", - ) - .bind(&et) - .bind(&md) - .bind(eid) - .execute(&db) - .await; - - match result { - Ok(_) => tracing::info!(edge_id = %eid, "Edge oppdatert i PostgreSQL"), - Err(e) => tracing::error!(edge_id = %eid, error = %e, "Kunne ikke oppdatere edge i PostgreSQL"), - } - }); - Ok(Json(UpdateEdgeResponse { edge_id: req.edge_id })) } @@ -1431,30 +1430,35 @@ pub async fn delete_edge( })? .ok_or_else(|| bad_request(&format!("Edge {} finnes ikke", req.edge_id)))?; - let edge_id_str = req.edge_id.to_string(); - - // -- Slett fra SpacetimeDB (instant) -- - state - .stdb - .delete_edge(&edge_id_str) + // -- Slett fra PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query("DELETE FROM edges WHERE id = $1") + .bind(req.edge_id) + .execute(&state.db) .await - .map_err(|e| stdb_error("delete_edge", e))?; + .map_err(|e| { + tracing::error!("PG-feil ved sletting av edge: {e}"); + internal_error("Databasefeil ved sletting av edge") + })?; tracing::info!( edge_id = %req.edge_id, edge_type = %edge_info.edge_type, deleted_by = %user.node_id, - "Edge slettet fra STDB" + "Edge slettet" ); - // -- Spawn async PG-sletting + publiserings-invalidering -- - crate::pg_writes::enqueue_delete_edge( - state.db.clone(), - req.edge_id, - edge_info.source_id, - edge_info.target_id, - edge_info.edge_type, - ); + // Invalider publiserings-cache ved fjerning av belongs_to + if edge_info.edge_type == "belongs_to" { + if let Ok(Some(config)) = crate::publishing::find_publishing_collection_by_id(&state.db, edge_info.target_id).await { + let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); + if index_mode == "static" { + let index_payload = serde_json::json!({ "collection_id": edge_info.target_id.to_string() }); + let _ = crate::jobs::enqueue(&state.db, "render_index", index_payload, Some(edge_info.target_id), 4).await; + } else { + crate::publishing::invalidate_index_cache(&state.index_cache, edge_info.target_id).await; + } + } + } Ok(Json(DeleteEdgeResponse { deleted: true })) } @@ -1592,7 +1596,7 @@ pub async fn set_slot( for (hero_edge_id, hero_meta) in &existing_heroes { let pinned = hero_meta.get("pinned").and_then(|v| v.as_bool()).unwrap_or(false); if !pinned { - displace_to_stream(&state.db, &state.stdb, *hero_edge_id, hero_meta).await?; + displace_to_stream(&state.db, *hero_edge_id, hero_meta).await?; displaced.push(*hero_edge_id); } } @@ -1636,7 +1640,7 @@ pub async fn set_slot( .collect(); for (feat_edge_id, feat_meta, _) in removable { - displace_to_stream(&state.db, &state.stdb, *feat_edge_id, feat_meta).await?; + displace_to_stream(&state.db, *feat_edge_id, feat_meta).await?; displaced.push(*feat_edge_id); } } @@ -1666,16 +1670,7 @@ pub async fn set_slot( } } - // Skriv til STDB (instant) - let edge_id_str = req.edge_id.to_string(); - let meta_str = new_meta.to_string(); - state - .stdb - .update_edge(&edge_id_str, "belongs_to", &meta_str) - .await - .map_err(|e| stdb_error("update_edge", e))?; - - // Skriv til PG + // Skriv til PG (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query("UPDATE edges SET metadata = $1 WHERE id = $2") .bind(&new_meta) .bind(req.edge_id) @@ -1706,7 +1701,6 @@ pub async fn set_slot( /// Beholder annen metadata (publish_at, approved_by, etc.). async fn displace_to_stream( db: &PgPool, - stdb: &crate::stdb::StdbClient, edge_id: Uuid, current_meta: &serde_json::Value, ) -> Result<(), (StatusCode, Json)> { @@ -1718,14 +1712,6 @@ async fn displace_to_stream( obj.remove("pinned"); } - // STDB (instant) - let edge_id_str = edge_id.to_string(); - let meta_str = meta.to_string(); - stdb.update_edge(&edge_id_str, "belongs_to", &meta_str) - .await - .map_err(|e| stdb_error("update_edge (displace)", e))?; - - // PG sqlx::query("UPDATE edges SET metadata = $1 WHERE id = $2") .bind(&meta) .bind(edge_id) @@ -1803,82 +1789,77 @@ pub async fn create_communication( let title = req.title.unwrap_or_default(); let now = chrono::Utc::now(); let metadata = serde_json::json!({ "started_at": now.to_rfc3339() }); - let metadata_str = metadata.to_string(); // -- Opprett kommunikasjonsnoden -- let node_id = Uuid::now_v7(); - let node_id_str = node_id.to_string(); - let created_by_str = user.node_id.to_string(); - state - .stdb - .create_node( - &node_id_str, - "communication", - &title, - "", - &visibility, - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_node (communication)", e))?; + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, 'communication', NULLIF($2, ''), '', $3::visibility, $4, $5)"#, + ) + .bind(node_id) + .bind(&title) + .bind(&visibility) + .bind(&metadata) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av kommunikasjonsnode: {e}"); + internal_error("Databasefeil ved opprettelse av kommunikasjonsnode") + })?; tracing::info!( node_id = %node_id, created_by = %user.node_id, participants = ?req.participants, - "Kommunikasjonsnode opprettet i STDB" - ); - - // PG-skriving via jobbkø - crate::pg_writes::enqueue_insert_node( - state.db.clone(), - node_id, - "communication".to_string(), - title, - String::new(), - visibility, - metadata, - user.node_id, + "Kommunikasjonsnode opprettet" ); // -- Opprett deltaker-edges -- let mut edge_ids = Vec::new(); - // Owner-edge for innlogget bruker + // Owner-edge for innlogget bruker (med recompute_access) let owner_edge_id = Uuid::now_v7(); edge_ids.push(owner_edge_id); - let owner_edge_id_str = owner_edge_id.to_string(); - let owner_metadata = serde_json::json!({}); - let owner_metadata_str = owner_metadata.to_string(); + { + let mut tx = state.db.begin().await.map_err(|e| { + tracing::error!("PG begin: {e}"); + internal_error("Databasefeil") + })?; - state - .stdb - .create_edge( - &owner_edge_id_str, - &created_by_str, - &node_id_str, - "owner", - &owner_metadata_str, - false, - &created_by_str, + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'owner', '{}', false, $4)"#, ) + .bind(owner_edge_id) + .bind(user.node_id) + .bind(node_id) + .bind(user.node_id) + .execute(&mut *tx) .await - .map_err(|e| stdb_error("create_edge (owner)", e))?; + .map_err(|e| { + tracing::error!("PG insert owner edge: {e}"); + internal_error("Databasefeil ved opprettelse av owner-edge") + })?; - // PG-skriving for owner-edge via jobbkø (med access recompute) - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - owner_edge_id, - user.node_id, - node_id, - "owner".to_string(), - owner_metadata, - false, - user.node_id, - ); + sqlx::query("SELECT recompute_access($1, $2, 'owner'::access_level, $3)") + .bind(user.node_id) + .bind(node_id) + .bind(owner_edge_id) + .execute(&mut *tx) + .await + .map_err(|e| { + tracing::error!("recompute_access: {e}"); + internal_error("Databasefeil ved tilgangsberegning") + })?; + + tx.commit().await.map_err(|e| { + tracing::error!("PG commit: {e}"); + internal_error("Databasefeil") + })?; + } // member_of-edges for øvrige deltakere for participant_id in &req.participants { @@ -1890,35 +1871,41 @@ pub async fn create_communication( let edge_id = Uuid::now_v7(); edge_ids.push(edge_id); - let edge_id_str = edge_id.to_string(); - let participant_id_str = participant_id.to_string(); - let member_metadata = serde_json::json!({}); - let member_metadata_str = member_metadata.to_string(); + let mut tx = state.db.begin().await.map_err(|e| { + tracing::error!("PG begin: {e}"); + internal_error("Databasefeil") + })?; - state - .stdb - .create_edge( - &edge_id_str, - &participant_id_str, - &node_id_str, - "member_of", - &member_metadata_str, - false, - &created_by_str, - ) + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'member_of', '{}', false, $4)"#, + ) + .bind(edge_id) + .bind(*participant_id) + .bind(node_id) + .bind(user.node_id) + .execute(&mut *tx) + .await + .map_err(|e| { + tracing::error!("PG insert member_of edge: {e}"); + internal_error("Databasefeil ved opprettelse av member_of-edge") + })?; + + sqlx::query("SELECT recompute_access($1, $2, 'member'::access_level, $3)") + .bind(*participant_id) + .bind(node_id) + .bind(edge_id) + .execute(&mut *tx) .await - .map_err(|e| stdb_error("create_edge (member_of)", e))?; + .map_err(|e| { + tracing::error!("recompute_access: {e}"); + internal_error("Databasefeil ved tilgangsberegning") + })?; - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - edge_id, - *participant_id, - node_id, - "member_of".to_string(), - member_metadata, - false, - user.node_id, - ); + tx.commit().await.map_err(|e| { + tracing::error!("PG commit: {e}"); + internal_error("Databasefeil") + })?; } // -- Opprett belongs_to-edge til kontekstnode (f.eks. artikkel) -- @@ -1939,35 +1926,20 @@ pub async fn create_communication( let ctx_edge_id = Uuid::now_v7(); edge_ids.push(ctx_edge_id); - let ctx_edge_id_str = ctx_edge_id.to_string(); - let context_id_str = context_id.to_string(); - let ctx_metadata = serde_json::json!({}); - let ctx_metadata_str = ctx_metadata.to_string(); - - state - .stdb - .create_edge( - &ctx_edge_id_str, - &node_id_str, - &context_id_str, - "belongs_to", - &ctx_metadata_str, - false, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_edge (belongs_to context)", e))?; - - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - ctx_edge_id, - node_id, - context_id, - "belongs_to".to_string(), - ctx_metadata, - false, - user.node_id, - ); + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'belongs_to', '{}', false, $4)"#, + ) + .bind(ctx_edge_id) + .bind(node_id) + .bind(context_id) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG insert belongs_to edge: {e}"); + internal_error("Databasefeil ved opprettelse av edge") + })?; tracing::info!( communication_id = %node_id, @@ -2107,8 +2079,6 @@ pub async fn upload_media( // -- Opprett media-node -- let media_node_id = Uuid::now_v7(); - let media_node_id_str = media_node_id.to_string(); - let created_by_str = user.node_id.to_string(); let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); let node_title = title.unwrap_or_else(|| file_name.unwrap_or_default()); @@ -2117,22 +2087,23 @@ pub async fn upload_media( "mime": mime, "size_bytes": cas_result.size, }); - let metadata_str = metadata.to_string(); - // Skriv til SpacetimeDB (instant) - state - .stdb - .create_node( - &media_node_id_str, - "media", - &node_title, - "", - &visibility, - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_node (media)", e))?; + // Skriv media-node til PG + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, 'media', NULLIF($2, ''), '', $3::visibility, $4, $5)"#, + ) + .bind(media_node_id) + .bind(&node_title) + .bind(&visibility) + .bind(&metadata) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av media-node: {e}"); + internal_error("Databasefeil ved opprettelse av media-node") + })?; tracing::info!( media_node_id = %media_node_id, @@ -2141,60 +2112,33 @@ pub async fn upload_media( mime = %mime, already_existed = cas_result.already_existed, created_by = %user.node_id, - "Media-node opprettet i STDB" - ); - - // PG-skriving for media-noden via jobbkø - crate::pg_writes::enqueue_insert_node( - state.db.clone(), - media_node_id, - "media".to_string(), - node_title, - String::new(), - visibility, - metadata, - user.node_id, + "Media-node opprettet" ); // -- Opprett has_media-edge hvis source_id er oppgitt -- let has_media_edge_id = if let Some(src_id) = source_id { let edge_id = Uuid::now_v7(); - let edge_id_str = edge_id.to_string(); - let src_id_str = src_id.to_string(); - let edge_metadata = serde_json::json!({}); - let edge_metadata_str = edge_metadata.to_string(); - state - .stdb - .create_edge( - &edge_id_str, - &src_id_str, // source = innholdsnoden - &media_node_id_str, // target = media-noden - "has_media", - &edge_metadata_str, - false, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_edge (has_media)", e))?; + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'has_media', '{}', false, $4)"#, + ) + .bind(edge_id) + .bind(src_id) + .bind(media_node_id) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG insert has_media edge: {e}"); + internal_error("Databasefeil ved opprettelse av has_media-edge") + })?; tracing::info!( edge_id = %edge_id, source_id = %src_id, media_node_id = %media_node_id, - "has_media-edge opprettet i STDB" - ); - - // has_media er ikke tilgangsgivende — PG-insert via jobbkø - crate::pg_writes::enqueue_insert_edge( - state.db.clone(), - edge_id, - src_id, - media_node_id, - "has_media".to_string(), - edge_metadata, - false, - user.node_id, + "has_media-edge opprettet" ); Some(edge_id) @@ -2344,103 +2288,45 @@ pub async fn create_alias( let alias_node_id = Uuid::now_v7(); let alias_edge_id = Uuid::now_v7(); - let alias_node_id_str = alias_node_id.to_string(); - let alias_edge_id_str = alias_edge_id.to_string(); - let user_node_id_str = user.node_id.to_string(); - let metadata_str = metadata.to_string(); + // -- Skriv alias-node og alias-edge til PG -- + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) + VALUES ($1, 'person', $2, 'hidden'::visibility, $3, $4)"#, + ) + .bind(alias_node_id) + .bind(&title) + .bind(&metadata) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av alias-node: {e}"); + internal_error("Databasefeil ved opprettelse av alias") + })?; - // -- Skriv alias-node til STDB -- - state - .stdb - .create_node( - &alias_node_id_str, - "person", - &title, - "", - "hidden", - &metadata_str, - &user_node_id_str, - ) - .await - .map_err(|e| stdb_error("create_node (alias)", e))?; - - // -- Skriv alias-edge til STDB (system=true) -- - state - .stdb - .create_edge( - &alias_edge_id_str, - &user_node_id_str, - &alias_node_id_str, - "alias", - "{}", - true, - &user_node_id_str, - ) - .await - .map_err(|e| stdb_error("create_edge (alias)", e))?; + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'alias', '{}', true, $4)"#, + ) + .bind(alias_edge_id) + .bind(user.node_id) + .bind(alias_node_id) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av alias-edge: {e}"); + internal_error("Databasefeil ved opprettelse av alias-edge") + })?; tracing::info!( alias_node_id = %alias_node_id, alias_edge_id = %alias_edge_id, user_node_id = %user.node_id, title = %title, - "Alias opprettet i STDB" + "Alias opprettet" ); - // -- Spawn async PG-skriving: node + edge -- - let pg_db = state.db.clone(); - let pg_title = title.clone(); - let pg_metadata = metadata.clone(); - let pg_user_node_id = user.node_id; - tokio::spawn(async move { - // 1. Skriv alias-noden - let node_result = sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) - VALUES ($1, 'person', $2, 'hidden'::visibility, $3, $4) - "#, - ) - .bind(alias_node_id) - .bind(&pg_title) - .bind(&pg_metadata) - .bind(pg_user_node_id) - .execute(&pg_db) - .await; - - match node_result { - Ok(_) => { - tracing::info!(alias_node_id = %alias_node_id, "Alias-node persistert til PostgreSQL"); - } - Err(e) => { - tracing::error!(alias_node_id = %alias_node_id, error = %e, "Kunne ikke persistere alias-node til PostgreSQL"); - return; - } - } - - // 2. Skriv alias-edge (system=true) - let edge_result = sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, 'alias', '{}', true, $4) - "#, - ) - .bind(alias_edge_id) - .bind(pg_user_node_id) - .bind(alias_node_id) - .bind(pg_user_node_id) - .execute(&pg_db) - .await; - - match edge_result { - Ok(_) => { - tracing::info!(alias_edge_id = %alias_edge_id, "Alias-edge persistert til PostgreSQL"); - } - Err(e) => { - tracing::error!(alias_edge_id = %alias_edge_id, error = %e, "Kunne ikke persistere alias-edge til PostgreSQL"); - } - } - }); - Ok(Json(CreateAliasResponse { alias_node_id, alias_edge_id, @@ -3188,24 +3074,6 @@ pub async fn create_ai_preset( } let node_id = Uuid::now_v7(); - let node_id_str = node_id.to_string(); - let created_by_str = user.node_id.to_string(); - let metadata_str = metadata.to_string(); - - // Skriv til SpacetimeDB - state - .stdb - .create_node( - &node_id_str, - "ai_preset", - req.title.trim(), - "", - "discoverable", - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_node (ai_preset)", e))?; // Skriv til PostgreSQL sqlx::query( @@ -3228,23 +3096,6 @@ pub async fn create_ai_preset( // Opprett shared_with-edge hvis samlings-ID er satt let shared_edge_id = if let Some(col_id) = req.share_with_collection_id { let edge_id = Uuid::now_v7(); - let edge_id_str = edge_id.to_string(); - let col_id_str = col_id.to_string(); - let empty_meta = serde_json::json!({}).to_string(); - - state - .stdb - .create_edge( - &edge_id_str, - &node_id_str, - &col_id_str, - "shared_with", - &empty_meta, - false, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_edge (shared_with)", e))?; sqlx::query( r#" @@ -3420,7 +3271,6 @@ pub async fn join_communication( Json(req): Json, ) -> Result, (StatusCode, Json)> { let comm_id = req.communication_id; - let comm_id_str = comm_id.to_string(); // Sjekk at kommunikasjonsnoden eksisterer og er riktig type let node_row = sqlx::query_as::<_, (String, String)>( @@ -3525,27 +3375,8 @@ pub async fn join_communication( let room_name = token_result.room_name.clone(); - // Oppdater SpacetimeDB: opprett rom (idempotent) + legg til deltaker - if let Err(e) = state.stdb.create_live_room(&room_name, &comm_id_str).await { - tracing::warn!("STDB create_live_room feilet (fortsetter): {e}"); - } - - if let Err(e) = state - .stdb - .add_room_participant( - &room_name, - &user.node_id.to_string(), - &display_name, - role_str, - ) - .await - { - tracing::warn!("STDB add_room_participant feilet (fortsetter): {e}"); - } - // Oppdater kommunikasjonsnodens metadata med live_status (asynkront) let db = state.db.clone(); - let stdb = state.stdb.clone(); let comm_id_clone = comm_id; let room_name_clone = room_name.clone(); tokio::spawn(async move { @@ -3573,20 +3404,6 @@ pub async fn join_communication( { tracing::error!("Kunne ikke oppdatere node metadata: {e}"); } - - // Synk metadata til STDB - let node = sqlx::query_as::<_, (String, String, String, String, String)>( - "SELECT node_kind, title, content, visibility, metadata::text FROM nodes WHERE id = $1", - ) - .bind(comm_id_clone) - .fetch_optional(&db) - .await; - - if let Ok(Some((kind, t, c, v, m))) = node { - let _ = stdb - .update_node(&comm_id_clone.to_string(), &kind, &t, &c, &v, &m) - .await; - } } }); @@ -3671,25 +3488,13 @@ pub struct LeaveCommunicationResponse { /// POST /intentions/leave_communication /// /// Fjerner brukerens sanntidslyd-tilkobling fra en kommunikasjonsnode. -/// Oppdaterer STDB med ny deltaker-status. Stenger rommet hvis ingen gjenstår. pub async fn leave_communication( - State(state): State, + State(_state): State, user: AuthUser, Json(req): Json, ) -> Result, (StatusCode, Json)> { let comm_id = req.communication_id; let room_name = format!("communication_{comm_id}"); - let user_id_str = user.node_id.to_string(); - - // Fjern deltaker fra STDB - if let Err(e) = state.stdb.remove_room_participant(&room_name, &user_id_str).await { - tracing::warn!("STDB remove_room_participant feilet: {e}"); - } - - // Sjekk om rommet er tomt — i så fall steng det - // (Vi sjekker PG edges for å se hvem som er registrert som deltaker i STDB) - // Forenklet: la rommet ligge, det ryddes ved neste oppstart eller manuelt - // Frontend kan kalle close_communication for å eksplisitt stenge. tracing::info!( communication_id = %comm_id, @@ -3735,16 +3540,8 @@ pub async fn close_communication( return Err(forbidden("Bare eier kan stenge kommunikasjonsrom")); } - let room_name = format!("communication_{comm_id}"); - - // Steng rommet i STDB - if let Err(e) = state.stdb.close_live_room(&room_name).await { - tracing::warn!("STDB close_live_room feilet: {e}"); - } - - // Oppdater metadata i PG + // Oppdater metadata i PG (NOTIFY-trigger sender sanntidsoppdatering) let db = state.db.clone(); - let stdb = state.stdb.clone(); tokio::spawn(async move { let result = sqlx::query_scalar::<_, serde_json::Value>( "SELECT metadata FROM nodes WHERE id = $1", @@ -3767,20 +3564,6 @@ pub async fn close_communication( .bind(&metadata) .execute(&db) .await; - - // Synk til STDB - let node = sqlx::query_as::<_, (String, String, String, String, String)>( - "SELECT node_kind, title, content, visibility, metadata::text FROM nodes WHERE id = $1", - ) - .bind(comm_id) - .fetch_optional(&db) - .await; - - if let Ok(Some((kind, t, c, v, m))) = node { - let _ = stdb - .update_node(&comm_id.to_string(), &kind, &t, &c, &v, &m) - .await; - } } }); @@ -3993,42 +3776,29 @@ pub async fn create_announcement( }); let node_id = Uuid::now_v7(); - let node_id_str = node_id.to_string(); - let created_by_str = user.node_id.to_string(); - let metadata_str = metadata.to_string(); - // -- Skriv til SpacetimeDB (instant broadcast til alle klienter) -- - state - .stdb - .create_node( - &node_id_str, - "system_announcement", - &req.title, - &req.content, - "open", - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| stdb_error("create_node (announcement)", e))?; + // -- Skriv til PostgreSQL (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, 'system_announcement', $2, $3, 'open'::visibility, $4, $5)"#, + ) + .bind(node_id) + .bind(&req.title) + .bind(&req.content) + .bind(&metadata) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved opprettelse av systemvarsel: {e}"); + internal_error("Databasefeil ved opprettelse av systemvarsel") + })?; tracing::info!( node_id = %node_id, announcement_type = %req.announcement_type, created_by = %user.node_id, - "Systemvarsel opprettet i STDB" - ); - - // -- Persister til PostgreSQL via jobbkø -- - crate::pg_writes::enqueue_insert_node( - state.db.clone(), - node_id, - "system_announcement".to_string(), - req.title, - req.content, - "open".to_string(), - metadata, - user.node_id, + "Systemvarsel opprettet" ); Ok(Json(CreateAnnouncementResponse { node_id })) @@ -4085,28 +3855,17 @@ pub async fn expire_announcement( return Err(forbidden("Kun eier kan fjerne systemvarsler")); } - // -- Slett fra STDB (umiddelbar fjerning fra alle klienter) -- - let node_id_str = req.node_id.to_string(); - state - .stdb - .delete_node(&node_id_str) + // -- Slett fra PG (NOTIFY-trigger sender sanntidsoppdatering) -- + sqlx::query("DELETE FROM nodes WHERE id = $1") + .bind(req.node_id) + .execute(&state.db) .await - .map_err(|e| stdb_error("delete_node (announcement)", e))?; + .map_err(|e| { + tracing::error!("PG-feil ved sletting av varsel: {e}"); + internal_error("Databasefeil ved sletting av varsel") + })?; - // -- Slett fra PG -- - let db = state.db.clone(); - let nid = req.node_id; - tokio::spawn(async move { - if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1") - .bind(nid) - .execute(&db) - .await - { - tracing::error!(node_id = %nid, error = %e, "Kunne ikke slette varsel fra PG"); - } else { - tracing::info!(node_id = %nid, "Systemvarsel slettet fra PG"); - } - }); + tracing::info!(node_id = %req.node_id, "Systemvarsel slettet"); Ok(Json(ExpireAnnouncementResponse { expired: true })) } @@ -4151,7 +3910,7 @@ pub async fn initiate_maintenance( let announcement_id = state .maintenance - .initiate(&state.db, &state.stdb, scheduled_at, user_id) + .initiate(&state.db, scheduled_at, user_id) .await .map_err(|e| bad_request(&e))?; @@ -4176,7 +3935,7 @@ pub async fn cancel_maintenance( ) -> Result, (StatusCode, Json)> { state .maintenance - .cancel(&state.db, &state.stdb) + .cancel(&state.db) .await .map_err(|e| bad_request(&e))?; diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index 8b489b6..16c3d80 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -24,7 +24,6 @@ use crate::maintenance::MaintenanceState; use crate::pg_writes; use crate::publishing::IndexCache; use crate::resources::{self, PriorityRules}; -use crate::stdb::StdbClient; use crate::summarize; use crate::transcribe; use crate::tts; @@ -170,7 +169,6 @@ async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx async fn dispatch( job: &JobRow, db: &PgPool, - stdb: &StdbClient, cas: &CasStore, index_cache: &IndexCache, whisper_url: &str, @@ -180,24 +178,22 @@ async fn dispatch( transcribe::handle_whisper_job(job, cas, whisper_url).await } "agent_respond" => { - agent::handle_agent_respond(job, db, stdb).await + agent::handle_agent_respond(job, db).await } "suggest_edges" => { - ai_edges::handle_suggest_edges(job, db, stdb).await + ai_edges::handle_suggest_edges(job, db).await } "summarize_communication" => { - summarize::handle_summarize_communication(job, db, stdb).await + summarize::handle_summarize_communication(job, db).await } "tts_generate" => { - tts::handle_tts_job(job, db, stdb).await + tts::handle_tts_job(job, db).await } "audio_process" => { - audio::handle_audio_process_job(job, db, stdb, cas).await + audio::handle_audio_process_job(job, db, cas).await } "ai_process" => { - // Fortsatt inline — ingen CLI-verktøy ennå. - // TODO: Lag synops-ai-process og migrer hit. - ai_process::handle_ai_process(job, db, stdb).await + ai_process::handle_ai_process(job, db).await } "render_article" => { handle_render_article(job, cas).await @@ -210,7 +206,7 @@ async fn dispatch( pg_writes::handle_insert_node(job, db).await } "pg_insert_edge" => { - pg_writes::handle_insert_edge(job, db, stdb, index_cache).await + pg_writes::handle_insert_edge(job, db, index_cache).await } "pg_update_node" => { pg_writes::handle_update_node(job, db).await @@ -469,7 +465,6 @@ pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result /// slutter workeren å dequeue nye jobber (kjørende jobber fullføres). pub fn start_worker( db: PgPool, - stdb: StdbClient, cas: CasStore, index_cache: IndexCache, maintenance: MaintenanceState, @@ -611,7 +606,6 @@ pub fn start_worker( // Kjør jobben i en egen tokio-task (frigjør poll-loopen) let db2 = db.clone(); - let stdb2 = stdb.clone(); let cas2 = cas.clone(); let index_cache2 = index_cache.clone(); let whisper_url2 = whisper_url.clone(); @@ -627,7 +621,7 @@ pub fn start_worker( let result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), - dispatch(&job, &db2, &stdb2, &cas2, &index_cache2, &whisper_url2), + dispatch(&job, &db2, &cas2, &index_cache2, &whisper_url2), ) .await; diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 007a3ae..cc82a07 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -22,7 +22,8 @@ pub mod resource_usage; pub mod resources; mod rss; mod serving; -mod stdb; +#[allow(dead_code)] +mod stdb; // Beholdt som død kode — fjernes i oppgave 22.4 pub mod summarize; pub mod ws; pub mod mixer; @@ -31,8 +32,6 @@ pub mod transcribe; pub mod tts; pub mod usage_overview; pub mod user_usage; -mod stdb_monitor; -mod warmup; mod workspace; use axum::{extract::State, http::StatusCode, middleware, routing::{get, post}, Json, Router}; @@ -43,14 +42,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte use auth::{AuthUser, JwksKeys}; use cas::CasStore; -use stdb::StdbClient; use ws::WsBroadcast; #[derive(Clone)] pub struct AppState { pub db: PgPool, pub jwks: JwksKeys, - pub stdb: StdbClient, pub cas: CasStore, pub index_cache: publishing::IndexCache, pub dynamic_page_cache: publishing::DynamicPageCache, @@ -65,7 +62,6 @@ struct HealthResponse { status: &'static str, version: &'static str, db: &'static str, - stdb: &'static str, } #[derive(Serialize)] @@ -115,46 +111,6 @@ async fn main() { .await .expect("Kunne ikke hente JWKS fra Authentik"); - // SpacetimeDB-klient - let stdb_url = std::env::var("SPACETIMEDB_URL") - .unwrap_or_else(|_| "http://spacetimedb:3000".to_string()); - let stdb_database = std::env::var("SPACETIMEDB_DATABASE") - .unwrap_or_else(|_| "synops".to_string()); - - // Hent token fra miljøvariabel, eller opprett ny identitet - let stdb_token = match std::env::var("SPACETIMEDB_TOKEN") { - Ok(token) if !token.is_empty() => { - tracing::info!("Bruker konfigurert STDB-token"); - token - } - _ => { - tracing::info!("Ingen STDB-token konfigurert, oppretter ny identitet"); - let (identity, token) = StdbClient::create_identity(&stdb_url) - .await - .expect("Kunne ikke opprette STDB-identitet"); - tracing::info!("Opprettet STDB-identitet: {identity}"); - token - } - }; - - let stdb = StdbClient::new(&stdb_url, &stdb_database, &stdb_token); - - // Warmup: last hele grafen fra PG til SpacetimeDB - match warmup::run(&db, &stdb).await { - Ok(stats) => { - tracing::info!( - "Warmup fullført: {} noder, {} edges, {} access", - stats.nodes, - stats.edges, - stats.access - ); - } - Err(e) => { - tracing::error!("Warmup feilet: {e}"); - // Fortsett likevel — STDB kan være midlertidig utilgjengelig - } - } - // CAS — content-addressable store for binærfiler let cas_root = std::env::var("CAS_ROOT") .unwrap_or_else(|_| "/srv/synops/media/cas".to_string()); @@ -174,7 +130,7 @@ async fn main() { let index_cache = publishing::new_index_cache(); // Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5) - jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone()); + jobs::start_worker(db.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone()); // Start periodisk CAS-pruning i bakgrunnen pruning::start_pruning_loop(db.clone(), cas.clone()); @@ -191,9 +147,6 @@ async fn main() { // Start nattlig bandwidth-parsing (oppgave 15.7) bandwidth::start_bandwidth_parser(db.clone()); - // Start STDB-overvåker: oppdager krasj og gjenoppbygger fra PG (oppgave 12.2) - stdb_monitor::start_stdb_monitor(db.clone(), stdb.clone()); - // Start periodisk CAS tmp-opprydding (oppgave 17.6) cas::start_tmp_cleanup_loop(cas.clone()); let dynamic_page_cache = publishing::new_dynamic_page_cache(); @@ -203,7 +156,7 @@ async fn main() { let ws_broadcast = WsBroadcast::new(); ws::start_pg_listener(db.clone(), ws_broadcast.clone()); - let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast }; + let state = AppState { db, jwks, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast }; // Ruter: /health er offentlig, /me krever gyldig JWT let app = Router::new() @@ -321,25 +274,17 @@ async fn main() { axum::serve(listener, app).await.unwrap(); } -/// Offentlig helsesjekk — verifiserer PG- og STDB-tilkobling. +/// Offentlig helsesjekk — verifiserer PG-tilkobling. async fn health(State(state): State) -> Result, StatusCode> { sqlx::query("SELECT 1") .execute(&state.db) .await .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?; - // STDB helsesjekk: prøv å slette en ikke-eksisterende node. - // Kallet når STDB og returnerer ok (noop), men feiler ved nettverksfeil. - let stdb_status = match state.stdb.delete_node("__healthcheck_nonexistent__").await { - Ok(()) => "connected", - Err(_) => "unavailable", - }; - Ok(Json(HealthResponse { status: "ok", version: env!("CARGO_PKG_VERSION"), db: "connected", - stdb: stdb_status, })) } diff --git a/maskinrommet/src/maintenance.rs b/maskinrommet/src/maintenance.rs index 40baa9f..005cae4 100644 --- a/maskinrommet/src/maintenance.rs +++ b/maskinrommet/src/maintenance.rs @@ -2,7 +2,7 @@ // // Flyt: // 1. Admin kaller initiate_maintenance med tidspunkt -// 2. System oppretter systemvarsel → frontend viser nedtelling +// 2. System oppretter systemvarsel → frontend viser nedtelling via PG NOTIFY // 3. Bakgrunnsoppgave venter til vedlikeholdstidspunkt // 4. Setter maintenance_active → blokkerer nye LiveKit-rom + jobbkø stopper dequeue // 5. Venter på at kjørende jobber fullføres (med timeout) @@ -18,8 +18,6 @@ use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; -use crate::stdb::StdbClient; - /// Delt vedlikeholdstilstand — klones inn i AppState. #[derive(Clone)] pub struct MaintenanceState { @@ -102,26 +100,18 @@ impl MaintenanceState { } /// Initier vedlikehold: sett tidspunkt, opprett varsel, start nedtelling. - /// - /// Oppretter en system_announcement med `critical`-type og `scheduled_at`. - /// Starter en bakgrunnsoppgave som venter til tidspunktet, aktiverer - /// maintenance mode, venter på jobber, og avslutter prosessen. pub async fn initiate( &self, db: &PgPool, - stdb: &StdbClient, scheduled_at: DateTime, initiated_by: Uuid, ) -> Result { - // Sjekk at vi ikke allerede er i vedlikeholdsmodus if self.is_initiated() { return Err("Vedlikehold er allerede initiert".to_string()); } // Opprett systemvarsel let node_id = Uuid::now_v7(); - let node_id_str = node_id.to_string(); - let created_by_str = initiated_by.to_string(); let metadata = serde_json::json!({ "announcement_type": "critical", @@ -129,22 +119,8 @@ impl MaintenanceState { "blocks_new_sessions": true, "maintenance_shutdown": true, }); - let metadata_str = metadata.to_string(); - // STDB — umiddelbar broadcast til alle klienter - stdb.create_node( - &node_id_str, - "system_announcement", - "Planlagt vedlikehold", - &format!("Systemet stenges for vedlikehold. Lagre arbeidet ditt."), - "open", - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| format!("STDB-feil: {e}"))?; - - // PG — persistent lagring + // PG — persistent lagring (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query( r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'system_announcement', 'Planlagt vedlikehold', @@ -168,9 +144,8 @@ impl MaintenanceState { // Start bakgrunnsoppgave for shutdown-koordinering let state = self.clone(); let db2 = db.clone(); - let stdb2 = stdb.clone(); let handle = tokio::spawn(async move { - shutdown_coordinator(state, db2, stdb2, scheduled_at, node_id).await; + shutdown_coordinator(state, db2, scheduled_at, node_id).await; }); // Lagre tilstand @@ -188,7 +163,6 @@ impl MaintenanceState { pub async fn cancel( &self, db: &PgPool, - stdb: &StdbClient, ) -> Result<(), String> { if !self.is_initiated() { return Err("Ingen vedlikehold er initiert".to_string()); @@ -201,12 +175,8 @@ impl MaintenanceState { handle.abort(); } - // Slett varselet fra STDB og PG + // Slett varselet fra PG if let Some(nid) = inner.announcement_node_id.take() { - let nid_str = nid.to_string(); - if let Err(e) = stdb.delete_node(&nid_str).await { - tracing::warn!("Kunne ikke slette varsel fra STDB: {e}"); - } if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1") .bind(nid) .execute(db) @@ -246,15 +216,9 @@ async fn fetch_running_jobs(db: &PgPool) -> Result, sqlx::Error> } /// Bakgrunnsoppgave som koordinerer nedstengningen. -/// -/// 1. Venter til scheduled_at -/// 2. Setter maintenance_active (blokkerer nye LiveKit-rom + jobbkø) -/// 3. Venter på at kjørende jobber fullføres (maks 5 min timeout) -/// 4. Avslutter prosessen (systemd restarter) async fn shutdown_coordinator( state: MaintenanceState, db: PgPool, - stdb: StdbClient, scheduled_at: DateTime, announcement_id: Uuid, ) { @@ -280,15 +244,15 @@ async fn shutdown_coordinator( "maintenance_shutdown": true, "maintenance_active": true, }); - let nid_str = announcement_id.to_string(); - let _ = stdb.update_node( - &nid_str, - "system_announcement", - "Vedlikehold pågår", - "Systemet stenger ned. Vent til vedlikeholdet er ferdig.", - "open", - &active_meta.to_string(), - ).await; + let _ = sqlx::query( + "UPDATE nodes SET title = $2, content = $3, metadata = $4 WHERE id = $1", + ) + .bind(announcement_id) + .bind("Vedlikehold pågår") + .bind("Systemet stenger ned. Vent til vedlikeholdet er ferdig.") + .bind(&active_meta) + .execute(&db) + .await; // Vent på at kjørende jobber fullføres (maks 5 minutter) let timeout = std::time::Duration::from_secs(300); @@ -320,8 +284,7 @@ async fn shutdown_coordinator( tokio::time::sleep(std::time::Duration::from_secs(5)).await; } - // Slett varselet (klienter vil se at tilkoblingen forsvinner) - let _ = stdb.delete_node(&nid_str).await; + // Slett varselet if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1") .bind(announcement_id) .execute(&db) diff --git a/maskinrommet/src/pg_writes.rs b/maskinrommet/src/pg_writes.rs index 1ca7010..584b82f 100644 --- a/maskinrommet/src/pg_writes.rs +++ b/maskinrommet/src/pg_writes.rs @@ -18,7 +18,6 @@ use uuid::Uuid; use crate::jobs::JobRow; use crate::publishing::IndexCache; -use crate::stdb::StdbClient; /// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk. const PG_WRITE_PRIORITY: i16 = 8; @@ -208,7 +207,6 @@ fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> { pub async fn handle_insert_edge( job: &JobRow, db: &PgPool, - stdb: &StdbClient, index_cache: &IndexCache, ) -> Result { let p = &job.payload; @@ -271,8 +269,7 @@ pub async fn handle_insert_edge( ); } - // Synk node_access til STDB (best-effort, feil logger men feiler ikke jobben) - sync_node_access_to_stdb(db, stdb, source_id).await; + // PG NOTIFY-triggere (access_changed) sender sanntidsoppdateringer. } else { // Vanlig edge sqlx::query( @@ -392,57 +389,6 @@ pub async fn handle_delete_edge( // Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk) // ============================================================================= -/// Synkroniserer node_access-rader for et subject fra PG til STDB. -async fn sync_node_access_to_stdb(db: &PgPool, stdb: &StdbClient, subject_id: Uuid) { - #[derive(sqlx::FromRow)] - struct NodeAccessRow { - subject_id: Uuid, - object_id: Uuid, - access: String, - via_edge: String, - } - - let rows = sqlx::query_as::<_, NodeAccessRow>( - "SELECT subject_id, object_id, access::text as access, \ - COALESCE(via_edge::text, '') as via_edge \ - FROM node_access WHERE subject_id = $1", - ) - .bind(subject_id) - .fetch_all(db) - .await; - - match rows { - Ok(rows) => { - for row in &rows { - if let Err(e) = stdb - .upsert_node_access( - &row.subject_id.to_string(), - &row.object_id.to_string(), - &row.access, - &row.via_edge, - ) - .await - { - tracing::error!( - subject_id = %row.subject_id, - object_id = %row.object_id, - error = %e, - "Kunne ikke synke node_access til STDB (pg_writes)" - ); - } - } - tracing::info!( - subject_id = %subject_id, - count = rows.len(), - "node_access synket til STDB (via jobbkø)" - ); - } - Err(e) => { - tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG"); - } - } -} - /// Trigger artikkelrendering hvis target er en publiseringssamling. async fn trigger_render_if_publishing( db: &PgPool, diff --git a/maskinrommet/src/stdb_monitor.rs b/maskinrommet/src/stdb_monitor.rs deleted file mode 100644 index 7ebdcf6..0000000 --- a/maskinrommet/src/stdb_monitor.rs +++ /dev/null @@ -1,145 +0,0 @@ -// STDB-overvåker: oppdager SpacetimeDB-krasj og gjenoppbygger fra PG. -// -// Kjører i bakgrunnen med jevnlig helsesjekk. Hvis STDB var oppe og -// deretter feiler, kjøres warmup automatisk for å gjenoppbygge tilstand. -// -// Sekvens ved krasj: -// 1. Oppdage at STDB er nede (helsesjekk feiler) -// 2. Vente til STDB er tilbake (container restarter) -// 3. Kjøre warmup (PG → STDB) -// 4. Logge hendelsen -// -// Ref: docs/infra/backup.md, docs/infra/synkronisering.md - -use sqlx::PgPool; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use crate::stdb::StdbClient; - -/// Start STDB-overvåker i bakgrunnen. -/// Sjekker STDB-helse hvert 30. sekund og kjører warmup ved krasj. -pub fn start_stdb_monitor(db: PgPool, stdb: StdbClient) { - tokio::spawn(async move { - monitor_loop(db, stdb).await; - }); -} - -/// Intern tilstand for overvåkeren. -struct MonitorState { - /// Var STDB oppe ved forrige sjekk? - was_up: bool, - /// Pågår det en recovery akkurat nå? - recovering: Arc, -} - -async fn monitor_loop(db: PgPool, stdb: StdbClient) { - let mut state = MonitorState { - was_up: true, // Antar oppe etter warmup ved oppstart - recovering: Arc::new(AtomicBool::new(false)), - }; - - // Vent litt etter oppstart slik at warmup fullføres først - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - tracing::info!("STDB-overvåker startet (sjekker hvert 30s)"); - - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); - - loop { - interval.tick().await; - - // Ikke sjekk hvis recovery allerede pågår - if state.recovering.load(Ordering::Relaxed) { - continue; - } - - let is_up = check_stdb_health(&stdb).await; - - match (state.was_up, is_up) { - (true, false) => { - // STDB gikk ned! Logg og start recovery-venting. - tracing::error!("STDB-overvåker: SpacetimeDB er NEDE — starter recovery-prosess"); - state.recovering.store(true, Ordering::Relaxed); - - let db_clone = db.clone(); - let stdb_clone = stdb.clone(); - let recovering = state.recovering.clone(); - - tokio::spawn(async move { - recover_stdb(db_clone, stdb_clone, recovering).await; - }); - } - (false, true) => { - // STDB kom tilbake uten vår hjelp (recovery-tasken fikset det) - tracing::info!("STDB-overvåker: SpacetimeDB er tilbake"); - state.was_up = true; - } - (false, false) => { - // Fortsatt nede — recovery-tasken håndterer dette - } - (true, true) => { - // Alt OK - } - } - - if is_up { - state.was_up = true; - } else if !state.recovering.load(Ordering::Relaxed) { - state.was_up = false; - } - } -} - -/// Sjekk om STDB svarer på en enkel reducer-kall. -async fn check_stdb_health(stdb: &StdbClient) -> bool { - stdb.delete_node("__healthcheck_nonexistent__").await.is_ok() -} - -/// Vent til STDB er tilbake, deretter kjør warmup. -async fn recover_stdb(db: PgPool, stdb: StdbClient, recovering: Arc) { - let max_wait = std::time::Duration::from_secs(600); // Maks 10 min - let check_interval = std::time::Duration::from_secs(10); - let start = std::time::Instant::now(); - - tracing::info!("STDB-recovery: venter på at SpacetimeDB starter opp igjen (maks 10 min)"); - - // Vent til STDB svarer - loop { - if start.elapsed() > max_wait { - tracing::error!( - "STDB-recovery: SpacetimeDB kom ikke tilbake innen {} sekunder — gir opp", - max_wait.as_secs() - ); - recovering.store(false, Ordering::Relaxed); - return; - } - - tokio::time::sleep(check_interval).await; - - if check_stdb_health(&stdb).await { - tracing::info!( - "STDB-recovery: SpacetimeDB svarer igjen etter {}s", - start.elapsed().as_secs() - ); - break; - } - } - - // STDB er tilbake — kjør warmup - tracing::info!("STDB-recovery: kjører warmup (PG → STDB)"); - match crate::warmup::run(&db, &stdb).await { - Ok(stats) => { - tracing::info!( - "STDB-recovery: warmup fullført ({} noder, {} edges, {} access)", - stats.nodes, - stats.edges, - stats.access - ); - } - Err(e) => { - tracing::error!("STDB-recovery: warmup feilet: {e}"); - } - } - - recovering.store(false, Ordering::Relaxed); -} diff --git a/maskinrommet/src/summarize.rs b/maskinrommet/src/summarize.rs index ba0be41..92b516e 100644 --- a/maskinrommet/src/summarize.rs +++ b/maskinrommet/src/summarize.rs @@ -10,7 +10,6 @@ use std::process::Stdio; use uuid::Uuid; use crate::jobs::JobRow; -use crate::stdb::StdbClient; /// Synops-summarize binary path. /// Søker i PATH, men kan overrides med SYNOPS_SUMMARIZE_BIN. @@ -30,7 +29,6 @@ fn summarize_bin() -> String { pub async fn handle_summarize_communication( job: &JobRow, _db: &sqlx::PgPool, - _stdb: &StdbClient, ) -> Result { let communication_id: Uuid = job .payload diff --git a/maskinrommet/src/tts.rs b/maskinrommet/src/tts.rs index 55774f7..68f75b3 100644 --- a/maskinrommet/src/tts.rs +++ b/maskinrommet/src/tts.rs @@ -1,8 +1,8 @@ // TTS-dispatcher — delegerer til synops-tts CLI. // -// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env), -// og STDB-skriving (sanntidsvisning). Alt annet (ElevenLabs-kall, CAS-lagring, -// PG-skriving) gjøres av synops-tts. +// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env). +// Alt annet (ElevenLabs-kall, CAS-lagring, PG-skriving) gjøres av synops-tts. +// PG NOTIFY-triggere sender sanntidsoppdateringer. // // Jobbtype: "tts_generate" // Payload: { "text", "voice_id"?, "language"?, "source_node_id"?, "requested_by" } @@ -14,7 +14,6 @@ use uuid::Uuid; use crate::cli_dispatch; use crate::jobs::JobRow; -use crate::stdb::StdbClient; /// Synops-tts binary path. fn tts_bin() -> String { @@ -30,7 +29,6 @@ fn tts_bin() -> String { pub async fn handle_tts_job( job: &JobRow, db: &PgPool, - stdb: &StdbClient, ) -> Result { let text = job.payload["text"] .as_str() @@ -82,41 +80,8 @@ pub async fn handle_tts_job( let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; - // --- STDB-synk for sanntidsvisning --- - // synops-tts skriver PG. Vi synker til STDB for umiddelbar visning. - if let Some(media_node_id) = result["media_node_id"].as_str() { - let cas_hash = result["cas_hash"].as_str().unwrap_or(""); - let characters = result["characters"].as_u64().unwrap_or(0); - let title = format!("TTS: {}", truncate(text, 60)); - let metadata = serde_json::json!({ - "cas_hash": cas_hash, - "mime": "audio/mpeg", - "tts": { "voice_id": &voice_id, "characters": characters } - }); - - if let Err(e) = stdb.create_node( - media_node_id, "content", &title, "", "hidden", - &metadata.to_string(), &requested_by.to_string(), - ).await { - tracing::warn!(error = %e, "STDB create_node (tts) feilet (PG er allerede skrevet)"); - } - - // has_media-edge - if let Some(source_id) = source_node_id { - let edge_id = Uuid::now_v7().to_string(); - let edge_meta = serde_json::json!({ - "media_type": "tts_audio", - "generated_at": chrono::Utc::now().to_rfc3339() - }); - - if let Err(e) = stdb.create_edge( - &edge_id, &source_id.to_string(), media_node_id, "has_media", - &edge_meta.to_string(), false, &requested_by.to_string(), - ).await { - tracing::warn!(error = %e, "STDB create_edge (has_media) feilet (PG er allerede skrevet)"); - } - } - } + // PG-skriving gjøres av synops-tts med --write. + // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter. tracing::info!( cas_hash = result["cas_hash"].as_str().unwrap_or("n/a"), diff --git a/maskinrommet/src/warmup.rs b/maskinrommet/src/warmup.rs deleted file mode 100644 index 08d526b..0000000 --- a/maskinrommet/src/warmup.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Warmup: last hele grafen fra PG til SpacetimeDB ved oppstart. -// -// Sekvens: clear_all → noder → edges. -// Edges refererer til noder, så noder må lastes først. -// -// Ref: docs/infra/synkronisering.md - -use sqlx::PgPool; - -use crate::stdb::StdbClient; - -/// Last hele grafen fra PG til SpacetimeDB. -pub async fn run(db: &PgPool, stdb: &StdbClient) -> Result> { - tracing::info!("Warmup: starter (PG → SpacetimeDB)"); - - // 1. Tøm STDB for å unngå duplikater ved restart - stdb.clear_all().await?; - tracing::info!("Warmup: STDB tømt"); - - // 2. Last alle noder - let nodes = sqlx::query_as::<_, PgNode>( - "SELECT id, node_kind::text, COALESCE(title, '') as title, \ - COALESCE(content, '') as content, visibility::text, \ - COALESCE(metadata::text, '{}') as metadata, \ - created_at, COALESCE(created_by::text, '') as created_by \ - FROM nodes ORDER BY created_at" - ) - .fetch_all(db) - .await?; - - let node_count = nodes.len(); - for node in &nodes { - stdb.create_node( - &node.id.to_string(), - &node.node_kind, - &node.title, - &node.content, - &node.visibility, - &node.metadata, - &node.created_by, - ) - .await?; - } - tracing::info!("Warmup: {node_count} noder lastet"); - - // 3. Last alle edges - let edges = sqlx::query_as::<_, PgEdge>( - "SELECT id, source_id, target_id, edge_type, \ - COALESCE(metadata::text, '{}') as metadata, \ - system, created_at, COALESCE(created_by::text, '') as created_by \ - FROM edges ORDER BY created_at" - ) - .fetch_all(db) - .await?; - - let edge_count = edges.len(); - for edge in &edges { - stdb.create_edge( - &edge.id.to_string(), - &edge.source_id.to_string(), - &edge.target_id.to_string(), - &edge.edge_type, - &edge.metadata, - edge.system, - &edge.created_by, - ) - .await?; - } - tracing::info!("Warmup: {edge_count} edges lastet"); - - // 4. Last alle node_access-rader - let access_rows = sqlx::query_as::<_, PgNodeAccess>( - "SELECT subject_id, object_id, access::text, \ - COALESCE(via_edge::text, '') as via_edge \ - FROM node_access" - ) - .fetch_all(db) - .await?; - - let access_count = access_rows.len(); - for row in &access_rows { - stdb.upsert_node_access( - &row.subject_id.to_string(), - &row.object_id.to_string(), - &row.access, - &row.via_edge, - ) - .await?; - } - tracing::info!("Warmup: {access_count} node_access-rader lastet"); - - let stats = WarmupStats { - nodes: node_count, - edges: edge_count, - access: access_count, - }; - tracing::info!("Warmup: ferdig ({} noder, {} edges, {} access)", stats.nodes, stats.edges, stats.access); - Ok(stats) -} - -pub struct WarmupStats { - pub nodes: usize, - pub edges: usize, - pub access: usize, -} - -// PG-radtyper for sqlx -#[derive(sqlx::FromRow)] -#[allow(dead_code)] -struct PgNode { - id: uuid::Uuid, - node_kind: String, - title: String, - content: String, - visibility: String, - metadata: String, - created_at: chrono::DateTime, - created_by: String, -} - -#[derive(sqlx::FromRow)] -#[allow(dead_code)] -struct PgNodeAccess { - subject_id: uuid::Uuid, - object_id: uuid::Uuid, - access: String, - via_edge: String, -} - -#[derive(sqlx::FromRow)] -#[allow(dead_code)] -struct PgEdge { - id: uuid::Uuid, - source_id: uuid::Uuid, - target_id: uuid::Uuid, - edge_type: String, - metadata: String, - system: bool, - created_at: chrono::DateTime, - created_by: String, -} diff --git a/maskinrommet/src/workspace.rs b/maskinrommet/src/workspace.rs index 86fad47..f43c68f 100644 --- a/maskinrommet/src/workspace.rs +++ b/maskinrommet/src/workspace.rs @@ -75,97 +75,62 @@ pub async fn my_workspace( "panels": [] } }); - let metadata_str = metadata.to_string(); - let node_id_str = node_id.to_string(); - let created_by_str = user.node_id.to_string(); - // Skriv til SpacetimeDB (instant) - state - .stdb - .create_node( - &node_id_str, - "workspace", - &title, - "", - "hidden", - &metadata_str, - &created_by_str, - ) - .await - .map_err(|e| { - tracing::error!("STDB-feil ved workspace-opprettelse: {e}"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "SpacetimeDB-feil".to_string(), - ) - })?; + // Skriv workspace-node til PG + sqlx::query( + r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, 'workspace', $2, '', 'hidden'::visibility, $3, $4) + ON CONFLICT (id) DO NOTHING"#, + ) + .bind(node_id) + .bind(&title) + .bind(&metadata) + .bind(user.node_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!("PG-feil ved workspace-opprettelse: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string()) + })?; - // Async PG-skriving (for persistens) - let db = state.db.clone(); - let title_clone = title.clone(); - let metadata_str_clone = metadata_str.clone(); - tokio::spawn(async move { - if let Err(e) = sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) - VALUES ($1, 'workspace', $2, '', 'hidden', $3, $4) - ON CONFLICT (id) DO NOTHING - "#, - ) - .bind(node_id) - .bind(&title_clone) - .bind(&metadata_str_clone) - .bind(user.node_id) - .execute(&db) - .await - { - tracing::error!("PG-feil ved workspace-skriving: {e}"); - } - }); - - // Opprett owner-edge + // Opprett owner-edge med recompute_access let edge_id = Uuid::now_v7(); - let edge_id_str = edge_id.to_string(); - state - .stdb - .create_edge( - &edge_id_str, - &created_by_str, - &node_id_str, - "owner", - "{}", - false, - &created_by_str, - ) - .await - .map_err(|e| { - tracing::error!("STDB-feil ved workspace owner-edge: {e}"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "SpacetimeDB-feil".to_string(), - ) - })?; + let mut tx = state.db.begin().await.map_err(|e| { + tracing::error!("PG begin: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string()) + })?; - // Async PG edge-skriving - let db2 = state.db.clone(); - tokio::spawn(async move { - if let Err(e) = sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, 'owner', '{}', false, $2) - ON CONFLICT (id) DO NOTHING - "#, - ) - .bind(edge_id) + sqlx::query( + r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, 'owner', '{}', false, $2) + ON CONFLICT (id) DO NOTHING"#, + ) + .bind(edge_id) + .bind(user.node_id) + .bind(node_id) + .execute(&mut *tx) + .await + .map_err(|e| { + tracing::error!("PG-feil ved workspace owner-edge: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string()) + })?; + + sqlx::query("SELECT recompute_access($1, $2, 'owner'::access_level, $3)") .bind(user.node_id) .bind(node_id) - .execute(&db2) + .bind(edge_id) + .execute(&mut *tx) .await - { - tracing::error!("PG-feil ved workspace owner-edge skriving: {e}"); - } - }); + .map_err(|e| { + tracing::error!("recompute_access: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string()) + })?; + + tx.commit().await.map_err(|e| { + tracing::error!("PG commit: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string()) + })?; tracing::info!( node_id = %node_id, diff --git a/tasks.md b/tasks.md index 49bd258..7f727d3 100644 --- a/tasks.md +++ b/tasks.md @@ -281,8 +281,7 @@ ingen synk-kompleksitet. - [x] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering. - [x] 22.2 Frontend-migrering: erstatt SpacetimeDB-klient med vanlig WebSocket til portvokteren. Erstatt STDB-stores med reaktive stores som lytter på WebSocket. Verifiser all sanntidsfunksjonalitet (chat, kanban, kalender, mixer, canvas). -- [~] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data. - > Påbegynt: 2026-03-18T12:36 +- [x] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data. - [ ] 22.4 Fjern SpacetimeDB: stopp Docker-container, fjern STDB-modul, fjern STDB-klient fra portvokteren og frontend, fjern synkroniseringskode, oppdater docs og CLAUDE.md. - [ ] 22.5 Opprydding: arkiver STDB-relaterte erfaringsdocs, oppdater alle docs-referanser, fjern Docker-konfig for SpacetimeDB, fjern SpacetimeDB-loven fra feedback-memories.