Systematisk gjennomgang av PG-skjema, auth-middleware, intensjoner, skrivestien og WebSocket-laget. Alle kjernetabeller matcher docs. Auth fungerer korrekt (401 for ugyldig/manglende token). Skrivestien er konsistent: direkte PG-skriving → NOTIFY → WebSocket. Fikser: - Fjern død kode: pg_writes enqueue-funksjoner (aldri kalt etter STDB-migrering) - Fjern ubrukt truncate() i tts.rs - Legg til #[allow(dead_code)] for sqlx-structs med ubrukte felt - Rett feilaktig doc-påstand i api_grensesnitt.md om jobbkø - Fjern utdatert STDB-referanse i agent_api.md - Kompilerer uten warnings Se logs/validering-23.1.md for fullstendig rapport.
354 lines
13 KiB
Rust
354 lines
13 KiB
Rust
// pg_writes — Jobbkø-handlere for PG-skriveoperasjoner.
|
|
//
|
|
// Historisk kontekst: Disse handlene ble opprettet da skrivestien gikk
|
|
// via jobbkøen (STDB → async PG-skriving). Etter STDB-migreringen (fase 22)
|
|
// skriver intensjonene direkte til PG, og NOTIFY-triggere sender
|
|
// sanntidsoppdateringer. Handlene beholdes for å prosessere eventuelle
|
|
// gjenværende jobber i køen.
|
|
//
|
|
// Jobbtyper:
|
|
// pg_insert_node, pg_insert_edge, pg_update_node,
|
|
// pg_delete_node, pg_delete_edge
|
|
//
|
|
// Ref: docs/infra/jobbkø.md, oppgave 12.3
|
|
|
|
use serde_json::json;
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
use crate::jobs::JobRow;
|
|
use crate::publishing::IndexCache;
|
|
|
|
// Enqueue-funksjonene (enqueue_insert_node, enqueue_insert_edge, etc.) er fjernet.
|
|
// Etter STDB-migreringen (fase 22) skriver intensjonene direkte til PG.
|
|
// NOTIFY-triggere sender sanntidsoppdateringer via WebSocket.
|
|
// Handle-funksjonene under beholdes for å prosessere eventuelle gjenværende
|
|
// jobber i køen fra den gamle arkitekturen.
|
|
|
|
// =============================================================================
|
|
// Job-handlere (kalles fra dispatch i jobs.rs)
|
|
// =============================================================================
|
|
|
|
/// Helpers for å parse UUID fra payload.
|
|
fn uuid_from_payload(payload: &serde_json::Value, key: &str) -> Result<Uuid, String> {
|
|
payload
|
|
.get(key)
|
|
.and_then(|v| v.as_str())
|
|
.and_then(|s| s.parse().ok())
|
|
.ok_or_else(|| format!("Mangler eller ugyldig {key} i payload"))
|
|
}
|
|
|
|
fn string_from_payload(payload: &serde_json::Value, key: &str) -> String {
|
|
payload.get(key).and_then(|v| v.as_str()).unwrap_or("").to_string()
|
|
}
|
|
|
|
/// Handler: pg_insert_node
|
|
pub async fn handle_insert_node(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
) -> Result<serde_json::Value, String> {
|
|
let p = &job.payload;
|
|
let node_id = uuid_from_payload(p, "node_id")?;
|
|
let node_kind = string_from_payload(p, "node_kind");
|
|
let title = string_from_payload(p, "title");
|
|
let content = string_from_payload(p, "content");
|
|
let visibility = string_from_payload(p, "visibility");
|
|
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
|
|
let created_by = uuid_from_payload(p, "created_by")?;
|
|
|
|
sqlx::query(
|
|
r#"
|
|
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
|
VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7)
|
|
"#,
|
|
)
|
|
.bind(node_id)
|
|
.bind(&node_kind)
|
|
.bind(&title)
|
|
.bind(&content)
|
|
.bind(&visibility)
|
|
.bind(&metadata)
|
|
.bind(created_by)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG insert node {node_id}: {e}"))?;
|
|
|
|
tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL (via jobbkø)");
|
|
Ok(json!({ "node_id": node_id.to_string(), "op": "insert_node" }))
|
|
}
|
|
|
|
/// Mapper edge_type til access_level for tilgangsgivende edges.
|
|
fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
|
|
match edge_type {
|
|
"owner" => Some("owner"),
|
|
"admin" => Some("admin"),
|
|
"member_of" => Some("member"),
|
|
"reader" => Some("reader"),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
/// Handler: pg_insert_edge
|
|
///
|
|
/// Håndterer tilgangsgivende edges (owner/admin/member_of/reader) med
|
|
/// recompute_access i transaksjon. For belongs_to-edges
|
|
/// trigges artikkelrendering hvis target er en publiseringssamling.
|
|
pub async fn handle_insert_edge(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
index_cache: &IndexCache,
|
|
) -> Result<serde_json::Value, String> {
|
|
let p = &job.payload;
|
|
let edge_id = uuid_from_payload(p, "edge_id")?;
|
|
let source_id = uuid_from_payload(p, "source_id")?;
|
|
let target_id = uuid_from_payload(p, "target_id")?;
|
|
let edge_type = string_from_payload(p, "edge_type");
|
|
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
|
|
let system = p.get("system").and_then(|v| v.as_bool()).unwrap_or(false);
|
|
let created_by = uuid_from_payload(p, "created_by")?;
|
|
|
|
let access_level = edge_type_to_access_level(&edge_type);
|
|
|
|
if let Some(level) = access_level {
|
|
// Tilgangsgivende edge: transaksjon med recompute_access
|
|
let recompute_start = std::time::Instant::now();
|
|
let mut tx = db.begin().await.map_err(|e| format!("PG begin: {e}"))?;
|
|
|
|
sqlx::query(
|
|
r#"
|
|
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
"#,
|
|
)
|
|
.bind(edge_id)
|
|
.bind(source_id)
|
|
.bind(target_id)
|
|
.bind(&edge_type)
|
|
.bind(&metadata)
|
|
.bind(system)
|
|
.bind(created_by)
|
|
.execute(&mut *tx)
|
|
.await
|
|
.map_err(|e| format!("PG insert edge {edge_id}: {e}"))?;
|
|
|
|
sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)")
|
|
.bind(source_id)
|
|
.bind(target_id)
|
|
.bind(level)
|
|
.bind(edge_id)
|
|
.execute(&mut *tx)
|
|
.await
|
|
.map_err(|e| format!("recompute_access: {e}"))?;
|
|
|
|
tx.commit().await.map_err(|e| format!("PG commit: {e}"))?;
|
|
|
|
let recompute_ms = recompute_start.elapsed().as_secs_f64() * 1000.0;
|
|
tracing::info!(
|
|
edge_id = %edge_id,
|
|
edge_type = %edge_type,
|
|
access_level = %level,
|
|
recompute_ms = recompute_ms,
|
|
"Edge + node_access persistert til PostgreSQL (via jobbkø)"
|
|
);
|
|
if recompute_ms >= 100.0 {
|
|
tracing::warn!(
|
|
edge_id = %edge_id,
|
|
duration_ms = recompute_ms,
|
|
"slow_recompute_access"
|
|
);
|
|
}
|
|
|
|
// PG NOTIFY-triggere (access_changed) sender sanntidsoppdateringer.
|
|
} else {
|
|
// Vanlig edge
|
|
sqlx::query(
|
|
r#"
|
|
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
"#,
|
|
)
|
|
.bind(edge_id)
|
|
.bind(source_id)
|
|
.bind(target_id)
|
|
.bind(&edge_type)
|
|
.bind(&metadata)
|
|
.bind(system)
|
|
.bind(created_by)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG insert edge {edge_id}: {e}"))?;
|
|
|
|
tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL (via jobbkø)");
|
|
|
|
// Trigger rendering ved belongs_to
|
|
if edge_type == "belongs_to" {
|
|
trigger_render_if_publishing(db, index_cache, source_id, target_id).await;
|
|
}
|
|
|
|
// A/B-test for presentasjonselement-edges
|
|
if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") {
|
|
crate::publishing::maybe_start_ab_test(db, target_id, &edge_type).await;
|
|
}
|
|
}
|
|
|
|
Ok(json!({ "edge_id": edge_id.to_string(), "op": "insert_edge" }))
|
|
}
|
|
|
|
/// Handler: pg_update_node
|
|
pub async fn handle_update_node(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
) -> Result<serde_json::Value, String> {
|
|
let p = &job.payload;
|
|
let node_id = uuid_from_payload(p, "node_id")?;
|
|
let node_kind = string_from_payload(p, "node_kind");
|
|
let title = string_from_payload(p, "title");
|
|
let content = string_from_payload(p, "content");
|
|
let visibility = string_from_payload(p, "visibility");
|
|
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
|
|
|
|
sqlx::query(
|
|
r#"
|
|
UPDATE nodes
|
|
SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''),
|
|
visibility = $5::visibility, metadata = $6
|
|
WHERE id = $1
|
|
"#,
|
|
)
|
|
.bind(node_id)
|
|
.bind(&node_kind)
|
|
.bind(&title)
|
|
.bind(&content)
|
|
.bind(&visibility)
|
|
.bind(&metadata)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG update node {node_id}: {e}"))?;
|
|
|
|
tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL (via jobbkø)");
|
|
Ok(json!({ "node_id": node_id.to_string(), "op": "update_node" }))
|
|
}
|
|
|
|
/// Handler: pg_delete_node
|
|
pub async fn handle_delete_node(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
) -> Result<serde_json::Value, String> {
|
|
let p = &job.payload;
|
|
let node_id = uuid_from_payload(p, "node_id")?;
|
|
|
|
sqlx::query("DELETE FROM nodes WHERE id = $1")
|
|
.bind(node_id)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG delete node {node_id}: {e}"))?;
|
|
|
|
tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL (via jobbkø)");
|
|
Ok(json!({ "node_id": node_id.to_string(), "op": "delete_node" }))
|
|
}
|
|
|
|
/// Handler: pg_delete_edge
|
|
pub async fn handle_delete_edge(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
index_cache: &IndexCache,
|
|
) -> Result<serde_json::Value, String> {
|
|
let p = &job.payload;
|
|
let edge_id = uuid_from_payload(p, "edge_id")?;
|
|
let target_id = uuid_from_payload(p, "target_id")?;
|
|
let edge_type = string_from_payload(p, "edge_type");
|
|
|
|
sqlx::query("DELETE FROM edges WHERE id = $1")
|
|
.bind(edge_id)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG delete edge {edge_id}: {e}"))?;
|
|
|
|
tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL (via jobbkø)");
|
|
|
|
// Invalider publiserings-cache ved fjerning av belongs_to
|
|
if edge_type == "belongs_to" {
|
|
trigger_index_invalidation_if_publishing(db, index_cache, target_id).await;
|
|
}
|
|
|
|
Ok(json!({ "edge_id": edge_id.to_string(), "op": "delete_edge" }))
|
|
}
|
|
|
|
// =============================================================================
|
|
// Hjelpefunksjoner (flyttet fra intentions.rs for gjenbruk)
|
|
// =============================================================================
|
|
|
|
/// Trigger artikkelrendering hvis target er en publiseringssamling.
|
|
async fn trigger_render_if_publishing(
|
|
db: &PgPool,
|
|
index_cache: &IndexCache,
|
|
source_id: Uuid,
|
|
target_id: Uuid,
|
|
) {
|
|
match crate::publishing::find_publishing_collection_by_id(db, target_id).await {
|
|
Ok(Some(config)) => {
|
|
let article_payload = serde_json::json!({
|
|
"node_id": source_id.to_string(),
|
|
"collection_id": target_id.to_string(),
|
|
});
|
|
|
|
match crate::jobs::enqueue(db, "render_article", article_payload, Some(target_id), 5).await {
|
|
Ok(job_id) => {
|
|
tracing::info!(job_id = %job_id, node_id = %source_id, collection_id = %target_id, "render_article-jobb lagt i kø");
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(node_id = %source_id, collection_id = %target_id, error = %e, "Kunne ikke legge render_article-jobb i kø");
|
|
}
|
|
}
|
|
|
|
let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
|
|
if index_mode == "static" {
|
|
let index_payload = serde_json::json!({ "collection_id": target_id.to_string() });
|
|
match crate::jobs::enqueue(db, "render_index", index_payload, Some(target_id), 4).await {
|
|
Ok(job_id) => {
|
|
tracing::info!(job_id = %job_id, collection_id = %target_id, "render_index-jobb lagt i kø (statisk modus)");
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(collection_id = %target_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
|
|
}
|
|
}
|
|
} else {
|
|
crate::publishing::invalidate_index_cache(index_cache, target_id).await;
|
|
}
|
|
}
|
|
Ok(None) => {}
|
|
Err(e) => {
|
|
tracing::error!(target_id = %target_id, error = %e, "Feil ved sjekk av publiseringssamling for rendering-trigger");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Invaliderer forside-cache ved fjerning av belongs_to fra publiseringssamling.
|
|
async fn trigger_index_invalidation_if_publishing(
|
|
db: &PgPool,
|
|
index_cache: &IndexCache,
|
|
collection_id: Uuid,
|
|
) {
|
|
match crate::publishing::find_publishing_collection_by_id(db, collection_id).await {
|
|
Ok(Some(config)) => {
|
|
let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
|
|
if index_mode == "static" {
|
|
let index_payload = serde_json::json!({ "collection_id": collection_id.to_string() });
|
|
match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await {
|
|
Ok(job_id) => {
|
|
tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering");
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
|
|
}
|
|
}
|
|
} else {
|
|
crate::publishing::invalidate_index_cache(index_cache, collection_id).await;
|
|
}
|
|
}
|
|
Ok(None) => {}
|
|
Err(e) => {
|
|
tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering");
|
|
}
|
|
}
|
|
}
|