From 0fc559a20789c39def76a387d4b935a228933ab9 Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 11:26:48 +0000 Subject: [PATCH] =?UTF-8?q?Feilh=C3=A5ndtering:=20retry=20med=20backoff=20?= =?UTF-8?q?+=20dead=20letter=20queue=20for=20PG-skrivinger=20(oppgave=2012?= =?UTF-8?q?.3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Erstatter fire-and-forget tokio::spawn() i skrivestien med jobbkø-basert persistering. Alle PG-skriveoperasjoner (insert/update/delete for noder og edges) går nå gjennom den eksisterende jobbkøen som allerede har: - Eksponentiell backoff (30s × 2^n) ved feil - Dead letter queue (status='error' etter max_attempts=3) - Admin-API for overvåking, manuell retry og avbryt - Ressursstyring og prioritetsregler Ny modul pg_writes.rs med: - 5 enqueue-funksjoner (erstatter spawn_pg_*) - 5 job-handlere for dispatch i worker-loopen - Full paritet med gammel logikk: tilgangsgivende edges kjører recompute_access i transaksjon, synker til STDB, trigger rendering Før: PG-skrivefeil logget og glemt → data kun i STDB, tapt fra PG. Nå: automatisk retry → admin-synlig dead letter → manuell recovery. --- docs/infra/jobbkø.md | 5 + maskinrommet/src/intentions.rs | 416 ++------------------------- maskinrommet/src/jobs.rs | 23 +- maskinrommet/src/main.rs | 7 +- maskinrommet/src/pg_writes.rs | 509 +++++++++++++++++++++++++++++++++ tasks.md | 3 +- 6 files changed, 566 insertions(+), 397 deletions(-) create mode 100644 maskinrommet/src/pg_writes.rs diff --git a/docs/infra/jobbkø.md b/docs/infra/jobbkø.md index 6da3bd9..8a1dc08 100644 --- a/docs/infra/jobbkø.md +++ b/docs/infra/jobbkø.md @@ -181,6 +181,11 @@ Status lagres i `disk_status_log`-tabellen (siste 1000 målinger beholdes). Admi | `summarize_communication` | Oppsummering (AI) | Generer AI-sammendrag av kommunikasjonsnode (chat/møte). Oppretter content-node med summary-edge tilbake. 
Trigges via `/intentions/summarize` | | `url_ingest` | Web Clipper (proposal) | Hent URL, oppsummer via AI, opprett research-klipp med graf-koblinger | | `generate_waveform` | Waveforms (proposal) | Generer audio-peaks fra lydfil for visuell bølgeform | +| `pg_insert_node` | PG-skrivestien | Persister node til PostgreSQL med retry og dead letter (oppgave 12.3) | +| `pg_insert_edge` | PG-skrivestien | Persister edge til PostgreSQL, inkl. recompute_access for tilgangsgivende edges | +| `pg_update_node` | PG-skrivestien | Oppdater node i PostgreSQL med retry | +| `pg_delete_node` | PG-skrivestien | Slett node fra PostgreSQL med retry | +| `pg_delete_edge` | PG-skrivestien | Slett edge fra PostgreSQL med retry, invalider publiserings-cache | ## 6. Tilgangsisolasjon Alle jobber merkes med `collection_node_id`. Rust-workers kjører som superuser (bypasser RLS) og sikrer isolasjon i applikasjonskode: diff --git a/maskinrommet/src/intentions.rs b/maskinrommet/src/intentions.rs index 04ca886..0a4df73 100644 --- a/maskinrommet/src/intentions.rs +++ b/maskinrommet/src/intentions.rs @@ -568,12 +568,12 @@ pub async fn create_node( "Node opprettet i STDB" ); - // Fang verdier for AI-trigger før de flyttes inn i spawn_pg_insert_node + // Fang verdier for AI-trigger før de flyttes inn i enqueue_insert_node let is_content_node = node_kind == "content"; let has_enough_text = content.len() >= 20 || title.len() >= 20; - // -- Spawn async PG-skriving -- - spawn_pg_insert_node( + // -- PG-skriving via jobbkø (retry + dead letter) -- + crate::pg_writes::enqueue_insert_node( state.db.clone(), node_id, node_kind, @@ -613,11 +613,9 @@ pub async fn create_node( "belongs_to-edge opprettet i STDB (kontekst-arv)" ); - // belongs_to er ikke tilgangsgivende — enkel PG-insert - spawn_pg_insert_edge( + // belongs_to er ikke tilgangsgivende — PG-insert via jobbkø + crate::pg_writes::enqueue_insert_edge( state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), edge_id, node_id, ctx_id, @@ 
-679,7 +677,7 @@ pub async fn create_node( // -- AI edge-forslag: analyser innholdet for topics og mentions -- // Trigges for content-noder med nok tekst. Lav prioritet (bakgrunnsjobb). - // NB: node_kind, title og content er flyttet inn i spawn_pg_insert_node over, + // NB: node_kind, title og content er flyttet inn i enqueue_insert_node over, // så vi sjekker på kopi av verdiene tatt før move. if is_content_node && has_enough_text { let db_clone = state.db.clone(); @@ -874,12 +872,10 @@ pub async fn create_edge( "Edge opprettet i STDB" ); - // -- Spawn async PG-skriving -- + // -- PG-skriving via jobbkø (retry + dead letter) -- let edge_type = req.edge_type.clone(); - spawn_pg_insert_edge( + crate::pg_writes::enqueue_insert_edge( state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), edge_id, req.source_id, req.target_id, @@ -1092,8 +1088,8 @@ pub async fn update_node( "Node oppdatert i STDB" ); - // -- Spawn async PG-skriving -- - spawn_pg_update_node( + // -- PG-skriving via jobbkø (retry + dead letter) -- + crate::pg_writes::enqueue_update_node( state.db.clone(), req.node_id, node_kind, @@ -1209,8 +1205,8 @@ pub async fn delete_node( "Node slettet fra STDB" ); - // -- Spawn async PG-sletting -- - spawn_pg_delete_node(state.db.clone(), req.node_id); + // -- PG-sletting via jobbkø (retry + dead letter) -- + crate::pg_writes::enqueue_delete_node(state.db.clone(), req.node_id); Ok(Json(DeleteNodeResponse { deleted: true })) } @@ -1452,9 +1448,8 @@ pub async fn delete_edge( ); // -- Spawn async PG-sletting + publiserings-invalidering -- - spawn_pg_delete_edge( + crate::pg_writes::enqueue_delete_edge( state.db.clone(), - state.index_cache.clone(), req.edge_id, edge_info.source_id, edge_info.target_id, @@ -1464,70 +1459,6 @@ pub async fn delete_edge( Ok(Json(DeleteEdgeResponse { deleted: true })) } -/// Spawner en tokio-task som sletter edgen fra PostgreSQL -/// og invaliderer publiserings-cache ved behov. 
-fn spawn_pg_delete_edge( - db: PgPool, - index_cache: crate::publishing::IndexCache, - edge_id: Uuid, - _source_id: Uuid, - target_id: Uuid, - edge_type: String, -) { - tokio::spawn(async move { - let result = sqlx::query("DELETE FROM edges WHERE id = $1") - .bind(edge_id) - .execute(&db) - .await; - - match result { - Ok(_) => { - tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL"); - - // Ved fjerning av belongs_to til publiseringssamling: invalider forside-cache - if edge_type == "belongs_to" { - trigger_index_invalidation_if_publishing(&db, &index_cache, target_id).await; - } - } - Err(e) => { - tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke slette edge fra PostgreSQL"); - } - } - }); -} - -/// Invaliderer forside-cache (dynamisk modus) eller legger render_index-jobb i køen -/// (statisk modus) når en edge fjernes fra en publiseringssamling. -async fn trigger_index_invalidation_if_publishing( - db: &PgPool, - index_cache: &crate::publishing::IndexCache, - collection_id: Uuid, -) { - match crate::publishing::find_publishing_collection_by_id(db, collection_id).await { - Ok(Some(config)) => { - let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); - if index_mode == "static" { - let index_payload = serde_json::json!({ - "collection_id": collection_id.to_string(), - }); - match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await { - Ok(job_id) => { - tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering"); - } - Err(e) => { - tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø"); - } - } - } else { - crate::publishing::invalidate_index_cache(index_cache, collection_id).await; - } - } - Ok(None) => {} - Err(e) => { - tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering"); - } - } -} // 
============================================================================= // set_slot — Redaksjonell slot-håndtering for publiseringssamlinger @@ -1900,8 +1831,8 @@ pub async fn create_communication( "Kommunikasjonsnode opprettet i STDB" ); - // Spawn PG-skriving for noden - spawn_pg_insert_node( + // PG-skriving via jobbkø + crate::pg_writes::enqueue_insert_node( state.db.clone(), node_id, "communication".to_string(), @@ -1937,11 +1868,9 @@ pub async fn create_communication( .await .map_err(|e| stdb_error("create_edge (owner)", e))?; - // Spawn PG-skriving for owner-edge (med access recompute) - spawn_pg_insert_edge( + // PG-skriving for owner-edge via jobbkø (med access recompute) + crate::pg_writes::enqueue_insert_edge( state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), owner_edge_id, user.node_id, node_id, @@ -1980,10 +1909,8 @@ pub async fn create_communication( .await .map_err(|e| stdb_error("create_edge (member_of)", e))?; - spawn_pg_insert_edge( + crate::pg_writes::enqueue_insert_edge( state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), edge_id, *participant_id, node_id, @@ -2031,10 +1958,8 @@ pub async fn create_communication( .await .map_err(|e| stdb_error("create_edge (belongs_to context)", e))?; - spawn_pg_insert_edge( + crate::pg_writes::enqueue_insert_edge( state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), ctx_edge_id, node_id, context_id, @@ -2219,8 +2144,8 @@ pub async fn upload_media( "Media-node opprettet i STDB" ); - // Spawn async PG-skriving for media-noden - spawn_pg_insert_node( + // PG-skriving for media-noden via jobbkø + crate::pg_writes::enqueue_insert_node( state.db.clone(), media_node_id, "media".to_string(), @@ -2260,11 +2185,9 @@ pub async fn upload_media( "has_media-edge opprettet i STDB" ); - // has_media er ikke tilgangsgivende — enkel PG-insert - spawn_pg_insert_edge( + // has_media er ikke tilgangsgivende — PG-insert via jobbkø + crate::pg_writes::enqueue_insert_edge( 
state.db.clone(), - state.stdb.clone(), - state.index_cache.clone(), edge_id, src_id, media_node_id, @@ -2543,139 +2466,7 @@ struct NodeKindRow { node_kind: String, } -/// Spawner en tokio-task som skriver noden til PostgreSQL i bakgrunnen. -fn spawn_pg_insert_node( - db: PgPool, - node_id: Uuid, - node_kind: String, - title: String, - content: String, - visibility: String, - metadata: serde_json::Value, - created_by: Uuid, -) { - tokio::spawn(async move { - let result = sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) - VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7) - "#, - ) - .bind(node_id) - .bind(&node_kind) - .bind(&title) - .bind(&content) - .bind(&visibility) - .bind(&metadata) - .bind(created_by) - .execute(&db) - .await; - match result { - Ok(_) => { - tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL"); - } - Err(e) => { - tracing::error!(node_id = %node_id, error = %e, "Kunne ikke persistere node til PostgreSQL"); - } - } - }); -} - -/// Mapper edge_type til access_level for tilgangsgivende edges. -/// Returnerer None for edges som ikke gir tilgang. -fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> { - match edge_type { - "owner" => Some("owner"), - "admin" => Some("admin"), - "member_of" => Some("member"), - "reader" => Some("reader"), - _ => None, - } -} - -/// Spawner en tokio-task som skriver edgen til PostgreSQL i bakgrunnen. -/// For tilgangsgivende edges (owner, admin, member_of, reader) kalles -/// recompute_access i samme transaksjon — ingen vindu med stale tilgang. -/// Synker også node_access til STDB for visibility-filtrering i frontend. 
-fn spawn_pg_insert_edge( - db: PgPool, - stdb: crate::stdb::StdbClient, - index_cache: crate::publishing::IndexCache, - edge_id: Uuid, - source_id: Uuid, - target_id: Uuid, - edge_type: String, - metadata: serde_json::Value, - system: bool, - created_by: Uuid, -) { - tokio::spawn(async move { - let access_level = edge_type_to_access_level(&edge_type); - - if let Some(level) = access_level { - // Tilgangsgivende edge: wrap i transaksjon med recompute_access - let result = insert_edge_with_access(&db, edge_id, source_id, target_id, &edge_type, &metadata, system, created_by, level).await; - match result { - Ok(_) => { - tracing::info!( - edge_id = %edge_id, - edge_type = %edge_type, - access_level = %level, - "Edge + node_access persistert til PostgreSQL" - ); - - // Synk oppdatert node_access til STDB - sync_node_access_to_stdb(&db, &stdb, source_id).await; - } - Err(e) => { - tracing::error!( - edge_id = %edge_id, - error = %e, - "Kunne ikke persistere edge + node_access til PostgreSQL" - ); - } - } - } else { - // Vanlig edge uten tilgangspåvirkning - let result = sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - ) - .bind(edge_id) - .bind(source_id) - .bind(target_id) - .bind(&edge_type) - .bind(&metadata) - .bind(system) - .bind(created_by) - .execute(&db) - .await; - - match result { - Ok(_) => { - tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL"); - - // Trigger artikkelrendering ved belongs_to til publiseringssamling - if edge_type == "belongs_to" { - trigger_render_if_publishing(&db, &index_cache, source_id, target_id).await; - } - - // Sjekk om dette er en presentasjonselement-edge og start A/B-test - // hvis det finnes >1 variant av samme type (oppgave 14.17) - if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") { - crate::publishing::maybe_start_ab_test(&db, target_id, &edge_type).await; - } - } - Err(e) => { 
- tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke persistere edge til PostgreSQL"); - } - } - } - }); -} /// Sjekker om target er en samling med publishing-trait, og legger i så fall /// en `render_article`-jobb i køen. For statisk modus legges også en @@ -2755,163 +2546,6 @@ async fn trigger_render_if_publishing( } } -/// Synkroniserer node_access-rader for et subject fra PG til STDB. -/// Kalles etter recompute_access for å holde STDB i synk. -async fn sync_node_access_to_stdb(db: &PgPool, stdb: &crate::stdb::StdbClient, subject_id: Uuid) { - let rows = sqlx::query_as::<_, NodeAccessRow>( - "SELECT subject_id, object_id, access::text as access, \ - COALESCE(via_edge::text, '') as via_edge \ - FROM node_access WHERE subject_id = $1", - ) - .bind(subject_id) - .fetch_all(db) - .await; - - match rows { - Ok(rows) => { - for row in &rows { - if let Err(e) = stdb - .upsert_node_access( - &row.subject_id.to_string(), - &row.object_id.to_string(), - &row.access, - &row.via_edge, - ) - .await - { - tracing::error!( - subject_id = %row.subject_id, - object_id = %row.object_id, - error = %e, - "Kunne ikke synke node_access til STDB" - ); - } - } - tracing::info!( - subject_id = %subject_id, - count = rows.len(), - "node_access synket til STDB" - ); - } - Err(e) => { - tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG"); - } - } -} - -#[derive(sqlx::FromRow)] -struct NodeAccessRow { - subject_id: Uuid, - object_id: Uuid, - access: String, - via_edge: String, -} - -/// Inserter en tilgangsgivende edge og oppdaterer node_access i én transaksjon. -/// source_id = subject (bruker/team), target_id = object (noden det gis tilgang til). 
-async fn insert_edge_with_access( - db: &PgPool, - edge_id: Uuid, - source_id: Uuid, - target_id: Uuid, - edge_type: &str, - metadata: &serde_json::Value, - system: bool, - created_by: Uuid, - access_level: &str, -) -> Result<(), sqlx::Error> { - let mut tx = db.begin().await?; - - sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - ) - .bind(edge_id) - .bind(source_id) - .bind(target_id) - .bind(edge_type) - .bind(metadata) - .bind(system) - .bind(created_by) - .execute(&mut *tx) - .await?; - - // Kall recompute_access: subject=source_id, object=target_id - sqlx::query( - "SELECT recompute_access($1, $2, $3::access_level, $4)", - ) - .bind(source_id) - .bind(target_id) - .bind(access_level) - .bind(edge_id) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - - Ok(()) -} - -/// Spawner en tokio-task som oppdaterer noden i PostgreSQL. -fn spawn_pg_update_node( - db: PgPool, - node_id: Uuid, - node_kind: String, - title: String, - content: String, - visibility: String, - metadata: serde_json::Value, -) { - tokio::spawn(async move { - let result = sqlx::query( - r#" - UPDATE nodes - SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''), - visibility = $5::visibility, metadata = $6 - WHERE id = $1 - "#, - ) - .bind(node_id) - .bind(&node_kind) - .bind(&title) - .bind(&content) - .bind(&visibility) - .bind(&metadata) - .execute(&db) - .await; - - match result { - Ok(_) => { - tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL"); - } - Err(e) => { - tracing::error!(node_id = %node_id, error = %e, "Kunne ikke oppdatere node i PostgreSQL"); - } - } - }); -} - -/// Spawner en tokio-task som sletter noden fra PostgreSQL. -/// Edges slettes automatisk via ON DELETE CASCADE. 
-fn spawn_pg_delete_node(db: PgPool, node_id: Uuid) { - tokio::spawn(async move { - let result = sqlx::query("DELETE FROM nodes WHERE id = $1") - .bind(node_id) - .execute(&db) - .await; - - match result { - Ok(_) => { - tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL"); - } - Err(e) => { - tracing::error!(node_id = %node_id, error = %e, "Kunne ikke slette node fra PostgreSQL"); - } - } - }); -} // ============================================================================= // POST /intentions/update_segment — rediger transkripsjons-segment @@ -4385,8 +4019,8 @@ pub async fn create_announcement( "Systemvarsel opprettet i STDB" ); - // -- Persister til PostgreSQL asynkront -- - spawn_pg_insert_node( + // -- Persister til PostgreSQL via jobbkø -- + crate::pg_writes::enqueue_insert_node( state.db.clone(), node_id, "system_announcement".to_string(), diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index a021ecf..8b489b6 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -21,6 +21,8 @@ use crate::audio; use crate::cas::CasStore; use crate::cli_dispatch; use crate::maintenance::MaintenanceState; +use crate::pg_writes; +use crate::publishing::IndexCache; use crate::resources::{self, PriorityRules}; use crate::stdb::StdbClient; use crate::summarize; @@ -170,6 +172,7 @@ async fn dispatch( db: &PgPool, stdb: &StdbClient, cas: &CasStore, + index_cache: &IndexCache, whisper_url: &str, ) -> Result { match job.job_type.as_str() { @@ -202,6 +205,22 @@ async fn dispatch( "render_index" => { handle_render_index(job, cas).await } + // PG-skriveoperasjoner (oppgave 12.3): retry med backoff + dead letter queue + "pg_insert_node" => { + pg_writes::handle_insert_node(job, db).await + } + "pg_insert_edge" => { + pg_writes::handle_insert_edge(job, db, stdb, index_cache).await + } + "pg_update_node" => { + pg_writes::handle_update_node(job, db).await + } + "pg_delete_node" => { + pg_writes::handle_delete_node(job, db).await + } + 
"pg_delete_edge" => { + pg_writes::handle_delete_edge(job, db, index_cache).await + } other => Err(format!("Ukjent jobbtype: {other}")), } } @@ -452,6 +471,7 @@ pub fn start_worker( db: PgPool, stdb: StdbClient, cas: CasStore, + index_cache: IndexCache, maintenance: MaintenanceState, priority_rules: PriorityRules, ) { @@ -593,6 +613,7 @@ pub fn start_worker( let db2 = db.clone(); let stdb2 = stdb.clone(); let cas2 = cas.clone(); + let index_cache2 = index_cache.clone(); let whisper_url2 = whisper_url.clone(); let timeout_secs = if rule.timeout_seconds > 0 { rule.timeout_seconds as u64 @@ -606,7 +627,7 @@ pub fn start_worker( let result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), - dispatch(&job, &db2, &stdb2, &cas2, &whisper_url2), + dispatch(&job, &db2, &stdb2, &cas2, &index_cache2, &whisper_url2), ) .await; diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 9fa189d..3564026 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -15,6 +15,7 @@ pub mod maintenance; pub mod metrics; pub mod pruning; mod queries; +pub mod pg_writes; pub mod publishing; pub mod health; pub mod resource_usage; @@ -166,8 +167,10 @@ async fn main() { .await .expect("Kunne ikke laste prioritetsregler fra PG"); + let index_cache = publishing::new_index_cache(); + // Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5) - jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone(), priority_rules.clone()); + jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone()); // Start periodisk CAS-pruning i bakgrunnen pruning::start_pruning_loop(db.clone(), cas.clone()); @@ -189,8 +192,6 @@ async fn main() { // Start periodisk CAS tmp-opprydding (oppgave 17.6) cas::start_tmp_cleanup_loop(cas.clone()); - - let index_cache = publishing::new_index_cache(); let dynamic_page_cache = publishing::new_dynamic_page_cache(); let metrics = 
metrics::MetricsCollector::new(); let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics }; diff --git a/maskinrommet/src/pg_writes.rs b/maskinrommet/src/pg_writes.rs new file mode 100644 index 0000000..4d0003f --- /dev/null +++ b/maskinrommet/src/pg_writes.rs @@ -0,0 +1,509 @@ +// pg_writes — Jobbkø-handlere for PG-skriveoperasjoner. +// +// Erstatter fire-and-forget `tokio::spawn()` med retry via jobbkøen. +// Hver skriveoperasjon (insert/update/delete for nodes og edges) er en +// egen jobbtype som behandles av den eksisterende worker-loopen med +// eksponentiell backoff (30s × 2^n) og dead letter queue (status='error' +// etter max_attempts). +// +// Jobbtyper: +// pg_insert_node, pg_insert_edge, pg_update_node, +// pg_delete_node, pg_delete_edge +// +// Ref: docs/infra/jobbkø.md, oppgave 12.3 + +use serde_json::json; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::jobs::JobRow; +use crate::publishing::IndexCache; +use crate::stdb::StdbClient; + +/// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk. +const PG_WRITE_PRIORITY: i16 = 8; + +// ============================================================================= +// Enqueue-funksjoner (erstatter spawn_pg_*) +// ============================================================================= + +/// Legger en insert_node-operasjon i jobbkøen. 
+pub fn enqueue_insert_node( + db: PgPool, + node_id: Uuid, + node_kind: String, + title: String, + content: String, + visibility: String, + metadata: serde_json::Value, + created_by: Uuid, +) { + let payload = json!({ + "node_id": node_id, + "node_kind": node_kind, + "title": title, + "content": content, + "visibility": visibility, + "metadata": metadata, + "created_by": created_by, + }); + tokio::spawn(async move { + if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_node", payload, None, PG_WRITE_PRIORITY).await { + tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_insert_node i jobbkø"); + } + }); +} + +/// Legger en insert_edge-operasjon i jobbkøen. +pub fn enqueue_insert_edge( + db: PgPool, + edge_id: Uuid, + source_id: Uuid, + target_id: Uuid, + edge_type: String, + metadata: serde_json::Value, + system: bool, + created_by: Uuid, +) { + let payload = json!({ + "edge_id": edge_id, + "source_id": source_id, + "target_id": target_id, + "edge_type": edge_type, + "metadata": metadata, + "system": system, + "created_by": created_by, + }); + tokio::spawn(async move { + if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_edge", payload, None, PG_WRITE_PRIORITY).await { + tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_insert_edge i jobbkø"); + } + }); +} + +/// Legger en update_node-operasjon i jobbkøen. 
+pub fn enqueue_update_node( + db: PgPool, + node_id: Uuid, + node_kind: String, + title: String, + content: String, + visibility: String, + metadata: serde_json::Value, +) { + let payload = json!({ + "node_id": node_id, + "node_kind": node_kind, + "title": title, + "content": content, + "visibility": visibility, + "metadata": metadata, + }); + tokio::spawn(async move { + if let Err(e) = crate::jobs::enqueue(&db, "pg_update_node", payload, None, PG_WRITE_PRIORITY).await { + tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_update_node i jobbkø"); + } + }); +} + +/// Legger en delete_node-operasjon i jobbkøen. +pub fn enqueue_delete_node(db: PgPool, node_id: Uuid) { + let payload = json!({ "node_id": node_id }); + tokio::spawn(async move { + if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_node", payload, None, PG_WRITE_PRIORITY).await { + tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_delete_node i jobbkø"); + } + }); +} + +/// Legger en delete_edge-operasjon i jobbkøen. +pub fn enqueue_delete_edge( + db: PgPool, + edge_id: Uuid, + source_id: Uuid, + target_id: Uuid, + edge_type: String, +) { + let payload = json!({ + "edge_id": edge_id, + "source_id": source_id, + "target_id": target_id, + "edge_type": edge_type, + }); + tokio::spawn(async move { + if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_edge", payload, None, PG_WRITE_PRIORITY).await { + tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_delete_edge i jobbkø"); + } + }); +} + +// ============================================================================= +// Job-handlere (kalles fra dispatch i jobs.rs) +// ============================================================================= + +/// Helpers for å parse UUID fra payload. 
+fn uuid_from_payload(payload: &serde_json::Value, key: &str) -> Result { + payload + .get(key) + .and_then(|v| v.as_str()) + .and_then(|s| s.parse().ok()) + .ok_or_else(|| format!("Mangler eller ugyldig {key} i payload")) +} + +fn string_from_payload(payload: &serde_json::Value, key: &str) -> String { + payload.get(key).and_then(|v| v.as_str()).unwrap_or("").to_string() +} + +/// Handler: pg_insert_node +pub async fn handle_insert_node( + job: &JobRow, + db: &PgPool, +) -> Result { + let p = &job.payload; + let node_id = uuid_from_payload(p, "node_id")?; + let node_kind = string_from_payload(p, "node_kind"); + let title = string_from_payload(p, "title"); + let content = string_from_payload(p, "content"); + let visibility = string_from_payload(p, "visibility"); + let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); + let created_by = uuid_from_payload(p, "created_by")?; + + sqlx::query( + r#" + INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) + VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7) + "#, + ) + .bind(node_id) + .bind(&node_kind) + .bind(&title) + .bind(&content) + .bind(&visibility) + .bind(&metadata) + .bind(created_by) + .execute(db) + .await + .map_err(|e| format!("PG insert node {node_id}: {e}"))?; + + tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL (via jobbkø)"); + Ok(json!({ "node_id": node_id.to_string(), "op": "insert_node" })) +} + +/// Mapper edge_type til access_level for tilgangsgivende edges. +fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> { + match edge_type { + "owner" => Some("owner"), + "admin" => Some("admin"), + "member_of" => Some("member"), + "reader" => Some("reader"), + _ => None, + } +} + +/// Handler: pg_insert_edge +/// +/// Håndterer tilgangsgivende edges (owner/admin/member_of/reader) med +/// recompute_access i transaksjon, og synker til STDB. 
For belongs_to-edges +/// trigges artikkelrendering hvis target er en publiseringssamling. +pub async fn handle_insert_edge( + job: &JobRow, + db: &PgPool, + stdb: &StdbClient, + index_cache: &IndexCache, +) -> Result { + let p = &job.payload; + let edge_id = uuid_from_payload(p, "edge_id")?; + let source_id = uuid_from_payload(p, "source_id")?; + let target_id = uuid_from_payload(p, "target_id")?; + let edge_type = string_from_payload(p, "edge_type"); + let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); + let system = p.get("system").and_then(|v| v.as_bool()).unwrap_or(false); + let created_by = uuid_from_payload(p, "created_by")?; + + let access_level = edge_type_to_access_level(&edge_type); + + if let Some(level) = access_level { + // Tilgangsgivende edge: transaksjon med recompute_access + let mut tx = db.begin().await.map_err(|e| format!("PG begin: {e}"))?; + + sqlx::query( + r#" + INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + ) + .bind(edge_id) + .bind(source_id) + .bind(target_id) + .bind(&edge_type) + .bind(&metadata) + .bind(system) + .bind(created_by) + .execute(&mut *tx) + .await + .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; + + sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)") + .bind(source_id) + .bind(target_id) + .bind(level) + .bind(edge_id) + .execute(&mut *tx) + .await + .map_err(|e| format!("recompute_access: {e}"))?; + + tx.commit().await.map_err(|e| format!("PG commit: {e}"))?; + + tracing::info!( + edge_id = %edge_id, + edge_type = %edge_type, + access_level = %level, + "Edge + node_access persistert til PostgreSQL (via jobbkø)" + ); + + // Synk node_access til STDB (best-effort, feil logger men feiler ikke jobben) + sync_node_access_to_stdb(db, stdb, source_id).await; + } else { + // Vanlig edge + sqlx::query( + r#" + INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) 
+ VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + ) + .bind(edge_id) + .bind(source_id) + .bind(target_id) + .bind(&edge_type) + .bind(&metadata) + .bind(system) + .bind(created_by) + .execute(db) + .await + .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; + + tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL (via jobbkø)"); + + // Trigger rendering ved belongs_to + if edge_type == "belongs_to" { + trigger_render_if_publishing(db, index_cache, source_id, target_id).await; + } + + // A/B-test for presentasjonselement-edges + if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") { + crate::publishing::maybe_start_ab_test(db, target_id, &edge_type).await; + } + } + + Ok(json!({ "edge_id": edge_id.to_string(), "op": "insert_edge" })) +} + +/// Handler: pg_update_node +pub async fn handle_update_node( + job: &JobRow, + db: &PgPool, +) -> Result { + let p = &job.payload; + let node_id = uuid_from_payload(p, "node_id")?; + let node_kind = string_from_payload(p, "node_kind"); + let title = string_from_payload(p, "title"); + let content = string_from_payload(p, "content"); + let visibility = string_from_payload(p, "visibility"); + let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); + + sqlx::query( + r#" + UPDATE nodes + SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''), + visibility = $5::visibility, metadata = $6 + WHERE id = $1 + "#, + ) + .bind(node_id) + .bind(&node_kind) + .bind(&title) + .bind(&content) + .bind(&visibility) + .bind(&metadata) + .execute(db) + .await + .map_err(|e| format!("PG update node {node_id}: {e}"))?; + + tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL (via jobbkø)"); + Ok(json!({ "node_id": node_id.to_string(), "op": "update_node" })) +} + +/// Handler: pg_delete_node +pub async fn handle_delete_node( + job: &JobRow, + db: &PgPool, +) -> Result { + let p = &job.payload; + let node_id = uuid_from_payload(p, "node_id")?; + + 
sqlx::query("DELETE FROM nodes WHERE id = $1") + .bind(node_id) + .execute(db) + .await + .map_err(|e| format!("PG delete node {node_id}: {e}"))?; + + tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL (via jobbkø)"); + Ok(json!({ "node_id": node_id.to_string(), "op": "delete_node" })) +} + +/// Handler: pg_delete_edge +pub async fn handle_delete_edge( + job: &JobRow, + db: &PgPool, + index_cache: &IndexCache, +) -> Result { + let p = &job.payload; + let edge_id = uuid_from_payload(p, "edge_id")?; + let target_id = uuid_from_payload(p, "target_id")?; + let edge_type = string_from_payload(p, "edge_type"); + + sqlx::query("DELETE FROM edges WHERE id = $1") + .bind(edge_id) + .execute(db) + .await + .map_err(|e| format!("PG delete edge {edge_id}: {e}"))?; + + tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL (via jobbkø)"); + + // Invalider publiserings-cache ved fjerning av belongs_to + if edge_type == "belongs_to" { + trigger_index_invalidation_if_publishing(db, index_cache, target_id).await; + } + + Ok(json!({ "edge_id": edge_id.to_string(), "op": "delete_edge" })) +} + +// ============================================================================= +// Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk) +// ============================================================================= + +/// Synkroniserer node_access-rader for et subject fra PG til STDB. 
+async fn sync_node_access_to_stdb(db: &PgPool, stdb: &StdbClient, subject_id: Uuid) {
+    // Local projection of the node_access table for this query only.
+    #[derive(sqlx::FromRow)]
+    struct NodeAccessRow {
+        subject_id: Uuid,
+        object_id: Uuid,
+        access: String,
+        via_edge: String,
+    }
+
+    // Enum columns are cast to text for transport; via_edge may be NULL in PG
+    // but the STDB upsert takes a string, hence COALESCE to ''.
+    let rows = sqlx::query_as::<_, NodeAccessRow>(
+        "SELECT subject_id, object_id, access::text as access, \
+         COALESCE(via_edge::text, '') as via_edge \
+         FROM node_access WHERE subject_id = $1",
+    )
+    .bind(subject_id)
+    .fetch_all(db)
+    .await;
+
+    match rows {
+        Ok(rows) => {
+            for row in &rows {
+                // Best-effort: log and continue so one failed upsert does not
+                // abort the remaining rows.
+                // NOTE(review): failures here do not fail the enclosing job,
+                // so a partially synced STDB state is never retried by the
+                // queue — confirm this is acceptable for access data.
+                if let Err(e) = stdb
+                    .upsert_node_access(
+                        &row.subject_id.to_string(),
+                        &row.object_id.to_string(),
+                        &row.access,
+                        &row.via_edge,
+                    )
+                    .await
+                {
+                    tracing::error!(
+                        subject_id = %row.subject_id,
+                        object_id = %row.object_id,
+                        error = %e,
+                        "Kunne ikke synke node_access til STDB (pg_writes)"
+                    );
+                }
+            }
+            tracing::info!(
+                subject_id = %subject_id,
+                count = rows.len(),
+                "node_access synket til STDB (via jobbkø)"
+            );
+        }
+        Err(e) => {
+            // Read failure: nothing was synced; logged only (fire-and-forget).
+            tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG");
+        }
+    }
+}
+
+/// Triggers article rendering if the target is a publishing collection.
+async fn trigger_render_if_publishing(
+    db: &PgPool,
+    index_cache: &IndexCache,
+    source_id: Uuid,
+    target_id: Uuid,
+) {
+    // Only act when target_id is a configured publishing collection; plain
+    // collections fall through silently (Ok(None) arm below).
+    match crate::publishing::find_publishing_collection_by_id(db, target_id).await {
+        Ok(Some(config)) => {
+            let article_payload = serde_json::json!({
+                "node_id": source_id.to_string(),
+                "collection_id": target_id.to_string(),
+            });
+
+            // Best-effort enqueue: a failure is logged but does not propagate,
+            // so the caller's own job still succeeds.
+            match crate::jobs::enqueue(db, "render_article", article_payload, Some(target_id), 5).await {
+                Ok(job_id) => {
+                    tracing::info!(job_id = %job_id, node_id = %source_id, collection_id = %target_id, "render_article-jobb lagt i kø");
+                }
+                Err(e) => {
+                    tracing::error!(node_id = %source_id, collection_id = %target_id, error = %e, "Kunne ikke legge render_article-jobb i kø");
+                }
+            }
+
+            // Static index mode re-renders the front page as a job; dynamic
+            // mode only invalidates the cached index so it rebuilds on demand.
+            let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
+            if index_mode == "static" {
+                let index_payload = serde_json::json!({ "collection_id": target_id.to_string() });
+                match crate::jobs::enqueue(db, "render_index", index_payload, Some(target_id), 4).await {
+                    Ok(job_id) => {
+                        tracing::info!(job_id = %job_id, collection_id = %target_id, "render_index-jobb lagt i kø (statisk modus)");
+                    }
+                    Err(e) => {
+                        tracing::error!(collection_id = %target_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
+                    }
+                }
+            } else {
+                crate::publishing::invalidate_index_cache(index_cache, target_id).await;
+            }
+        }
+        Ok(None) => {}
+        Err(e) => {
+            tracing::error!(target_id = %target_id, error = %e, "Feil ved sjekk av publiseringssamling for rendering-trigger");
+        }
+    }
+}
+
+/// Invalidates the front-page cache when a belongs_to edge is removed from a
+/// publishing collection (static mode re-renders the index instead).
+async fn trigger_index_invalidation_if_publishing(
+    db: &PgPool,
+    index_cache: &IndexCache,
+    collection_id: Uuid,
+) {
+    // Non-publishing collections fall through silently (Ok(None) arm).
+    match crate::publishing::find_publishing_collection_by_id(db, collection_id).await {
+        Ok(Some(config)) => {
+            // Static mode: the index is a rendered artifact, so enqueue a
+            // re-render (best-effort; failure is logged, not propagated).
+            // Dynamic mode: just drop the cached index so it rebuilds lazily.
+            let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
+            if index_mode == "static" {
+                let index_payload = serde_json::json!({ "collection_id": collection_id.to_string() });
+                match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await {
+                    Ok(job_id) => {
+                        tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering");
+                    }
+                    Err(e) => {
+                        tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
+                    }
+                }
+            } else {
+                crate::publishing::invalidate_index_cache(index_cache, collection_id).await;
+            }
+        }
+        Ok(None) => {}
+        Err(e) => {
+            tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering");
+        }
+    }
+}
diff --git a/tasks.md b/tasks.md
index 51a758f..1de7e8a 100644
--- a/tasks.md
+++ b/tasks.md
@@ -269,8 +269,7 @@ kaller dem direkte. Samme verktøy, to brukere.
 
 - [x] 12.1 Observerbarhet: strukturert logging, metrikker (request latency, queue depth, AI cost).
 - [x] 12.2 Backup: PG-dump rutine, STDB → PG gjenoppbygging ved krasj.
-- [~] 12.3 Feilhåndtering: retry med backoff i skrivestien, dead letter queue for feilede PG-skrivinger.
-  > Påbegynt: 2026-03-18T11:16
+- [x] 12.3 Feilhåndtering: retry med backoff i skrivestien, dead letter queue for feilede PG-skrivinger.
 - [ ] 12.4 Ytelse: profiler PG-spørringer, optimaliser node_access-oppdatering.

## Fase 22: SpacetimeDB-migrering — PG LISTEN/NOTIFY