// pg_writes — Jobbkø-handlere for PG-skriveoperasjoner. // // Erstatter fire-and-forget `tokio::spawn()` med retry via jobbkøen. // Hver skriveoperasjon (insert/update/delete for nodes og edges) er en // egen jobbtype som behandles av den eksisterende worker-loopen med // eksponentiell backoff (30s × 2^n) og dead letter queue (status='error' // etter max_attempts). // // Jobbtyper: // pg_insert_node, pg_insert_edge, pg_update_node, // pg_delete_node, pg_delete_edge // // Ref: docs/infra/jobbkø.md, oppgave 12.3 use serde_json::json; use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; use crate::publishing::IndexCache; use crate::stdb::StdbClient; /// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk. const PG_WRITE_PRIORITY: i16 = 8; // ============================================================================= // Enqueue-funksjoner (erstatter spawn_pg_*) // ============================================================================= /// Legger en insert_node-operasjon i jobbkøen. pub fn enqueue_insert_node( db: PgPool, node_id: Uuid, node_kind: String, title: String, content: String, visibility: String, metadata: serde_json::Value, created_by: Uuid, ) { let payload = json!({ "node_id": node_id, "node_kind": node_kind, "title": title, "content": content, "visibility": visibility, "metadata": metadata, "created_by": created_by, }); tokio::spawn(async move { if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_node", payload, None, PG_WRITE_PRIORITY).await { tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_insert_node i jobbkø"); } }); } /// Legger en insert_edge-operasjon i jobbkøen. pub fn enqueue_insert_edge( db: PgPool, edge_id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, metadata: serde_json::Value, system: bool, created_by: Uuid, ) { let payload = json!({ "edge_id": edge_id, "source_id": source_id, "target_id": target_id, "edge_type": edge_type, "metadata": metadata, "system": system, "created_by": created_by, }); tokio::spawn(async move { if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_edge", payload, None, PG_WRITE_PRIORITY).await { tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_insert_edge i jobbkø"); } }); } /// Legger en update_node-operasjon i jobbkøen. pub fn enqueue_update_node( db: PgPool, node_id: Uuid, node_kind: String, title: String, content: String, visibility: String, metadata: serde_json::Value, ) { let payload = json!({ "node_id": node_id, "node_kind": node_kind, "title": title, "content": content, "visibility": visibility, "metadata": metadata, }); tokio::spawn(async move { if let Err(e) = crate::jobs::enqueue(&db, "pg_update_node", payload, None, PG_WRITE_PRIORITY).await { tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_update_node i jobbkø"); } }); } /// Legger en delete_node-operasjon i jobbkøen. pub fn enqueue_delete_node(db: PgPool, node_id: Uuid) { let payload = json!({ "node_id": node_id }); tokio::spawn(async move { if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_node", payload, None, PG_WRITE_PRIORITY).await { tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_delete_node i jobbkø"); } }); } /// Legger en delete_edge-operasjon i jobbkøen. pub fn enqueue_delete_edge( db: PgPool, edge_id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, ) { let payload = json!({ "edge_id": edge_id, "source_id": source_id, "target_id": target_id, "edge_type": edge_type, }); tokio::spawn(async move { if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_edge", payload, None, PG_WRITE_PRIORITY).await { tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_delete_edge i jobbkø"); } }); } // ============================================================================= // Job-handlere (kalles fra dispatch i jobs.rs) // ============================================================================= /// Helpers for å parse UUID fra payload. fn uuid_from_payload(payload: &serde_json::Value, key: &str) -> Result { payload .get(key) .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or_else(|| format!("Mangler eller ugyldig {key} i payload")) } fn string_from_payload(payload: &serde_json::Value, key: &str) -> String { payload.get(key).and_then(|v| v.as_str()).unwrap_or("").to_string() } /// Handler: pg_insert_node pub async fn handle_insert_node( job: &JobRow, db: &PgPool, ) -> Result { let p = &job.payload; let node_id = uuid_from_payload(p, "node_id")?; let node_kind = string_from_payload(p, "node_kind"); let title = string_from_payload(p, "title"); let content = string_from_payload(p, "content"); let visibility = string_from_payload(p, "visibility"); let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); let created_by = uuid_from_payload(p, "created_by")?; sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7) "#, ) .bind(node_id) .bind(&node_kind) .bind(&title) .bind(&content) .bind(&visibility) .bind(&metadata) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert node {node_id}: {e}"))?; tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL (via jobbkø)"); Ok(json!({ "node_id": node_id.to_string(), "op": "insert_node" })) } /// Mapper edge_type til access_level for tilgangsgivende edges. fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> { match edge_type { "owner" => Some("owner"), "admin" => Some("admin"), "member_of" => Some("member"), "reader" => Some("reader"), _ => None, } } /// Handler: pg_insert_edge /// /// Håndterer tilgangsgivende edges (owner/admin/member_of/reader) med /// recompute_access i transaksjon, og synker til STDB. For belongs_to-edges /// trigges artikkelrendering hvis target er en publiseringssamling. pub async fn handle_insert_edge( job: &JobRow, db: &PgPool, stdb: &StdbClient, index_cache: &IndexCache, ) -> Result { let p = &job.payload; let edge_id = uuid_from_payload(p, "edge_id")?; let source_id = uuid_from_payload(p, "source_id")?; let target_id = uuid_from_payload(p, "target_id")?; let edge_type = string_from_payload(p, "edge_type"); let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); let system = p.get("system").and_then(|v| v.as_bool()).unwrap_or(false); let created_by = uuid_from_payload(p, "created_by")?; let access_level = edge_type_to_access_level(&edge_type); if let Some(level) = access_level { // Tilgangsgivende edge: transaksjon med recompute_access let recompute_start = std::time::Instant::now(); let mut tx = db.begin().await.map_err(|e| format!("PG begin: {e}"))?; sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) .bind(edge_id) .bind(source_id) .bind(target_id) .bind(&edge_type) .bind(&metadata) .bind(system) .bind(created_by) .execute(&mut *tx) .await .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)") .bind(source_id) .bind(target_id) .bind(level) .bind(edge_id) .execute(&mut *tx) .await .map_err(|e| format!("recompute_access: {e}"))?; tx.commit().await.map_err(|e| format!("PG commit: {e}"))?; let recompute_ms = recompute_start.elapsed().as_secs_f64() * 1000.0; tracing::info!( edge_id = %edge_id, edge_type = %edge_type, access_level = %level, recompute_ms = recompute_ms, "Edge + node_access persistert til PostgreSQL (via jobbkø)" ); if recompute_ms >= 100.0 { tracing::warn!( edge_id = %edge_id, duration_ms = recompute_ms, "slow_recompute_access" ); } // Synk node_access til STDB (best-effort, feil logger men feiler ikke jobben) sync_node_access_to_stdb(db, stdb, source_id).await; } else { // Vanlig edge sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) .bind(edge_id) .bind(source_id) .bind(target_id) .bind(&edge_type) .bind(&metadata) .bind(system) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL (via jobbkø)"); // Trigger rendering ved belongs_to if edge_type == "belongs_to" { trigger_render_if_publishing(db, index_cache, source_id, target_id).await; } // A/B-test for presentasjonselement-edges if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") { crate::publishing::maybe_start_ab_test(db, target_id, &edge_type).await; } } Ok(json!({ "edge_id": edge_id.to_string(), "op": "insert_edge" })) } /// Handler: pg_update_node pub async fn handle_update_node( job: &JobRow, db: &PgPool, ) -> Result { let p = &job.payload; let node_id = uuid_from_payload(p, "node_id")?; let node_kind = string_from_payload(p, "node_kind"); let title = string_from_payload(p, "title"); let content = string_from_payload(p, "content"); let visibility = string_from_payload(p, "visibility"); let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); sqlx::query( r#" UPDATE nodes SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''), visibility = $5::visibility, metadata = $6 WHERE id = $1 "#, ) .bind(node_id) .bind(&node_kind) .bind(&title) .bind(&content) .bind(&visibility) .bind(&metadata) .execute(db) .await .map_err(|e| format!("PG update node {node_id}: {e}"))?; tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL (via jobbkø)"); Ok(json!({ "node_id": node_id.to_string(), "op": "update_node" })) } /// Handler: pg_delete_node pub async fn handle_delete_node( job: &JobRow, db: &PgPool, ) -> Result { let p = &job.payload; let node_id = uuid_from_payload(p, "node_id")?; sqlx::query("DELETE FROM nodes WHERE id = $1") .bind(node_id) .execute(db) .await .map_err(|e| format!("PG delete node {node_id}: {e}"))?; tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL (via jobbkø)"); Ok(json!({ "node_id": node_id.to_string(), "op": "delete_node" })) } /// Handler: pg_delete_edge pub async fn handle_delete_edge( job: &JobRow, db: &PgPool, index_cache: &IndexCache, ) -> Result { let p = &job.payload; let edge_id = uuid_from_payload(p, "edge_id")?; let target_id = uuid_from_payload(p, "target_id")?; let edge_type = string_from_payload(p, "edge_type"); sqlx::query("DELETE FROM edges WHERE id = $1") .bind(edge_id) .execute(db) .await .map_err(|e| format!("PG delete edge {edge_id}: {e}"))?; tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL (via jobbkø)"); // Invalider publiserings-cache ved fjerning av belongs_to if edge_type == "belongs_to" { trigger_index_invalidation_if_publishing(db, index_cache, target_id).await; } Ok(json!({ "edge_id": edge_id.to_string(), "op": "delete_edge" })) } // ============================================================================= // Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk) // ============================================================================= /// Synkroniserer node_access-rader for et subject fra PG til STDB. async fn sync_node_access_to_stdb(db: &PgPool, stdb: &StdbClient, subject_id: Uuid) { #[derive(sqlx::FromRow)] struct NodeAccessRow { subject_id: Uuid, object_id: Uuid, access: String, via_edge: String, } let rows = sqlx::query_as::<_, NodeAccessRow>( "SELECT subject_id, object_id, access::text as access, \ COALESCE(via_edge::text, '') as via_edge \ FROM node_access WHERE subject_id = $1", ) .bind(subject_id) .fetch_all(db) .await; match rows { Ok(rows) => { for row in &rows { if let Err(e) = stdb .upsert_node_access( &row.subject_id.to_string(), &row.object_id.to_string(), &row.access, &row.via_edge, ) .await { tracing::error!( subject_id = %row.subject_id, object_id = %row.object_id, error = %e, "Kunne ikke synke node_access til STDB (pg_writes)" ); } } tracing::info!( subject_id = %subject_id, count = rows.len(), "node_access synket til STDB (via jobbkø)" ); } Err(e) => { tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG"); } } } /// Trigger artikkelrendering hvis target er en publiseringssamling. async fn trigger_render_if_publishing( db: &PgPool, index_cache: &IndexCache, source_id: Uuid, target_id: Uuid, ) { match crate::publishing::find_publishing_collection_by_id(db, target_id).await { Ok(Some(config)) => { let article_payload = serde_json::json!({ "node_id": source_id.to_string(), "collection_id": target_id.to_string(), }); match crate::jobs::enqueue(db, "render_article", article_payload, Some(target_id), 5).await { Ok(job_id) => { tracing::info!(job_id = %job_id, node_id = %source_id, collection_id = %target_id, "render_article-jobb lagt i kø"); } Err(e) => { tracing::error!(node_id = %source_id, collection_id = %target_id, error = %e, "Kunne ikke legge render_article-jobb i kø"); } } let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); if index_mode == "static" { let index_payload = serde_json::json!({ "collection_id": target_id.to_string() }); match crate::jobs::enqueue(db, "render_index", index_payload, Some(target_id), 4).await { Ok(job_id) => { tracing::info!(job_id = %job_id, collection_id = %target_id, "render_index-jobb lagt i kø (statisk modus)"); } Err(e) => { tracing::error!(collection_id = %target_id, error = %e, "Kunne ikke legge render_index-jobb i kø"); } } } else { crate::publishing::invalidate_index_cache(index_cache, target_id).await; } } Ok(None) => {} Err(e) => { tracing::error!(target_id = %target_id, error = %e, "Feil ved sjekk av publiseringssamling for rendering-trigger"); } } } /// Invaliderer forside-cache ved fjerning av belongs_to fra publiseringssamling. async fn trigger_index_invalidation_if_publishing( db: &PgPool, index_cache: &IndexCache, collection_id: Uuid, ) { match crate::publishing::find_publishing_collection_by_id(db, collection_id).await { Ok(Some(config)) => { let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); if index_mode == "static" { let index_payload = serde_json::json!({ "collection_id": collection_id.to_string() }); match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await { Ok(job_id) => { tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering"); } Err(e) => { tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø"); } } } else { crate::publishing::invalidate_index_cache(index_cache, collection_id).await; } } Ok(None) => {} Err(e) => { tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering"); } } }