// pg_writes — Jobbkø-handlere for PG-skriveoperasjoner. // // Historisk kontekst: Disse handlene ble opprettet da skrivestien gikk // via jobbkøen (STDB → async PG-skriving). Etter STDB-migreringen (fase 22) // skriver intensjonene direkte til PG, og NOTIFY-triggere sender // sanntidsoppdateringer. Handlene beholdes for å prosessere eventuelle // gjenværende jobber i køen. // // Jobbtyper: // pg_insert_node, pg_insert_edge, pg_update_node, // pg_delete_node, pg_delete_edge // // Ref: docs/infra/jobbkø.md, oppgave 12.3 use serde_json::json; use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; use crate::publishing::IndexCache; // Enqueue-funksjonene (enqueue_insert_node, enqueue_insert_edge, etc.) er fjernet. // Etter STDB-migreringen (fase 22) skriver intensjonene direkte til PG. // NOTIFY-triggere sender sanntidsoppdateringer via WebSocket. // Handle-funksjonene under beholdes for å prosessere eventuelle gjenværende // jobber i køen fra den gamle arkitekturen. // ============================================================================= // Job-handlere (kalles fra dispatch i jobs.rs) // ============================================================================= /// Helpers for å parse UUID fra payload. 
/// Extracts the UUID stored as a string under `key` in `payload`.
///
/// # Errors
/// Returns a message naming `key` when the field is absent, is not a JSON
/// string, or does not parse as a UUID.
fn uuid_from_payload(payload: &serde_json::Value, key: &str) -> Result<Uuid, String> {
    payload
        .get(key)
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or_else(|| format!("Mangler eller ugyldig {key} i payload"))
}

/// Returns the string stored under `key` in `payload`.
///
/// An absent field, or a field that is not a JSON string, yields an empty
/// string rather than an error.
fn string_from_payload(payload: &serde_json::Value, key: &str) -> String {
    payload
        .get(key)
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string()
}

/// Handler: pg_insert_node
///
/// Inserts one row into `nodes` built from the job payload. `node_id` and
/// `created_by` are required UUIDs; `node_kind`, `title`, `content` and
/// `visibility` are read as strings (empty when missing) and `metadata`
/// defaults to an empty JSON object.
///
/// # Errors
/// Returns a message when a required UUID is missing or invalid, or when the
/// INSERT fails.
// NOTE(review): the generic arguments of the return type were missing in the
// reviewed source; `Result<serde_json::Value, String>` is reconstructed from
// the `json!` success value and the `format!` error strings. Confirm against
// the dispatcher in jobs.rs.
pub async fn handle_insert_node(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    let p = &job.payload;
    let node_id = uuid_from_payload(p, "node_id")?;
    let node_kind = string_from_payload(p, "node_kind");
    let title = string_from_payload(p, "title");
    let content = string_from_payload(p, "content");
    let visibility = string_from_payload(p, "visibility");
    // Build the default object only when the payload carries no metadata.
    let metadata = p.get("metadata").cloned().unwrap_or_else(|| json!({}));
    let created_by = uuid_from_payload(p, "created_by")?;

    // NULLIF turns the empty-string placeholders for title/content into NULL.
    sqlx::query(
        r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7) "#,
    )
    .bind(node_id)
    .bind(&node_kind)
    .bind(&title)
    .bind(&content)
    .bind(&visibility)
    .bind(&metadata)
    .bind(created_by)
    .execute(db)
    .await
    .map_err(|e| format!("PG insert node {node_id}: {e}"))?;

    tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL (via jobbkø)");

    Ok(json!({ "node_id": node_id.to_string(), "op": "insert_node" }))
}

/// Maps an edge type to the access level it grants.
///
/// Returns `None` for edge types that do not grant access.
fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
    match edge_type {
        "owner" => Some("owner"),
        "admin" => Some("admin"),
        "member_of" => Some("member"),
        "reader" => Some("reader"),
        _ => None,
    }
}

