Remove the STDB write path: all writes go to PG only (task 22.3)

SpacetimeDB was used as an "instant feedback" layer between the
gatekeeper service (portvokteren) and the frontend. Now that the PG
NOTIFY triggers and the WebSocket layer are in place (tasks 22.1–22.2),
the STDB write path is redundant.

Changes:
- intentions.rs: All CRUD operations (create/update/delete node/edge)
  now write synchronously to PG instead of STDB-first plus an async PG
  job queue. PG NOTIFY triggers push updates to clients immediately.
  Access-granting edges (owner/admin/member_of/reader) use a transaction
  with recompute_access directly in the handler (a sketch follows below).
- maintenance.rs: Removed StdbClient from all functions. Announcements
  are created/updated/deleted directly in PG.
- agent.rs, audio.rs, tts.rs, ai_process.rs: Removed the STDB sync after
  CLI tool runs. PG NOTIFY covers real-time display.
- pg_writes.rs: Removed sync_node_access_to_stdb. The access_changed
  NOTIFY trigger handles this.
- workspace.rs: Synchronous PG writes with recompute_access.
- summarize.rs, ai_edges.rs: Removed StdbClient from the signatures.
- jobs.rs: Removed StdbClient from dispatch and start_worker.
- main.rs: Removed STDB initialisation, warmup and stdb_monitor.
  StdbClient removed from AppState. stdb.rs is kept as dead code
  (removed in task 22.4).
- health.rs: Removed the STDB health check from the dashboard.
- Deleted warmup.rs and stdb_monitor.rs (PG→STDB sync is no longer
  relevant).
- docs/retninger/datalaget.md: Marked phase M3 as complete.
vegard 2026-03-18 13:11:33 +00:00
parent 24122c9354
commit aee6adc425
17 changed files with 488 additions and 1377 deletions
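The intentions.rs changes themselves are not reproduced below (one file's diff is suppressed for size), so here is a minimal, hedged sketch of the pattern the first bullet describes: inserting an access-granting edge and calling recompute_access in one PG transaction. The edges column list and the recompute_access call mirror the workspace.rs hunk later in this diff; the function name and the 'reader' edge type are illustrative assumptions, not the actual intentions.rs handler.

```rust
// Illustrative sketch only — not the actual intentions.rs handler.
// Insert an access-granting edge and recompute access atomically, so the
// access_changed NOTIFY fires only after both rows are committed.
async fn create_access_edge_sketch(
    db: &sqlx::PgPool,
    edge_id: uuid::Uuid,
    subject_id: uuid::Uuid, // e.g. the user node being granted access
    object_id: uuid::Uuid,  // the node access is granted to
) -> Result<(), sqlx::Error> {
    let mut tx = db.begin().await?;
    sqlx::query(
        r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
           VALUES ($1, $2, $3, 'reader', '{}', false, $2)
           ON CONFLICT (id) DO NOTHING"#,
    )
    .bind(edge_id)
    .bind(subject_id)
    .bind(object_id)
    .execute(&mut *tx)
    .await?;
    // Same PG function the workspace.rs hunk calls for owner-edges.
    sqlx::query("SELECT recompute_access($1, $2, 'reader'::access_level, $3)")
        .bind(subject_id)
        .bind(object_id)
        .bind(edge_id)
        .execute(&mut *tx)
        .await?;
    tx.commit().await
}
```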


@@ -139,9 +139,13 @@ NOTIFY (ikke bare ID) slik at stores kan oppdateres uten ekstra
API-kall. Mixer-kanaler migrert fra STDB til PG-tabell med
tilhørende NOTIFY-trigger og HTTP API-endepunkter.
-### Fase M3: Fjern skrivestien til STDB
-Portvokteren slutter å skrive til SpacetimeDB. All skriving
-går kun til PG. NOTIFY-triggere er eneste push-mekanisme.
+### Fase M3: Fjern skrivestien til STDB ✅
+Portvokteren skriver kun til PG. STDB-skrivestien er fjernet.
+Alle intensjoner (create/update/delete node/edge) skriver
+synkront til PG. NOTIFY-triggere er eneste push-mekanisme.
+Warmup (PG→STDB) og STDB-monitor er fjernet. StdbClient er
+fjernet fra AppState. Job-handlere (agent, audio, tts, ai_process)
+synker ikke lenger til STDB — PG NOTIFY dekker sanntid.
### Fase M4: Fjern STDB
- Stopp SpacetimeDB Docker-container
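The phase M3 text above makes NOTIFY triggers the only push mechanism, but the trigger definitions from task 22.1 are not part of this diff. The following is therefore only an assumed sketch of what a notify_node_change-style trigger could look like (full row in the payload, as described above); the names and payload shape are guesses, and pg_notify's ~8 kB payload limit would need a fallback for large rows.

```rust
// Assumed sketch — not the actual trigger from task 22.1.
// Sends the whole affected row as JSON on the "node_change" channel so
// frontend stores can update without an extra API call.
async fn install_node_notify_trigger(db: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        CREATE OR REPLACE FUNCTION notify_node_change() RETURNS trigger AS $$
        DECLARE
            rec jsonb;
        BEGIN
            IF TG_OP = 'DELETE' THEN rec := to_jsonb(OLD); ELSE rec := to_jsonb(NEW); END IF;
            PERFORM pg_notify(
                'node_change',
                jsonb_build_object('op', TG_OP, 'node', rec)::text
            );
            RETURN NULL; -- return value is ignored for AFTER triggers
        END;
        $$ LANGUAGE plpgsql
        "#,
    )
    .execute(db)
    .await?;
    sqlx::query("DROP TRIGGER IF EXISTS nodes_notify_change ON nodes")
        .execute(db)
        .await?;
    sqlx::query(
        "CREATE TRIGGER nodes_notify_change
         AFTER INSERT OR UPDATE OR DELETE ON nodes
         FOR EACH ROW EXECUTE FUNCTION notify_node_change()",
    )
    .execute(db)
    .await?;
    Ok(())
}
```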


@@ -1,8 +1,8 @@
// Agent-dispatcher — delegerer prosessering til synops-respond CLI.
//
-// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon,
-// og STDB-skriving (sanntidsvisning). Alt annet (kontekst-henting,
-// prompt-bygging, claude-kall, PG-skriving) gjøres av synops-respond.
+// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon.
+// Alt annet (kontekst-henting, prompt-bygging, claude-kall, PG-skriving)
+// gjøres av synops-respond. PG NOTIFY-triggere sender sanntidsoppdateringer.
//
// Jobbtype: "agent_respond"
// Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" }
@ -15,7 +15,6 @@ use sqlx::PgPool;
use uuid::Uuid; use uuid::Uuid;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::stdb::StdbClient;
#[derive(Debug)] #[derive(Debug)]
struct AgentConfig { struct AgentConfig {
@ -32,7 +31,6 @@ fn respond_bin() -> String {
pub async fn handle_agent_respond( pub async fn handle_agent_respond(
job: &JobRow, job: &JobRow,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let communication_id: Uuid = job.payload["communication_id"] let communication_id: Uuid = job.payload["communication_id"]
.as_str().and_then(|s| s.parse().ok()) .as_str().and_then(|s| s.parse().ok())
@ -136,29 +134,8 @@ pub async fn handle_agent_respond(
let result: serde_json::Value = serde_json::from_str(&stdout) let result: serde_json::Value = serde_json::from_str(&stdout)
.map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?; .map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?;
-    // --- STDB-skriving for sanntidsvisning (forblir i maskinrommet) ---
+    // PG-skriving gjøres av synops-respond med --write.
+    // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter.
if result["status"].as_str() == Some("completed") {
if let Some(reply_node_id) = result["reply_node_id"].as_str() {
let response_text = result["response_text"].as_str().unwrap_or("");
let agent_str = agent_node_id.to_string();
let comm_str = communication_id.to_string();
let edge_id = Uuid::now_v7().to_string();
let empty = serde_json::json!({}).to_string();
if let Err(e) = stdb.create_node(
reply_node_id, "content", "", response_text, "hidden", &empty, &agent_str,
).await {
tracing::warn!(error = %e, "STDB create_node feilet (PG er allerede skrevet)");
}
if let Err(e) = stdb.create_edge(
&edge_id, reply_node_id, &comm_str, "belongs_to", &empty, false, &agent_str,
).await {
tracing::warn!(error = %e, "STDB create_edge feilet (PG er allerede skrevet)");
}
}
}
tracing::info!( tracing::info!(
status = result["status"].as_str().unwrap_or("unknown"), status = result["status"].as_str().unwrap_or("unknown"),


@ -3,10 +3,6 @@
// Maskinrommet orkestrerer, CLI-verktøyet gjør jobben (LLM-kall, // Maskinrommet orkestrerer, CLI-verktøyet gjør jobben (LLM-kall,
// topic-opprettelse, edge-skriving, ressurslogging). // topic-opprettelse, edge-skriving, ressurslogging).
// //
// STDB-synk for topic-noder og mentions-edges hoppes over her —
// topics er bakgrunnsdata i kunnskapsgrafen og trenger ikke
// sanntidsvisning. De synkes ved neste STDB-refresh.
//
// Jobbtype: "suggest_edges" // Jobbtype: "suggest_edges"
// Payload: { "node_id": "<uuid>" } // Payload: { "node_id": "<uuid>" }
// //
@ -17,7 +13,6 @@ use uuid::Uuid;
use crate::cli_dispatch; use crate::cli_dispatch;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::stdb::StdbClient;
/// Synops-suggest-edges binary path. /// Synops-suggest-edges binary path.
fn suggest_edges_bin() -> String { fn suggest_edges_bin() -> String {
@ -32,7 +27,6 @@ fn suggest_edges_bin() -> String {
pub async fn handle_suggest_edges( pub async fn handle_suggest_edges(
job: &JobRow, job: &JobRow,
_db: &sqlx::PgPool, _db: &sqlx::PgPool,
_stdb: &StdbClient,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let node_id: Uuid = job let node_id: Uuid = job
.payload .payload


@ -27,7 +27,6 @@ use uuid::Uuid;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::resource_usage; use crate::resource_usage;
use crate::stdb::StdbClient;
#[derive(sqlx::FromRow)] #[derive(sqlx::FromRow)]
struct SourceNodeRow { struct SourceNodeRow {
@ -100,7 +99,6 @@ fn model_profile_to_alias(profile: &str) -> &'static str {
pub async fn handle_ai_process( pub async fn handle_ai_process(
job: &JobRow, job: &JobRow,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let source_node_id: Uuid = job let source_node_id: Uuid = job
.payload .payload
@ -251,7 +249,7 @@ pub async fn handle_ai_process(
match direction { match direction {
"tool_to_node" => { "tool_to_node" => {
handle_tool_to_node( handle_tool_to_node(
-                db, stdb, job, source_node_id, ai_preset_id, requested_by,
+                db, job, source_node_id, ai_preset_id, requested_by,
&source, &source_content, &ai_output, &source, &source_content, &ai_output,
).await?; ).await?;
@ -273,7 +271,7 @@ pub async fn handle_ai_process(
} }
"node_to_tool" => { "node_to_tool" => {
let new_node_id = handle_node_to_tool( let new_node_id = handle_node_to_tool(
-                db, stdb, source_node_id, ai_preset_id, requested_by,
+                db, source_node_id, ai_preset_id, requested_by,
&source, &ai_output, preset.title.as_deref(), &source, &ai_output, preset.title.as_deref(),
).await?; ).await?;
@ -304,7 +302,6 @@ pub async fn handle_ai_process(
/// Ref: docs/features/ai_verktoy.md § 2.2 /// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_tool_to_node( async fn handle_tool_to_node(
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
job: &JobRow, job: &JobRow,
source_node_id: Uuid, source_node_id: Uuid,
ai_preset_id: Uuid, ai_preset_id: Uuid,
@ -331,20 +328,7 @@ async fn handle_tool_to_node(
.await .await
.map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?; .map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?;
-    // 2. Oppdater node content i STDB (sanntid)
+    // 2. Oppdater node content i PG (NOTIFY-trigger sender sanntidsoppdatering)
let metadata_str = source.metadata.to_string();
stdb.update_node(
&source_node_id.to_string(),
&source.node_kind,
source.title.as_deref().unwrap_or(""),
ai_output,
&source.visibility,
&metadata_str,
)
.await
.map_err(|e| format!("STDB update_node feilet: {e}"))?;
// 3. Oppdater node content i PG (persistering)
sqlx::query( sqlx::query(
"UPDATE nodes SET content = $2 WHERE id = $1", "UPDATE nodes SET content = $2 WHERE id = $1",
) )
@ -363,7 +347,6 @@ async fn handle_tool_to_node(
/// Ref: docs/features/ai_verktoy.md § 2.2 /// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_node_to_tool( async fn handle_node_to_tool(
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
source_node_id: Uuid, source_node_id: Uuid,
ai_preset_id: Uuid, ai_preset_id: Uuid,
requested_by: Uuid, requested_by: Uuid,
@ -382,23 +365,8 @@ async fn handle_node_to_tool(
"source_node_id": source_node_id.to_string(), "source_node_id": source_node_id.to_string(),
"ai_preset_id": ai_preset_id.to_string() "ai_preset_id": ai_preset_id.to_string()
}); });
let new_metadata_str = new_metadata.to_string();
let empty_meta = serde_json::json!({}).to_string();
-    // 1. Opprett ny node i STDB (sanntid)
+    // 1. Opprett ny node i PG (NOTIFY-trigger sender sanntidsoppdatering)
stdb.create_node(
&new_node_id.to_string(),
"content",
&new_title,
ai_output,
&source.visibility,
&new_metadata_str,
&requested_by.to_string(),
)
.await
.map_err(|e| format!("STDB create_node feilet: {e}"))?;
// 2. Opprett ny node i PG (persistering)
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
@ -415,21 +383,8 @@ async fn handle_node_to_tool(
.await .await
.map_err(|e| format!("PG insert ny node feilet: {e}"))?; .map_err(|e| format!("PG insert ny node feilet: {e}"))?;
-    // 3. Opprett derived_from-edge: ny node → kilde-node
+    // 2. Opprett derived_from-edge: ny node → kilde-node
// Sporbarhet: "denne noden er avledet fra kilden"
let derived_edge_id = Uuid::now_v7(); let derived_edge_id = Uuid::now_v7();
stdb.create_edge(
&derived_edge_id.to_string(),
&new_node_id.to_string(),
&source_node_id.to_string(),
"derived_from",
&empty_meta,
false,
&requested_by.to_string(),
)
.await
.map_err(|e| format!("STDB create_edge (derived_from) feilet: {e}"))?;
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
@ -444,21 +399,8 @@ async fn handle_node_to_tool(
.await .await
.map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?; .map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?;
-    // 4. Opprett processed_by-edge: ny node → AI-preset
+    // 3. Opprett processed_by-edge: ny node → AI-preset
// Sporbarhet: "denne noden ble prosessert av dette AI-verktøyet"
let processed_edge_id = Uuid::now_v7(); let processed_edge_id = Uuid::now_v7();
stdb.create_edge(
&processed_edge_id.to_string(),
&new_node_id.to_string(),
&ai_preset_id.to_string(),
"processed_by",
&empty_meta,
false,
&requested_by.to_string(),
)
.await
.map_err(|e| format!("STDB create_edge (processed_by) feilet: {e}"))?;
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)


@ -10,7 +10,6 @@ use uuid::Uuid;
use crate::cas::CasStore; use crate::cas::CasStore;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::stdb::StdbClient;
// ─── EDL-datastrukturer ─────────────────────────────────────────── // ─── EDL-datastrukturer ───────────────────────────────────────────
@ -786,7 +785,6 @@ fn audio_bin() -> String {
pub async fn handle_audio_process_job( pub async fn handle_audio_process_job(
job: &JobRow, job: &JobRow,
_db: &PgPool, _db: &PgPool,
stdb: &StdbClient,
cas: &CasStore, cas: &CasStore,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let media_node_id: Uuid = job.payload["media_node_id"] let media_node_id: Uuid = job.payload["media_node_id"]
@ -837,41 +835,8 @@ pub async fn handle_audio_process_job(
let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
-    // --- STDB-synk for sanntidsvisning ---
-    if let Some(processed_node_id) = result["processed_node_id"].as_str() {
+    // PG-skriving gjøres av synops-audio med --write.
+    // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter.
let result_hash = result["cas_hash"].as_str().unwrap_or("");
let result_size = result["size_bytes"].as_u64().unwrap_or(0);
let mime = match output_format {
"mp3" => "audio/mpeg",
"wav" => "audio/wav",
"flac" => "audio/flac",
"ogg" => "audio/ogg",
_ => "audio/mpeg",
};
let metadata = serde_json::json!({
"cas_hash": result_hash,
"mime": mime,
"size_bytes": result_size,
"source_hash": cas_hash,
});
if let Err(e) = stdb.create_node(
processed_node_id, "media", "Prosessert lyd", "", "hidden",
&metadata.to_string(), &requested_by.to_string(),
).await {
tracing::warn!(error = %e, "STDB create_node (audio) feilet (PG er allerede skrevet)");
}
// derived_from edge: processed → original
let edge_id = Uuid::now_v7().to_string();
if let Err(e) = stdb.create_edge(
&edge_id, processed_node_id, &media_node_id.to_string(),
"derived_from", "{}", true, &requested_by.to_string(),
).await {
tracing::warn!(error = %e, "STDB create_edge (derived_from) feilet (PG er allerede skrevet)");
}
}
tracing::info!( tracing::info!(
original = %media_node_id, original = %media_node_id,


@ -102,24 +102,6 @@ async fn check_pg(db: &PgPool) -> ServiceStatus {
} }
} }
async fn check_stdb(stdb: &crate::stdb::StdbClient) -> ServiceStatus {
let start = std::time::Instant::now();
match stdb.delete_node("__healthcheck_nonexistent__").await {
Ok(()) => ServiceStatus {
name: "SpacetimeDB".to_string(),
status: "up".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
details: None,
},
Err(e) => ServiceStatus {
name: "SpacetimeDB".to_string(),
status: "down".to_string(),
latency_ms: None,
details: Some(format!("{e}")),
},
}
}
/// Sjekk en HTTP-tjeneste med timeout. /// Sjekk en HTTP-tjeneste med timeout.
async fn check_http_service(name: &str, url: &str) -> ServiceStatus { async fn check_http_service(name: &str, url: &str) -> ServiceStatus {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
@ -458,9 +440,8 @@ pub async fn health_dashboard(
_user: AuthUser, _user: AuthUser,
) -> Result<Json<HealthDashboard>, (StatusCode, Json<crate::intentions::ErrorResponse>)> { ) -> Result<Json<HealthDashboard>, (StatusCode, Json<crate::intentions::ErrorResponse>)> {
// Kjør alle tjeneste-sjekker parallelt // Kjør alle tjeneste-sjekker parallelt
-    let (pg, stdb, caddy, authentik, litellm, whisper, livekit) = tokio::join!(
+    let (pg, caddy, authentik, litellm, whisper, livekit) = tokio::join!(
check_pg(&state.db), check_pg(&state.db),
check_stdb(&state.stdb),
check_caddy(), check_caddy(),
check_authentik(), check_authentik(),
check_litellm(), check_litellm(),
@ -468,7 +449,7 @@ pub async fn health_dashboard(
check_livekit(), check_livekit(),
); );
-    let services = vec![pg, stdb, caddy, authentik, litellm, whisper, livekit];
+    let services = vec![pg, caddy, authentik, litellm, whisper, livekit];
let metrics = collect_metrics(); let metrics = collect_metrics();
let backups = check_backups(); let backups = check_backups();
let pg_stats = collect_pg_stats(&state.db).await; let pg_stats = collect_pg_stats(&state.db).await;

File diff suppressed because it is too large.


@ -24,7 +24,6 @@ use crate::maintenance::MaintenanceState;
use crate::pg_writes; use crate::pg_writes;
use crate::publishing::IndexCache; use crate::publishing::IndexCache;
use crate::resources::{self, PriorityRules}; use crate::resources::{self, PriorityRules};
use crate::stdb::StdbClient;
use crate::summarize; use crate::summarize;
use crate::transcribe; use crate::transcribe;
use crate::tts; use crate::tts;
@ -170,7 +169,6 @@ async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx
async fn dispatch( async fn dispatch(
job: &JobRow, job: &JobRow,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
cas: &CasStore, cas: &CasStore,
index_cache: &IndexCache, index_cache: &IndexCache,
whisper_url: &str, whisper_url: &str,
@@ -180,24 +178,22 @@ async fn dispatch(
            transcribe::handle_whisper_job(job, cas, whisper_url).await
        }
        "agent_respond" => {
-            agent::handle_agent_respond(job, db, stdb).await
+            agent::handle_agent_respond(job, db).await
        }
        "suggest_edges" => {
-            ai_edges::handle_suggest_edges(job, db, stdb).await
+            ai_edges::handle_suggest_edges(job, db).await
        }
        "summarize_communication" => {
-            summarize::handle_summarize_communication(job, db, stdb).await
+            summarize::handle_summarize_communication(job, db).await
        }
        "tts_generate" => {
-            tts::handle_tts_job(job, db, stdb).await
+            tts::handle_tts_job(job, db).await
        }
        "audio_process" => {
-            audio::handle_audio_process_job(job, db, stdb, cas).await
+            audio::handle_audio_process_job(job, db, cas).await
        }
        "ai_process" => {
-            // Fortsatt inline — ingen CLI-verktøy ennå.
-            // TODO: Lag synops-ai-process og migrer hit.
-            ai_process::handle_ai_process(job, db, stdb).await
+            ai_process::handle_ai_process(job, db).await
        }
        "render_article" => {
            handle_render_article(job, cas).await
@@ -210,7 +206,7 @@ async fn dispatch(
            pg_writes::handle_insert_node(job, db).await
        }
        "pg_insert_edge" => {
-            pg_writes::handle_insert_edge(job, db, stdb, index_cache).await
+            pg_writes::handle_insert_edge(job, db, index_cache).await
        }
        "pg_update_node" => {
            pg_writes::handle_update_node(job, db).await
@ -469,7 +465,6 @@ pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error>
/// slutter workeren å dequeue nye jobber (kjørende jobber fullføres). /// slutter workeren å dequeue nye jobber (kjørende jobber fullføres).
pub fn start_worker( pub fn start_worker(
db: PgPool, db: PgPool,
stdb: StdbClient,
cas: CasStore, cas: CasStore,
index_cache: IndexCache, index_cache: IndexCache,
maintenance: MaintenanceState, maintenance: MaintenanceState,
@ -611,7 +606,6 @@ pub fn start_worker(
// Kjør jobben i en egen tokio-task (frigjør poll-loopen) // Kjør jobben i en egen tokio-task (frigjør poll-loopen)
let db2 = db.clone(); let db2 = db.clone();
let stdb2 = stdb.clone();
let cas2 = cas.clone(); let cas2 = cas.clone();
let index_cache2 = index_cache.clone(); let index_cache2 = index_cache.clone();
let whisper_url2 = whisper_url.clone(); let whisper_url2 = whisper_url.clone();
@ -627,7 +621,7 @@ pub fn start_worker(
let result = tokio::time::timeout( let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs), std::time::Duration::from_secs(timeout_secs),
-                        dispatch(&job, &db2, &stdb2, &cas2, &index_cache2, &whisper_url2),
+                        dispatch(&job, &db2, &cas2, &index_cache2, &whisper_url2),
) )
.await; .await;


@ -22,7 +22,8 @@ pub mod resource_usage;
pub mod resources; pub mod resources;
mod rss; mod rss;
mod serving; mod serving;
-mod stdb;
+#[allow(dead_code)]
+mod stdb; // Beholdt som død kode — fjernes i oppgave 22.4
pub mod summarize; pub mod summarize;
pub mod ws; pub mod ws;
pub mod mixer; pub mod mixer;
@ -31,8 +32,6 @@ pub mod transcribe;
pub mod tts; pub mod tts;
pub mod usage_overview; pub mod usage_overview;
pub mod user_usage; pub mod user_usage;
mod stdb_monitor;
mod warmup;
mod workspace; mod workspace;
use axum::{extract::State, http::StatusCode, middleware, routing::{get, post}, Json, Router}; use axum::{extract::State, http::StatusCode, middleware, routing::{get, post}, Json, Router};
@ -43,14 +42,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
use auth::{AuthUser, JwksKeys}; use auth::{AuthUser, JwksKeys};
use cas::CasStore; use cas::CasStore;
use stdb::StdbClient;
use ws::WsBroadcast; use ws::WsBroadcast;
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub db: PgPool, pub db: PgPool,
pub jwks: JwksKeys, pub jwks: JwksKeys,
pub stdb: StdbClient,
pub cas: CasStore, pub cas: CasStore,
pub index_cache: publishing::IndexCache, pub index_cache: publishing::IndexCache,
pub dynamic_page_cache: publishing::DynamicPageCache, pub dynamic_page_cache: publishing::DynamicPageCache,
@ -65,7 +62,6 @@ struct HealthResponse {
status: &'static str, status: &'static str,
version: &'static str, version: &'static str,
db: &'static str, db: &'static str,
stdb: &'static str,
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -115,46 +111,6 @@ async fn main() {
.await .await
.expect("Kunne ikke hente JWKS fra Authentik"); .expect("Kunne ikke hente JWKS fra Authentik");
// SpacetimeDB-klient
let stdb_url = std::env::var("SPACETIMEDB_URL")
.unwrap_or_else(|_| "http://spacetimedb:3000".to_string());
let stdb_database = std::env::var("SPACETIMEDB_DATABASE")
.unwrap_or_else(|_| "synops".to_string());
// Hent token fra miljøvariabel, eller opprett ny identitet
let stdb_token = match std::env::var("SPACETIMEDB_TOKEN") {
Ok(token) if !token.is_empty() => {
tracing::info!("Bruker konfigurert STDB-token");
token
}
_ => {
tracing::info!("Ingen STDB-token konfigurert, oppretter ny identitet");
let (identity, token) = StdbClient::create_identity(&stdb_url)
.await
.expect("Kunne ikke opprette STDB-identitet");
tracing::info!("Opprettet STDB-identitet: {identity}");
token
}
};
let stdb = StdbClient::new(&stdb_url, &stdb_database, &stdb_token);
// Warmup: last hele grafen fra PG til SpacetimeDB
match warmup::run(&db, &stdb).await {
Ok(stats) => {
tracing::info!(
"Warmup fullført: {} noder, {} edges, {} access",
stats.nodes,
stats.edges,
stats.access
);
}
Err(e) => {
tracing::error!("Warmup feilet: {e}");
// Fortsett likevel — STDB kan være midlertidig utilgjengelig
}
}
// CAS — content-addressable store for binærfiler // CAS — content-addressable store for binærfiler
let cas_root = std::env::var("CAS_ROOT") let cas_root = std::env::var("CAS_ROOT")
.unwrap_or_else(|_| "/srv/synops/media/cas".to_string()); .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
@ -174,7 +130,7 @@ async fn main() {
let index_cache = publishing::new_index_cache(); let index_cache = publishing::new_index_cache();
// Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5) // Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5)
-    jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone());
+    jobs::start_worker(db.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone());
// Start periodisk CAS-pruning i bakgrunnen // Start periodisk CAS-pruning i bakgrunnen
pruning::start_pruning_loop(db.clone(), cas.clone()); pruning::start_pruning_loop(db.clone(), cas.clone());
@ -191,9 +147,6 @@ async fn main() {
// Start nattlig bandwidth-parsing (oppgave 15.7) // Start nattlig bandwidth-parsing (oppgave 15.7)
bandwidth::start_bandwidth_parser(db.clone()); bandwidth::start_bandwidth_parser(db.clone());
// Start STDB-overvåker: oppdager krasj og gjenoppbygger fra PG (oppgave 12.2)
stdb_monitor::start_stdb_monitor(db.clone(), stdb.clone());
// Start periodisk CAS tmp-opprydding (oppgave 17.6) // Start periodisk CAS tmp-opprydding (oppgave 17.6)
cas::start_tmp_cleanup_loop(cas.clone()); cas::start_tmp_cleanup_loop(cas.clone());
let dynamic_page_cache = publishing::new_dynamic_page_cache(); let dynamic_page_cache = publishing::new_dynamic_page_cache();
@ -203,7 +156,7 @@ async fn main() {
let ws_broadcast = WsBroadcast::new(); let ws_broadcast = WsBroadcast::new();
ws::start_pg_listener(db.clone(), ws_broadcast.clone()); ws::start_pg_listener(db.clone(), ws_broadcast.clone());
-    let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast };
+    let state = AppState { db, jwks, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics, ws_broadcast };
// Ruter: /health er offentlig, /me krever gyldig JWT // Ruter: /health er offentlig, /me krever gyldig JWT
let app = Router::new() let app = Router::new()
@ -321,25 +274,17 @@ async fn main() {
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
-/// Offentlig helsesjekk — verifiserer PG- og STDB-tilkobling.
+/// Offentlig helsesjekk — verifiserer PG-tilkobling.
async fn health(State(state): State<AppState>) -> Result<Json<HealthResponse>, StatusCode> {
sqlx::query("SELECT 1") sqlx::query("SELECT 1")
.execute(&state.db) .execute(&state.db)
.await .await
.map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?; .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
// STDB helsesjekk: prøv å slette en ikke-eksisterende node.
// Kallet når STDB og returnerer ok (noop), men feiler ved nettverksfeil.
let stdb_status = match state.stdb.delete_node("__healthcheck_nonexistent__").await {
Ok(()) => "connected",
Err(_) => "unavailable",
};
Ok(Json(HealthResponse { Ok(Json(HealthResponse {
status: "ok", status: "ok",
version: env!("CARGO_PKG_VERSION"), version: env!("CARGO_PKG_VERSION"),
db: "connected", db: "connected",
stdb: stdb_status,
})) }))
} }
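main.rs above keeps `ws::start_pg_listener(db.clone(), ws_broadcast.clone())`, but ws.rs is not shown in this commit. As a rough, assumed sketch of what that listener does (task 22.1): subscribe to the NOTIFY channels with sqlx's PgListener and fan each payload out to the WebSocket broadcast. The channel names and the broadcast type are placeholders, not the real WsBroadcast API.

```rust
// Assumed sketch of the LISTEN side — ws.rs is not part of this diff.
use sqlx::postgres::PgListener;

pub fn start_pg_listener_sketch(
    db: sqlx::PgPool,
    tx: tokio::sync::broadcast::Sender<String>, // stand-in for WsBroadcast
) {
    tokio::spawn(async move {
        let mut listener = match PgListener::connect_with(&db).await {
            Ok(l) => l,
            Err(e) => {
                tracing::error!("Could not start PG listener: {e}");
                return;
            }
        };
        if let Err(e) = listener
            .listen_all(["node_change", "edge_change", "access_changed"])
            .await
        {
            tracing::error!("LISTEN failed: {e}");
            return;
        }
        loop {
            match listener.recv().await {
                // The payload is the JSON produced by the NOTIFY trigger;
                // forward it verbatim to all connected WebSocket clients.
                Ok(n) => {
                    let _ = tx.send(n.payload().to_string());
                }
                // PgListener reconnects on its own; log and keep going.
                Err(e) => tracing::warn!("NOTIFY listener error: {e}"),
            }
        }
    });
}
```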


@ -2,7 +2,7 @@
// //
// Flyt: // Flyt:
// 1. Admin kaller initiate_maintenance med tidspunkt // 1. Admin kaller initiate_maintenance med tidspunkt
-// 2. System oppretter systemvarsel → frontend viser nedtelling
+// 2. System oppretter systemvarsel → frontend viser nedtelling via PG NOTIFY
// 3. Bakgrunnsoppgave venter til vedlikeholdstidspunkt // 3. Bakgrunnsoppgave venter til vedlikeholdstidspunkt
// 4. Setter maintenance_active → blokkerer nye LiveKit-rom + jobbkø stopper dequeue // 4. Setter maintenance_active → blokkerer nye LiveKit-rom + jobbkø stopper dequeue
// 5. Venter på at kjørende jobber fullføres (med timeout) // 5. Venter på at kjørende jobber fullføres (med timeout)
@ -18,8 +18,6 @@ use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use uuid::Uuid; use uuid::Uuid;
use crate::stdb::StdbClient;
/// Delt vedlikeholdstilstand — klones inn i AppState. /// Delt vedlikeholdstilstand — klones inn i AppState.
#[derive(Clone)] #[derive(Clone)]
pub struct MaintenanceState { pub struct MaintenanceState {
@ -102,26 +100,18 @@ impl MaintenanceState {
} }
/// Initier vedlikehold: sett tidspunkt, opprett varsel, start nedtelling. /// Initier vedlikehold: sett tidspunkt, opprett varsel, start nedtelling.
///
/// Oppretter en system_announcement med `critical`-type og `scheduled_at`.
/// Starter en bakgrunnsoppgave som venter til tidspunktet, aktiverer
/// maintenance mode, venter på jobber, og avslutter prosessen.
pub async fn initiate( pub async fn initiate(
&self, &self,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
scheduled_at: DateTime<Utc>, scheduled_at: DateTime<Utc>,
initiated_by: Uuid, initiated_by: Uuid,
) -> Result<Uuid, String> { ) -> Result<Uuid, String> {
// Sjekk at vi ikke allerede er i vedlikeholdsmodus
if self.is_initiated() { if self.is_initiated() {
return Err("Vedlikehold er allerede initiert".to_string()); return Err("Vedlikehold er allerede initiert".to_string());
} }
// Opprett systemvarsel // Opprett systemvarsel
let node_id = Uuid::now_v7(); let node_id = Uuid::now_v7();
let node_id_str = node_id.to_string();
let created_by_str = initiated_by.to_string();
let metadata = serde_json::json!({ let metadata = serde_json::json!({
"announcement_type": "critical", "announcement_type": "critical",
@ -129,22 +119,8 @@ impl MaintenanceState {
"blocks_new_sessions": true, "blocks_new_sessions": true,
"maintenance_shutdown": true, "maintenance_shutdown": true,
}); });
let metadata_str = metadata.to_string();
-        // STDB — umiddelbar broadcast til alle klienter
+        // PG — persistent lagring (NOTIFY-trigger sender sanntidsoppdatering)
stdb.create_node(
&node_id_str,
"system_announcement",
"Planlagt vedlikehold",
&format!("Systemet stenges for vedlikehold. Lagre arbeidet ditt."),
"open",
&metadata_str,
&created_by_str,
)
.await
.map_err(|e| format!("STDB-feil: {e}"))?;
// PG — persistent lagring
sqlx::query( sqlx::query(
r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
VALUES ($1, 'system_announcement', 'Planlagt vedlikehold', VALUES ($1, 'system_announcement', 'Planlagt vedlikehold',
@ -168,9 +144,8 @@ impl MaintenanceState {
// Start bakgrunnsoppgave for shutdown-koordinering // Start bakgrunnsoppgave for shutdown-koordinering
let state = self.clone(); let state = self.clone();
let db2 = db.clone(); let db2 = db.clone();
let stdb2 = stdb.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
-            shutdown_coordinator(state, db2, stdb2, scheduled_at, node_id).await;
+            shutdown_coordinator(state, db2, scheduled_at, node_id).await;
}); });
// Lagre tilstand // Lagre tilstand
@ -188,7 +163,6 @@ impl MaintenanceState {
pub async fn cancel( pub async fn cancel(
&self, &self,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
) -> Result<(), String> { ) -> Result<(), String> {
if !self.is_initiated() { if !self.is_initiated() {
return Err("Ingen vedlikehold er initiert".to_string()); return Err("Ingen vedlikehold er initiert".to_string());
@ -201,12 +175,8 @@ impl MaintenanceState {
handle.abort(); handle.abort();
} }
-        // Slett varselet fra STDB og PG
+        // Slett varselet fra PG
if let Some(nid) = inner.announcement_node_id.take() { if let Some(nid) = inner.announcement_node_id.take() {
let nid_str = nid.to_string();
if let Err(e) = stdb.delete_node(&nid_str).await {
tracing::warn!("Kunne ikke slette varsel fra STDB: {e}");
}
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1") if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(nid) .bind(nid)
.execute(db) .execute(db)
@ -246,15 +216,9 @@ async fn fetch_running_jobs(db: &PgPool) -> Result<Vec<RunningJob>, sqlx::Error>
} }
/// Bakgrunnsoppgave som koordinerer nedstengningen. /// Bakgrunnsoppgave som koordinerer nedstengningen.
///
/// 1. Venter til scheduled_at
/// 2. Setter maintenance_active (blokkerer nye LiveKit-rom + jobbkø)
/// 3. Venter på at kjørende jobber fullføres (maks 5 min timeout)
/// 4. Avslutter prosessen (systemd restarter)
async fn shutdown_coordinator( async fn shutdown_coordinator(
state: MaintenanceState, state: MaintenanceState,
db: PgPool, db: PgPool,
stdb: StdbClient,
scheduled_at: DateTime<Utc>, scheduled_at: DateTime<Utc>,
announcement_id: Uuid, announcement_id: Uuid,
) { ) {
@@ -280,15 +244,15 @@ async fn shutdown_coordinator(
        "maintenance_shutdown": true,
        "maintenance_active": true,
    });
-    let nid_str = announcement_id.to_string();
-    let _ = stdb.update_node(
-        &nid_str,
-        "system_announcement",
-        "Vedlikehold pågår",
-        "Systemet stenger ned. Vent til vedlikeholdet er ferdig.",
-        "open",
-        &active_meta.to_string(),
-    ).await;
+    let _ = sqlx::query(
+        "UPDATE nodes SET title = $2, content = $3, metadata = $4 WHERE id = $1",
+    )
+    .bind(announcement_id)
+    .bind("Vedlikehold pågår")
+    .bind("Systemet stenger ned. Vent til vedlikeholdet er ferdig.")
+    .bind(&active_meta)
+    .execute(&db)
+    .await;
// Vent på at kjørende jobber fullføres (maks 5 minutter) // Vent på at kjørende jobber fullføres (maks 5 minutter)
let timeout = std::time::Duration::from_secs(300); let timeout = std::time::Duration::from_secs(300);
@ -320,8 +284,7 @@ async fn shutdown_coordinator(
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }
-    // Slett varselet (klienter vil se at tilkoblingen forsvinner)
-    let _ = stdb.delete_node(&nid_str).await;
+    // Slett varselet
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1") if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(announcement_id) .bind(announcement_id)
.execute(&db) .execute(&db)


@ -18,7 +18,6 @@ use uuid::Uuid;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::publishing::IndexCache; use crate::publishing::IndexCache;
use crate::stdb::StdbClient;
/// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk. /// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk.
const PG_WRITE_PRIORITY: i16 = 8; const PG_WRITE_PRIORITY: i16 = 8;
@ -208,7 +207,6 @@ fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
pub async fn handle_insert_edge( pub async fn handle_insert_edge(
job: &JobRow, job: &JobRow,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
index_cache: &IndexCache, index_cache: &IndexCache,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let p = &job.payload; let p = &job.payload;
@ -271,8 +269,7 @@ pub async fn handle_insert_edge(
); );
} }
-        // Synk node_access til STDB (best-effort, feil logger men feiler ikke jobben)
-        sync_node_access_to_stdb(db, stdb, source_id).await;
+        // PG NOTIFY-triggere (access_changed) sender sanntidsoppdateringer.
} else { } else {
// Vanlig edge // Vanlig edge
sqlx::query( sqlx::query(
@ -392,57 +389,6 @@ pub async fn handle_delete_edge(
// Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk) // Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk)
// ============================================================================= // =============================================================================
/// Synkroniserer node_access-rader for et subject fra PG til STDB.
async fn sync_node_access_to_stdb(db: &PgPool, stdb: &StdbClient, subject_id: Uuid) {
#[derive(sqlx::FromRow)]
struct NodeAccessRow {
subject_id: Uuid,
object_id: Uuid,
access: String,
via_edge: String,
}
let rows = sqlx::query_as::<_, NodeAccessRow>(
"SELECT subject_id, object_id, access::text as access, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access WHERE subject_id = $1",
)
.bind(subject_id)
.fetch_all(db)
.await;
match rows {
Ok(rows) => {
for row in &rows {
if let Err(e) = stdb
.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await
{
tracing::error!(
subject_id = %row.subject_id,
object_id = %row.object_id,
error = %e,
"Kunne ikke synke node_access til STDB (pg_writes)"
);
}
}
tracing::info!(
subject_id = %subject_id,
count = rows.len(),
"node_access synket til STDB (via jobbkø)"
);
}
Err(e) => {
tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG");
}
}
}
/// Trigger artikkelrendering hvis target er en publiseringssamling. /// Trigger artikkelrendering hvis target er en publiseringssamling.
async fn trigger_render_if_publishing( async fn trigger_render_if_publishing(
db: &PgPool, db: &PgPool,


@ -1,145 +0,0 @@
// STDB-overvåker: oppdager SpacetimeDB-krasj og gjenoppbygger fra PG.
//
// Kjører i bakgrunnen med jevnlig helsesjekk. Hvis STDB var oppe og
// deretter feiler, kjøres warmup automatisk for å gjenoppbygge tilstand.
//
// Sekvens ved krasj:
// 1. Oppdage at STDB er nede (helsesjekk feiler)
// 2. Vente til STDB er tilbake (container restarter)
// 3. Kjøre warmup (PG → STDB)
// 4. Logge hendelsen
//
// Ref: docs/infra/backup.md, docs/infra/synkronisering.md
use sqlx::PgPool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::stdb::StdbClient;
/// Start STDB-overvåker i bakgrunnen.
/// Sjekker STDB-helse hvert 30. sekund og kjører warmup ved krasj.
pub fn start_stdb_monitor(db: PgPool, stdb: StdbClient) {
tokio::spawn(async move {
monitor_loop(db, stdb).await;
});
}
/// Intern tilstand for overvåkeren.
struct MonitorState {
/// Var STDB oppe ved forrige sjekk?
was_up: bool,
/// Pågår det en recovery akkurat nå?
recovering: Arc<AtomicBool>,
}
async fn monitor_loop(db: PgPool, stdb: StdbClient) {
let mut state = MonitorState {
was_up: true, // Antar oppe etter warmup ved oppstart
recovering: Arc::new(AtomicBool::new(false)),
};
// Vent litt etter oppstart slik at warmup fullføres først
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
tracing::info!("STDB-overvåker startet (sjekker hvert 30s)");
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
// Ikke sjekk hvis recovery allerede pågår
if state.recovering.load(Ordering::Relaxed) {
continue;
}
let is_up = check_stdb_health(&stdb).await;
match (state.was_up, is_up) {
(true, false) => {
// STDB gikk ned! Logg og start recovery-venting.
tracing::error!("STDB-overvåker: SpacetimeDB er NEDE — starter recovery-prosess");
state.recovering.store(true, Ordering::Relaxed);
let db_clone = db.clone();
let stdb_clone = stdb.clone();
let recovering = state.recovering.clone();
tokio::spawn(async move {
recover_stdb(db_clone, stdb_clone, recovering).await;
});
}
(false, true) => {
// STDB kom tilbake uten vår hjelp (recovery-tasken fikset det)
tracing::info!("STDB-overvåker: SpacetimeDB er tilbake");
state.was_up = true;
}
(false, false) => {
// Fortsatt nede — recovery-tasken håndterer dette
}
(true, true) => {
// Alt OK
}
}
if is_up {
state.was_up = true;
} else if !state.recovering.load(Ordering::Relaxed) {
state.was_up = false;
}
}
}
/// Sjekk om STDB svarer på en enkel reducer-kall.
async fn check_stdb_health(stdb: &StdbClient) -> bool {
stdb.delete_node("__healthcheck_nonexistent__").await.is_ok()
}
/// Vent til STDB er tilbake, deretter kjør warmup.
async fn recover_stdb(db: PgPool, stdb: StdbClient, recovering: Arc<AtomicBool>) {
let max_wait = std::time::Duration::from_secs(600); // Maks 10 min
let check_interval = std::time::Duration::from_secs(10);
let start = std::time::Instant::now();
tracing::info!("STDB-recovery: venter på at SpacetimeDB starter opp igjen (maks 10 min)");
// Vent til STDB svarer
loop {
if start.elapsed() > max_wait {
tracing::error!(
"STDB-recovery: SpacetimeDB kom ikke tilbake innen {} sekunder — gir opp",
max_wait.as_secs()
);
recovering.store(false, Ordering::Relaxed);
return;
}
tokio::time::sleep(check_interval).await;
if check_stdb_health(&stdb).await {
tracing::info!(
"STDB-recovery: SpacetimeDB svarer igjen etter {}s",
start.elapsed().as_secs()
);
break;
}
}
// STDB er tilbake — kjør warmup
tracing::info!("STDB-recovery: kjører warmup (PG → STDB)");
match crate::warmup::run(&db, &stdb).await {
Ok(stats) => {
tracing::info!(
"STDB-recovery: warmup fullført ({} noder, {} edges, {} access)",
stats.nodes,
stats.edges,
stats.access
);
}
Err(e) => {
tracing::error!("STDB-recovery: warmup feilet: {e}");
}
}
recovering.store(false, Ordering::Relaxed);
}


@ -10,7 +10,6 @@ use std::process::Stdio;
use uuid::Uuid; use uuid::Uuid;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::stdb::StdbClient;
/// Synops-summarize binary path. /// Synops-summarize binary path.
/// Søker i PATH, men kan overrides med SYNOPS_SUMMARIZE_BIN. /// Søker i PATH, men kan overrides med SYNOPS_SUMMARIZE_BIN.
@ -30,7 +29,6 @@ fn summarize_bin() -> String {
pub async fn handle_summarize_communication( pub async fn handle_summarize_communication(
job: &JobRow, job: &JobRow,
_db: &sqlx::PgPool, _db: &sqlx::PgPool,
_stdb: &StdbClient,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let communication_id: Uuid = job let communication_id: Uuid = job
.payload .payload


@ -1,8 +1,8 @@
// TTS-dispatcher — delegerer til synops-tts CLI. // TTS-dispatcher — delegerer til synops-tts CLI.
// //
-// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env),
-// og STDB-skriving (sanntidsvisning). Alt annet (ElevenLabs-kall, CAS-lagring,
-// PG-skriving) gjøres av synops-tts.
+// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env).
+// Alt annet (ElevenLabs-kall, CAS-lagring, PG-skriving) gjøres av synops-tts.
+// PG NOTIFY-triggere sender sanntidsoppdateringer.
// //
// Jobbtype: "tts_generate" // Jobbtype: "tts_generate"
// Payload: { "text", "voice_id"?, "language"?, "source_node_id"?, "requested_by" } // Payload: { "text", "voice_id"?, "language"?, "source_node_id"?, "requested_by" }
@ -14,7 +14,6 @@ use uuid::Uuid;
use crate::cli_dispatch; use crate::cli_dispatch;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::stdb::StdbClient;
/// Synops-tts binary path. /// Synops-tts binary path.
fn tts_bin() -> String { fn tts_bin() -> String {
@ -30,7 +29,6 @@ fn tts_bin() -> String {
pub async fn handle_tts_job( pub async fn handle_tts_job(
job: &JobRow, job: &JobRow,
db: &PgPool, db: &PgPool,
stdb: &StdbClient,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
let text = job.payload["text"] let text = job.payload["text"]
.as_str() .as_str()
@ -82,41 +80,8 @@ pub async fn handle_tts_job(
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
-    // --- STDB-synk for sanntidsvisning ---
-    // synops-tts skriver PG. Vi synker til STDB for umiddelbar visning.
+    // PG-skriving gjøres av synops-tts med --write.
+    // PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter.
if let Some(media_node_id) = result["media_node_id"].as_str() {
let cas_hash = result["cas_hash"].as_str().unwrap_or("");
let characters = result["characters"].as_u64().unwrap_or(0);
let title = format!("TTS: {}", truncate(text, 60));
let metadata = serde_json::json!({
"cas_hash": cas_hash,
"mime": "audio/mpeg",
"tts": { "voice_id": &voice_id, "characters": characters }
});
if let Err(e) = stdb.create_node(
media_node_id, "content", &title, "", "hidden",
&metadata.to_string(), &requested_by.to_string(),
).await {
tracing::warn!(error = %e, "STDB create_node (tts) feilet (PG er allerede skrevet)");
}
// has_media-edge
if let Some(source_id) = source_node_id {
let edge_id = Uuid::now_v7().to_string();
let edge_meta = serde_json::json!({
"media_type": "tts_audio",
"generated_at": chrono::Utc::now().to_rfc3339()
});
if let Err(e) = stdb.create_edge(
&edge_id, &source_id.to_string(), media_node_id, "has_media",
&edge_meta.to_string(), false, &requested_by.to_string(),
).await {
tracing::warn!(error = %e, "STDB create_edge (has_media) feilet (PG er allerede skrevet)");
}
}
}
tracing::info!( tracing::info!(
cas_hash = result["cas_hash"].as_str().unwrap_or("n/a"), cas_hash = result["cas_hash"].as_str().unwrap_or("n/a"),


@ -1,141 +0,0 @@
// Warmup: last hele grafen fra PG til SpacetimeDB ved oppstart.
//
// Sekvens: clear_all → noder → edges.
// Edges refererer til noder, så noder må lastes først.
//
// Ref: docs/infra/synkronisering.md
use sqlx::PgPool;
use crate::stdb::StdbClient;
/// Last hele grafen fra PG til SpacetimeDB.
pub async fn run(db: &PgPool, stdb: &StdbClient) -> Result<WarmupStats, Box<dyn std::error::Error>> {
tracing::info!("Warmup: starter (PG → SpacetimeDB)");
// 1. Tøm STDB for å unngå duplikater ved restart
stdb.clear_all().await?;
tracing::info!("Warmup: STDB tømt");
// 2. Last alle noder
let nodes = sqlx::query_as::<_, PgNode>(
"SELECT id, node_kind::text, COALESCE(title, '') as title, \
COALESCE(content, '') as content, visibility::text, \
COALESCE(metadata::text, '{}') as metadata, \
created_at, COALESCE(created_by::text, '') as created_by \
FROM nodes ORDER BY created_at"
)
.fetch_all(db)
.await?;
let node_count = nodes.len();
for node in &nodes {
stdb.create_node(
&node.id.to_string(),
&node.node_kind,
&node.title,
&node.content,
&node.visibility,
&node.metadata,
&node.created_by,
)
.await?;
}
tracing::info!("Warmup: {node_count} noder lastet");
// 3. Last alle edges
let edges = sqlx::query_as::<_, PgEdge>(
"SELECT id, source_id, target_id, edge_type, \
COALESCE(metadata::text, '{}') as metadata, \
system, created_at, COALESCE(created_by::text, '') as created_by \
FROM edges ORDER BY created_at"
)
.fetch_all(db)
.await?;
let edge_count = edges.len();
for edge in &edges {
stdb.create_edge(
&edge.id.to_string(),
&edge.source_id.to_string(),
&edge.target_id.to_string(),
&edge.edge_type,
&edge.metadata,
edge.system,
&edge.created_by,
)
.await?;
}
tracing::info!("Warmup: {edge_count} edges lastet");
// 4. Last alle node_access-rader
let access_rows = sqlx::query_as::<_, PgNodeAccess>(
"SELECT subject_id, object_id, access::text, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access"
)
.fetch_all(db)
.await?;
let access_count = access_rows.len();
for row in &access_rows {
stdb.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await?;
}
tracing::info!("Warmup: {access_count} node_access-rader lastet");
let stats = WarmupStats {
nodes: node_count,
edges: edge_count,
access: access_count,
};
tracing::info!("Warmup: ferdig ({} noder, {} edges, {} access)", stats.nodes, stats.edges, stats.access);
Ok(stats)
}
pub struct WarmupStats {
pub nodes: usize,
pub edges: usize,
pub access: usize,
}
// PG-radtyper for sqlx
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct PgNode {
id: uuid::Uuid,
node_kind: String,
title: String,
content: String,
visibility: String,
metadata: String,
created_at: chrono::DateTime<chrono::Utc>,
created_by: String,
}
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct PgNodeAccess {
subject_id: uuid::Uuid,
object_id: uuid::Uuid,
access: String,
via_edge: String,
}
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct PgEdge {
id: uuid::Uuid,
source_id: uuid::Uuid,
target_id: uuid::Uuid,
edge_type: String,
metadata: String,
system: bool,
created_at: chrono::DateTime<chrono::Utc>,
created_by: String,
}


@@ -75,97 +75,62 @@ pub async fn my_workspace(
            "panels": []
        }
    });
-    let metadata_str = metadata.to_string();
-    let node_id_str = node_id.to_string();
-    let created_by_str = user.node_id.to_string();
-    // Skriv til SpacetimeDB (instant)
-    state
-        .stdb
-        .create_node(
-            &node_id_str,
-            "workspace",
-            &title,
-            "",
-            "hidden",
-            &metadata_str,
-            &created_by_str,
-        )
-        .await
-        .map_err(|e| {
-            tracing::error!("STDB-feil ved workspace-opprettelse: {e}");
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "SpacetimeDB-feil".to_string(),
-            )
-        })?;
-    // Async PG-skriving (for persistens)
-    let db = state.db.clone();
-    let title_clone = title.clone();
-    let metadata_str_clone = metadata_str.clone();
-    tokio::spawn(async move {
-        if let Err(e) = sqlx::query(
-            r#"
-            INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
-            VALUES ($1, 'workspace', $2, '', 'hidden', $3, $4)
-            ON CONFLICT (id) DO NOTHING
-            "#,
-        )
-        .bind(node_id)
-        .bind(&title_clone)
-        .bind(&metadata_str_clone)
-        .bind(user.node_id)
-        .execute(&db)
-        .await
-        {
-            tracing::error!("PG-feil ved workspace-skriving: {e}");
-        }
-    });
-    // Opprett owner-edge
-    let edge_id = Uuid::now_v7();
-    let edge_id_str = edge_id.to_string();
-    state
-        .stdb
-        .create_edge(
-            &edge_id_str,
-            &created_by_str,
-            &node_id_str,
-            "owner",
-            "{}",
-            false,
-            &created_by_str,
-        )
-        .await
-        .map_err(|e| {
-            tracing::error!("STDB-feil ved workspace owner-edge: {e}");
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "SpacetimeDB-feil".to_string(),
-            )
-        })?;
-    // Async PG edge-skriving
-    let db2 = state.db.clone();
-    tokio::spawn(async move {
-        if let Err(e) = sqlx::query(
-            r#"
-            INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
-            VALUES ($1, $2, $3, 'owner', '{}', false, $2)
-            ON CONFLICT (id) DO NOTHING
-            "#,
-        )
-        .bind(edge_id)
-        .bind(user.node_id)
-        .bind(node_id)
-        .execute(&db2)
-        .await
-        {
-            tracing::error!("PG-feil ved workspace owner-edge skriving: {e}");
-        }
-    });
+    // Skriv workspace-node til PG
+    sqlx::query(
+        r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
+           VALUES ($1, 'workspace', $2, '', 'hidden'::visibility, $3, $4)
+           ON CONFLICT (id) DO NOTHING"#,
+    )
+    .bind(node_id)
+    .bind(&title)
+    .bind(&metadata)
+    .bind(user.node_id)
+    .execute(&state.db)
+    .await
+    .map_err(|e| {
+        tracing::error!("PG-feil ved workspace-opprettelse: {e}");
+        (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string())
+    })?;
+    // Opprett owner-edge med recompute_access
+    let edge_id = Uuid::now_v7();
+    let mut tx = state.db.begin().await.map_err(|e| {
+        tracing::error!("PG begin: {e}");
+        (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string())
+    })?;
+    sqlx::query(
+        r#"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
+           VALUES ($1, $2, $3, 'owner', '{}', false, $2)
+           ON CONFLICT (id) DO NOTHING"#,
+    )
+    .bind(edge_id)
+    .bind(user.node_id)
+    .bind(node_id)
+    .execute(&mut *tx)
+    .await
+    .map_err(|e| {
+        tracing::error!("PG-feil ved workspace owner-edge: {e}");
+        (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string())
+    })?;
+    sqlx::query("SELECT recompute_access($1, $2, 'owner'::access_level, $3)")
+        .bind(user.node_id)
+        .bind(node_id)
+        .bind(edge_id)
+        .execute(&mut *tx)
+        .await
+        .map_err(|e| {
+            tracing::error!("recompute_access: {e}");
+            (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string())
+        })?;
+    tx.commit().await.map_err(|e| {
+        tracing::error!("PG commit: {e}");
+        (StatusCode::INTERNAL_SERVER_ERROR, "Databasefeil".to_string())
+    })?;
    tracing::info!(
        node_id = %node_id,


@@ -281,8 +281,7 @@ ingen synk-kompleksitet.
- [x] 22.1 WebSocket-lag i portvokteren: implementer PG LISTEN/NOTIFY-lytter og WebSocket-endepunkt. Legg til PG-triggers (`notify_node_change`, `notify_edge_change`) for nodes og edges. Frontend kobler til begge (STDB + nytt WS) i parallell for verifisering.
- [x] 22.2 Frontend-migrering: erstatt SpacetimeDB-klient med vanlig WebSocket til portvokteren. Erstatt STDB-stores med reaktive stores som lytter på WebSocket. Verifiser all sanntidsfunksjonalitet (chat, kanban, kalender, mixer, canvas).
-- [~] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data.
-> Påbegynt: 2026-03-18T12:36
+- [x] 22.3 Fjern STDB-skrivestien: portvokteren slutter å skrive til SpacetimeDB. All skriving går kun til PG. NOTIFY-triggere er eneste push-mekanisme. Verifiser at ingenting avhenger av STDB-data.
- [ ] 22.4 Fjern SpacetimeDB: stopp Docker-container, fjern STDB-modul, fjern STDB-klient fra portvokteren og frontend, fjern synkroniseringskode, oppdater docs og CLAUDE.md.
- [ ] 22.5 Opprydding: arkiver STDB-relaterte erfaringsdocs, oppdater alle docs-referanser, fjern Docker-konfig for SpacetimeDB, fjern SpacetimeDB-loven fra feedback-memories.