/// Handler: pg_insert_edge
///
/// Access-granting edges (owner/admin/member_of/reader) are inserted together
/// with a `recompute_access` call inside one transaction. For `belongs_to`
/// edges, article rendering is triggered if the target is a publishing
/// collection.
pub async fn handle_insert_edge( job: &JobRow, db: &PgPool, index_cache: &IndexCache, ) -> Result { let p = &job.payload; let edge_id = uuid_from_payload(p, "edge_id")?; let source_id = uuid_from_payload(p, "source_id")?; let target_id = uuid_from_payload(p, "target_id")?; let edge_type = string_from_payload(p, "edge_type"); let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); let system = p.get("system").and_then(|v| v.as_bool()).unwrap_or(false); let created_by = uuid_from_payload(p, "created_by")?; let access_level = edge_type_to_access_level(&edge_type); if let Some(level) = access_level { // Tilgangsgivende edge: transaksjon med recompute_access let recompute_start = std::time::Instant::now(); let mut tx = db.begin().await.map_err(|e| format!("PG begin: {e}"))?; sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) .bind(edge_id) .bind(source_id) .bind(target_id) .bind(&edge_type) .bind(&metadata) .bind(system) .bind(created_by) .execute(&mut *tx) .await .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)") .bind(source_id) .bind(target_id) .bind(level) .bind(edge_id) .execute(&mut *tx) .await .map_err(|e| format!("recompute_access: {e}"))?; tx.commit().await.map_err(|e| format!("PG commit: {e}"))?; let recompute_ms = recompute_start.elapsed().as_secs_f64() * 1000.0; tracing::info!( edge_id = %edge_id, edge_type = %edge_type, access_level = %level, recompute_ms = recompute_ms, "Edge + node_access persistert til PostgreSQL (via jobbkø)" ); if recompute_ms >= 100.0 { tracing::warn!( edge_id = %edge_id, duration_ms = recompute_ms, "slow_recompute_access" ); } // PG NOTIFY-triggere (access_changed) sender sanntidsoppdateringer. 
} else { // Vanlig edge sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) .bind(edge_id) .bind(source_id) .bind(target_id) .bind(&edge_type) .bind(&metadata) .bind(system) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert edge {edge_id}: {e}"))?; tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL (via jobbkø)"); // Trigger rendering ved belongs_to if edge_type == "belongs_to" { trigger_render_if_publishing(db, index_cache, source_id, target_id).await; } // A/B-test for presentasjonselement-edges if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") { crate::publishing::maybe_start_ab_test(db, target_id, &edge_type).await; } } Ok(json!({ "edge_id": edge_id.to_string(), "op": "insert_edge" })) } /// Handler: pg_update_node pub async fn handle_update_node( job: &JobRow, db: &PgPool, ) -> Result { let p = &job.payload; let node_id = uuid_from_payload(p, "node_id")?; let node_kind = string_from_payload(p, "node_kind"); let title = string_from_payload(p, "title"); let content = string_from_payload(p, "content"); let visibility = string_from_payload(p, "visibility"); let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({})); sqlx::query( r#" UPDATE nodes SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''), visibility = $5::visibility, metadata = $6 WHERE id = $1 "#, ) .bind(node_id) .bind(&node_kind) .bind(&title) .bind(&content) .bind(&visibility) .bind(&metadata) .execute(db) .await .map_err(|e| format!("PG update node {node_id}: {e}"))?; tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL (via jobbkø)"); Ok(json!({ "node_id": node_id.to_string(), "op": "update_node" })) } /// Handler: pg_delete_node pub async fn handle_delete_node( job: &JobRow, db: &PgPool, ) -> Result { let p = &job.payload; let node_id = uuid_from_payload(p, "node_id")?; sqlx::query("DELETE FROM nodes 
WHERE id = $1") .bind(node_id) .execute(db) .await .map_err(|e| format!("PG delete node {node_id}: {e}"))?; tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL (via jobbkø)"); Ok(json!({ "node_id": node_id.to_string(), "op": "delete_node" })) } /// Handler: pg_delete_edge pub async fn handle_delete_edge( job: &JobRow, db: &PgPool, index_cache: &IndexCache, ) -> Result { let p = &job.payload; let edge_id = uuid_from_payload(p, "edge_id")?; let target_id = uuid_from_payload(p, "target_id")?; let edge_type = string_from_payload(p, "edge_type"); sqlx::query("DELETE FROM edges WHERE id = $1") .bind(edge_id) .execute(db) .await .map_err(|e| format!("PG delete edge {edge_id}: {e}"))?; tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL (via jobbkø)"); // Invalider publiserings-cache ved fjerning av belongs_to if edge_type == "belongs_to" { trigger_index_invalidation_if_publishing(db, index_cache, target_id).await; } Ok(json!({ "edge_id": edge_id.to_string(), "op": "delete_edge" })) } // ============================================================================= // Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk) // ============================================================================= /// Trigger artikkelrendering hvis target er en publiseringssamling. 
async fn trigger_render_if_publishing(
    db: &PgPool,
    index_cache: &IndexCache,
    source_id: Uuid,
    target_id: Uuid,
) {
    // Best effort: every failure is logged and swallowed, nothing is returned.
    // Step 1: look up the publishing configuration; bail out when the target
    // is not a publishing collection or the lookup fails.
    let lookup = crate::publishing::find_publishing_collection_by_id(db, target_id).await;
    let config = match lookup {
        Ok(Some(found)) => found,
        Ok(None) => return,
        Err(e) => {
            tracing::error!(target_id = %target_id, error = %e, "Feil ved sjekk av publiseringssamling for rendering-trigger");
            return;
        }
    };

    // Step 2: queue rendering of the article itself (priority 5).
    let render_request = serde_json::json!({
        "node_id": source_id.to_string(),
        "collection_id": target_id.to_string(),
    });
    let article_job =
        crate::jobs::enqueue(db, "render_article", render_request, Some(target_id), 5).await;
    match article_job {
        Ok(job_id) => {
            tracing::info!(job_id = %job_id, node_id = %source_id, collection_id = %target_id, "render_article-jobb lagt i kø");
        }
        Err(e) => {
            tracing::error!(node_id = %source_id, collection_id = %target_id, error = %e, "Kunne ikke legge render_article-jobb i kø");
        }
    }

    // Step 3: refresh the collection index. Static collections get a
    // render_index job (priority 4); every other mode, including an unset one,
    // only has its cached index invalidated.
    match config.index_mode.as_deref().unwrap_or("dynamic") {
        "static" => {
            let index_request = serde_json::json!({ "collection_id": target_id.to_string() });
            let index_job =
                crate::jobs::enqueue(db, "render_index", index_request, Some(target_id), 4).await;
            match index_job {
                Ok(job_id) => {
                    tracing::info!(job_id = %job_id, collection_id = %target_id, "render_index-jobb lagt i kø (statisk modus)");
                }
                Err(e) => {
                    tracing::error!(collection_id = %target_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
                }
            }
        }
        _ => {
            crate::publishing::invalidate_index_cache(index_cache, target_id).await;
        }
    }
}

/// Invalidates the front-page cache when a belongs_to edge is removed from a
/// publishing collection.
async fn trigger_index_invalidation_if_publishing( db: &PgPool, index_cache: &IndexCache, collection_id: Uuid, ) { match crate::publishing::find_publishing_collection_by_id(db, collection_id).await { Ok(Some(config)) => { let index_mode = config.index_mode.as_deref().unwrap_or("dynamic"); if index_mode == "static" { let index_payload = serde_json::json!({ "collection_id": collection_id.to_string() }); match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await { Ok(job_id) => { tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering"); } Err(e) => { tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø"); } } } else { crate::publishing::invalidate_index_cache(index_cache, collection_id).await; } } Ok(None) => {} Err(e) => { tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering"); } } }