Error handling: retry with backoff + dead letter queue for PG writes (task 12.3)

Replaces the fire-and-forget tokio::spawn() calls in the write path with job-queue-based persistence. All PG write operations (insert/update/delete for nodes and edges) now go through the existing job queue, which already provides:

- Exponential backoff (30s × 2^n) on failure
- Dead letter queue (status='error' after max_attempts=3)
- Admin API for monitoring, manual retry, and cancellation
- Resource management and priority rules

New module pg_writes.rs with:
- 5 enqueue functions (replacing spawn_pg_*)
- 5 job handlers dispatched from the worker loop
- Full parity with the old logic: access-granting edges run
  recompute_access in a transaction, sync to STDB, and trigger rendering

Before: PG write failures were logged and forgotten → data existed only in STDB, lost from PG.
Now: automatic retry → admin-visible dead letter → manual recovery.
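The retry policy described above can be condensed into a minimal sketch. This is illustrative only: the constants (30s base delay, factor 2, max_attempts=3, status='error') come from the commit message, while the function names and the 'queued' status are assumptions, not the project's actual scheduler code.

```rust
/// Delay before retry attempt `n` (0-indexed): 30s × 2^n,
/// matching the exponential backoff described in the commit message.
fn backoff_secs(n: u32) -> u64 {
    30 * 2u64.pow(n)
}

/// Job status after a failed attempt: retry later, or move to the
/// dead letter queue (status = "error") once max_attempts is exhausted.
/// ("queued" is an assumed name for the retry state.)
fn status_after_failure(attempts: u32, max_attempts: u32) -> &'static str {
    if attempts >= max_attempts {
        "error" // dead letter: visible in the admin API for manual recovery
    } else {
        "queued" // will be retried after backoff_secs(attempts)
    }
}

fn main() {
    // Three attempts: wait 30s, 60s, then 120s before dead-lettering.
    for n in 0..3 {
        println!("attempt {} failed -> wait {}s", n + 1, backoff_secs(n));
    }
    println!("final status: {}", status_after_failure(3, 3));
}
```

With max_attempts=3 this gives at most three delays (30s, 60s, 120s) before a job becomes admin-visible in the dead letter queue.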
vegard 2026-03-18 11:26:48 +00:00
parent 9cd6c0ee5d
commit 0fc559a207
6 changed files with 566 additions and 397 deletions


@@ -181,6 +181,11 @@ Status lagres i `disk_status_log`-tabellen (siste 1000 målinger beholdes). Admi
| `summarize_communication` | Oppsummering (AI) | Generer AI-sammendrag av kommunikasjonsnode (chat/møte). Oppretter content-node med summary-edge tilbake. Trigges via `/intentions/summarize` |
| `url_ingest` | Web Clipper (proposal) | Hent URL, oppsummer via AI, opprett research-klipp med graf-koblinger |
| `generate_waveform` | Waveforms (proposal) | Generer audio-peaks fra lydfil for visuell bølgeform |
+| `pg_insert_node` | PG-skrivestien | Persister node til PostgreSQL med retry og dead letter (oppgave 12.3) |
+| `pg_insert_edge` | PG-skrivestien | Persister edge til PostgreSQL, inkl. recompute_access for tilgangsgivende edges |
+| `pg_update_node` | PG-skrivestien | Oppdater node i PostgreSQL med retry |
+| `pg_delete_node` | PG-skrivestien | Slett node fra PostgreSQL med retry |
+| `pg_delete_edge` | PG-skrivestien | Slett edge fra PostgreSQL med retry, invalider publiserings-cache |
## 6. Tilgangsisolasjon
Alle jobber merkes med `collection_node_id`. Rust-workers kjører som superuser (bypasser RLS) og sikrer isolasjon i applikasjonskode:
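Since the workers bypass RLS, the isolation guarantee rests entirely on an application-level check of each job's `collection_node_id` tag. A minimal sketch of such a check (the helper name and the untagged-jobs-are-global rule are my assumptions, not the project's actual code):

```rust
/// Application-level isolation guard: workers run as superuser and bypass
/// RLS, so each job's collection tag must be checked in code before the
/// job is processed. Illustrative sketch only.
fn may_process(job_collection: Option<&str>, worker_scope: &str) -> bool {
    match job_collection {
        // Assumption: a job without a collection tag is treated as global.
        None => true,
        // Otherwise the job must belong to the collection being processed.
        Some(id) => id == worker_scope,
    }
}

fn main() {
    println!("{}", may_process(Some("c-123"), "c-123")); // same collection
    println!("{}", may_process(Some("c-456"), "c-123")); // cross-collection: denied
}
```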


@@ -568,12 +568,12 @@ pub async fn create_node(
"Node opprettet i STDB"
);
-// Fang verdier for AI-trigger før de flyttes inn i spawn_pg_insert_node
+// Fang verdier for AI-trigger før de flyttes inn i enqueue_insert_node
let is_content_node = node_kind == "content";
let has_enough_text = content.len() >= 20 || title.len() >= 20;
-// -- Spawn async PG-skriving --
-spawn_pg_insert_node(
+// -- PG-skriving via jobbkø (retry + dead letter) --
+crate::pg_writes::enqueue_insert_node(
state.db.clone(),
node_id,
node_kind,
@@ -613,11 +613,9 @@ pub async fn create_node(
"belongs_to-edge opprettet i STDB (kontekst-arv)"
);
-// belongs_to er ikke tilgangsgivende — enkel PG-insert
-spawn_pg_insert_edge(
+// belongs_to er ikke tilgangsgivende — PG-insert via jobbkø
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
edge_id,
node_id,
ctx_id,
@@ -679,7 +677,7 @@ pub async fn create_node(
// -- AI edge-forslag: analyser innholdet for topics og mentions --
// Trigges for content-noder med nok tekst. Lav prioritet (bakgrunnsjobb).
-// NB: node_kind, title og content er flyttet inn i spawn_pg_insert_node over,
+// NB: node_kind, title og content er flyttet inn i enqueue_insert_node over,
// så vi sjekker på kopi av verdiene tatt før move.
if is_content_node && has_enough_text {
let db_clone = state.db.clone();
@@ -874,12 +872,10 @@ pub async fn create_edge(
"Edge opprettet i STDB"
);
-// -- Spawn async PG-skriving --
+// -- PG-skriving via jobbkø (retry + dead letter) --
let edge_type = req.edge_type.clone();
-spawn_pg_insert_edge(
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
edge_id,
req.source_id,
req.target_id,
@@ -1092,8 +1088,8 @@ pub async fn update_node(
"Node oppdatert i STDB"
);
-// -- Spawn async PG-skriving --
-spawn_pg_update_node(
+// -- PG-skriving via jobbkø (retry + dead letter) --
+crate::pg_writes::enqueue_update_node(
state.db.clone(),
req.node_id,
node_kind,
@@ -1209,8 +1205,8 @@ pub async fn delete_node(
"Node slettet fra STDB"
);
-// -- Spawn async PG-sletting --
-spawn_pg_delete_node(state.db.clone(), req.node_id);
+// -- PG-sletting via jobbkø (retry + dead letter) --
+crate::pg_writes::enqueue_delete_node(state.db.clone(), req.node_id);
Ok(Json(DeleteNodeResponse { deleted: true }))
}
@@ -1452,9 +1448,8 @@ pub async fn delete_edge(
);
// -- Spawn async PG-sletting + publiserings-invalidering --
-spawn_pg_delete_edge(
+crate::pg_writes::enqueue_delete_edge(
state.db.clone(),
-state.index_cache.clone(),
req.edge_id,
edge_info.source_id,
edge_info.target_id,
@@ -1464,70 +1459,6 @@ pub async fn delete_edge(
Ok(Json(DeleteEdgeResponse { deleted: true }))
}
/// Spawner en tokio-task som sletter edgen fra PostgreSQL
/// og invaliderer publiserings-cache ved behov.
fn spawn_pg_delete_edge(
db: PgPool,
index_cache: crate::publishing::IndexCache,
edge_id: Uuid,
_source_id: Uuid,
target_id: Uuid,
edge_type: String,
) {
tokio::spawn(async move {
let result = sqlx::query("DELETE FROM edges WHERE id = $1")
.bind(edge_id)
.execute(&db)
.await;
match result {
Ok(_) => {
tracing::info!(edge_id = %edge_id, "Edge slettet fra PostgreSQL");
// Ved fjerning av belongs_to til publiseringssamling: invalider forside-cache
if edge_type == "belongs_to" {
trigger_index_invalidation_if_publishing(&db, &index_cache, target_id).await;
}
}
Err(e) => {
tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke slette edge fra PostgreSQL");
}
}
});
}
/// Invaliderer forside-cache (dynamisk modus) eller legger render_index-jobb i køen
/// (statisk modus) når en edge fjernes fra en publiseringssamling.
async fn trigger_index_invalidation_if_publishing(
db: &PgPool,
index_cache: &crate::publishing::IndexCache,
collection_id: Uuid,
) {
match crate::publishing::find_publishing_collection_by_id(db, collection_id).await {
Ok(Some(config)) => {
let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
if index_mode == "static" {
let index_payload = serde_json::json!({
"collection_id": collection_id.to_string(),
});
match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await {
Ok(job_id) => {
tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index-jobb lagt i kø etter avpublisering");
}
Err(e) => {
tracing::error!(collection_id = %collection_id, error = %e, "Kunne ikke legge render_index-jobb i kø");
}
}
} else {
crate::publishing::invalidate_index_cache(index_cache, collection_id).await;
}
}
Ok(None) => {}
Err(e) => {
tracing::error!(collection_id = %collection_id, error = %e, "Feil ved sjekk av publiseringssamling for cache-invalidering");
}
}
}
// =============================================================================
// set_slot — Redaksjonell slot-håndtering for publiseringssamlinger
@@ -1900,8 +1831,8 @@ pub async fn create_communication(
"Kommunikasjonsnode opprettet i STDB"
);
-// Spawn PG-skriving for noden
-spawn_pg_insert_node(
+// PG-skriving via jobbkø
+crate::pg_writes::enqueue_insert_node(
state.db.clone(),
node_id,
"communication".to_string(),
@@ -1937,11 +1868,9 @@ pub async fn create_communication(
.await
.map_err(|e| stdb_error("create_edge (owner)", e))?;
-// Spawn PG-skriving for owner-edge (med access recompute)
-spawn_pg_insert_edge(
+// PG-skriving for owner-edge via jobbkø (med access recompute)
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
owner_edge_id,
user.node_id,
node_id,
@@ -1980,10 +1909,8 @@ pub async fn create_communication(
.await
.map_err(|e| stdb_error("create_edge (member_of)", e))?;
-spawn_pg_insert_edge(
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
edge_id,
*participant_id,
node_id,
@@ -2031,10 +1958,8 @@ pub async fn create_communication(
.await
.map_err(|e| stdb_error("create_edge (belongs_to context)", e))?;
-spawn_pg_insert_edge(
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
ctx_edge_id,
node_id,
context_id,
@@ -2219,8 +2144,8 @@ pub async fn upload_media(
"Media-node opprettet i STDB"
);
-// Spawn async PG-skriving for media-noden
-spawn_pg_insert_node(
+// PG-skriving for media-noden via jobbkø
+crate::pg_writes::enqueue_insert_node(
state.db.clone(),
media_node_id,
"media".to_string(),
@@ -2260,11 +2185,9 @@ pub async fn upload_media(
"has_media-edge opprettet i STDB"
);
-// has_media er ikke tilgangsgivende — enkel PG-insert
-spawn_pg_insert_edge(
+// has_media er ikke tilgangsgivende — PG-insert via jobbkø
+crate::pg_writes::enqueue_insert_edge(
state.db.clone(),
-state.stdb.clone(),
-state.index_cache.clone(),
edge_id,
src_id,
media_node_id,
@@ -2543,139 +2466,7 @@ struct NodeKindRow {
node_kind: String,
}
/// Spawner en tokio-task som skriver noden til PostgreSQL i bakgrunnen.
fn spawn_pg_insert_node(
db: PgPool,
node_id: Uuid,
node_kind: String,
title: String,
content: String,
visibility: String,
metadata: serde_json::Value,
created_by: Uuid,
) {
tokio::spawn(async move {
let result = sqlx::query(
r#"
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7)
"#,
)
.bind(node_id)
.bind(&node_kind)
.bind(&title)
.bind(&content)
.bind(&visibility)
.bind(&metadata)
.bind(created_by)
.execute(&db)
.await;
match result {
Ok(_) => {
tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL");
}
Err(e) => {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke persistere node til PostgreSQL");
}
}
});
}
/// Mapper edge_type til access_level for tilgangsgivende edges.
/// Returnerer None for edges som ikke gir tilgang.
fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
match edge_type {
"owner" => Some("owner"),
"admin" => Some("admin"),
"member_of" => Some("member"),
"reader" => Some("reader"),
_ => None,
}
}
/// Spawner en tokio-task som skriver edgen til PostgreSQL i bakgrunnen.
/// For tilgangsgivende edges (owner, admin, member_of, reader) kalles
/// recompute_access i samme transaksjon — ingen vindu med stale tilgang.
/// Synker også node_access til STDB for visibility-filtrering i frontend.
fn spawn_pg_insert_edge(
db: PgPool,
stdb: crate::stdb::StdbClient,
index_cache: crate::publishing::IndexCache,
edge_id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
metadata: serde_json::Value,
system: bool,
created_by: Uuid,
) {
tokio::spawn(async move {
let access_level = edge_type_to_access_level(&edge_type);
if let Some(level) = access_level {
// Tilgangsgivende edge: wrap i transaksjon med recompute_access
let result = insert_edge_with_access(&db, edge_id, source_id, target_id, &edge_type, &metadata, system, created_by, level).await;
match result {
Ok(_) => {
tracing::info!(
edge_id = %edge_id,
edge_type = %edge_type,
access_level = %level,
"Edge + node_access persistert til PostgreSQL"
);
// Synk oppdatert node_access til STDB
sync_node_access_to_stdb(&db, &stdb, source_id).await;
}
Err(e) => {
tracing::error!(
edge_id = %edge_id,
error = %e,
"Kunne ikke persistere edge + node_access til PostgreSQL"
);
}
}
} else {
// Vanlig edge uten tilgangspåvirkning
let result = sqlx::query(
r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(edge_id)
.bind(source_id)
.bind(target_id)
.bind(&edge_type)
.bind(&metadata)
.bind(system)
.bind(created_by)
.execute(&db)
.await;
match result {
Ok(_) => {
tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL");
// Trigger artikkelrendering ved belongs_to til publiseringssamling
if edge_type == "belongs_to" {
trigger_render_if_publishing(&db, &index_cache, source_id, target_id).await;
}
// Sjekk om dette er en presentasjonselement-edge og start A/B-test
// hvis det finnes >1 variant av samme type (oppgave 14.17)
if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") {
crate::publishing::maybe_start_ab_test(&db, target_id, &edge_type).await;
}
}
Err(e) => {
tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke persistere edge til PostgreSQL");
}
}
}
});
}
/// Sjekker om target er en samling med publishing-trait, og legger i så fall
/// en `render_article`-jobb i køen. For statisk modus legges også en
@@ -2755,163 +2546,6 @@ async fn trigger_render_if_publishing(
}
}
/// Synkroniserer node_access-rader for et subject fra PG til STDB.
/// Kalles etter recompute_access for å holde STDB i synk.
async fn sync_node_access_to_stdb(db: &PgPool, stdb: &crate::stdb::StdbClient, subject_id: Uuid) {
let rows = sqlx::query_as::<_, NodeAccessRow>(
"SELECT subject_id, object_id, access::text as access, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access WHERE subject_id = $1",
)
.bind(subject_id)
.fetch_all(db)
.await;
match rows {
Ok(rows) => {
for row in &rows {
if let Err(e) = stdb
.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await
{
tracing::error!(
subject_id = %row.subject_id,
object_id = %row.object_id,
error = %e,
"Kunne ikke synke node_access til STDB"
);
}
}
tracing::info!(
subject_id = %subject_id,
count = rows.len(),
"node_access synket til STDB"
);
}
Err(e) => {
tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG");
}
}
}
#[derive(sqlx::FromRow)]
struct NodeAccessRow {
subject_id: Uuid,
object_id: Uuid,
access: String,
via_edge: String,
}
/// Inserter en tilgangsgivende edge og oppdaterer node_access i én transaksjon.
/// source_id = subject (bruker/team), target_id = object (noden det gis tilgang til).
async fn insert_edge_with_access(
db: &PgPool,
edge_id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: &str,
metadata: &serde_json::Value,
system: bool,
created_by: Uuid,
access_level: &str,
) -> Result<(), sqlx::Error> {
let mut tx = db.begin().await?;
sqlx::query(
r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(edge_id)
.bind(source_id)
.bind(target_id)
.bind(edge_type)
.bind(metadata)
.bind(system)
.bind(created_by)
.execute(&mut *tx)
.await?;
// Kall recompute_access: subject=source_id, object=target_id
sqlx::query(
"SELECT recompute_access($1, $2, $3::access_level, $4)",
)
.bind(source_id)
.bind(target_id)
.bind(access_level)
.bind(edge_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
/// Spawner en tokio-task som oppdaterer noden i PostgreSQL.
fn spawn_pg_update_node(
db: PgPool,
node_id: Uuid,
node_kind: String,
title: String,
content: String,
visibility: String,
metadata: serde_json::Value,
) {
tokio::spawn(async move {
let result = sqlx::query(
r#"
UPDATE nodes
SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''),
visibility = $5::visibility, metadata = $6
WHERE id = $1
"#,
)
.bind(node_id)
.bind(&node_kind)
.bind(&title)
.bind(&content)
.bind(&visibility)
.bind(&metadata)
.execute(&db)
.await;
match result {
Ok(_) => {
tracing::info!(node_id = %node_id, "Node oppdatert i PostgreSQL");
}
Err(e) => {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke oppdatere node i PostgreSQL");
}
}
});
}
/// Spawner en tokio-task som sletter noden fra PostgreSQL.
/// Edges slettes automatisk via ON DELETE CASCADE.
fn spawn_pg_delete_node(db: PgPool, node_id: Uuid) {
tokio::spawn(async move {
let result = sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(node_id)
.execute(&db)
.await;
match result {
Ok(_) => {
tracing::info!(node_id = %node_id, "Node slettet fra PostgreSQL");
}
Err(e) => {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke slette node fra PostgreSQL");
}
}
});
}
// =============================================================================
// POST /intentions/update_segment — rediger transkripsjons-segment
@@ -4385,8 +4019,8 @@ pub async fn create_announcement(
"Systemvarsel opprettet i STDB"
);
-// -- Persister til PostgreSQL asynkront --
-spawn_pg_insert_node(
+// -- Persister til PostgreSQL via jobbkø --
+crate::pg_writes::enqueue_insert_node(
state.db.clone(),
node_id,
"system_announcement".to_string(),


@@ -21,6 +21,8 @@ use crate::audio;
use crate::cas::CasStore;
use crate::cli_dispatch;
use crate::maintenance::MaintenanceState;
+use crate::pg_writes;
+use crate::publishing::IndexCache;
use crate::resources::{self, PriorityRules};
use crate::stdb::StdbClient;
use crate::summarize;
@@ -170,6 +172,7 @@ async fn dispatch(
db: &PgPool,
stdb: &StdbClient,
cas: &CasStore,
+index_cache: &IndexCache,
whisper_url: &str,
) -> Result<serde_json::Value, String> {
match job.job_type.as_str() {
@@ -202,6 +205,22 @@ async fn dispatch(
"render_index" => {
handle_render_index(job, cas).await
}
+// PG-skriveoperasjoner (oppgave 12.3): retry med backoff + dead letter queue
+"pg_insert_node" => {
+pg_writes::handle_insert_node(job, db).await
+}
+"pg_insert_edge" => {
+pg_writes::handle_insert_edge(job, db, stdb, index_cache).await
+}
+"pg_update_node" => {
+pg_writes::handle_update_node(job, db).await
+}
+"pg_delete_node" => {
+pg_writes::handle_delete_node(job, db).await
+}
+"pg_delete_edge" => {
+pg_writes::handle_delete_edge(job, db, index_cache).await
+}
other => Err(format!("Ukjent jobbtype: {other}")),
}
}
@@ -452,6 +471,7 @@ pub fn start_worker(
db: PgPool,
stdb: StdbClient,
cas: CasStore,
+index_cache: IndexCache,
maintenance: MaintenanceState,
priority_rules: PriorityRules,
) {
@@ -593,6 +613,7 @@ pub fn start_worker(
let db2 = db.clone();
let stdb2 = stdb.clone();
let cas2 = cas.clone();
+let index_cache2 = index_cache.clone();
let whisper_url2 = whisper_url.clone();
let timeout_secs = if rule.timeout_seconds > 0 {
rule.timeout_seconds as u64
@@ -606,7 +627,7 @@ pub fn start_worker(
let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
-dispatch(&job, &db2, &stdb2, &cas2, &whisper_url2),
+dispatch(&job, &db2, &stdb2, &cas2, &index_cache2, &whisper_url2),
)
.await;


@@ -15,6 +15,7 @@ pub mod maintenance;
pub mod metrics;
pub mod pruning;
mod queries;
+pub mod pg_writes;
pub mod publishing;
pub mod health;
pub mod resource_usage;
@@ -166,8 +167,10 @@ async fn main() {
.await
.expect("Kunne ikke laste prioritetsregler fra PG");
+let index_cache = publishing::new_index_cache();
// Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5)
-jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone(), priority_rules.clone());
+jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), index_cache.clone(), maintenance.clone(), priority_rules.clone());
// Start periodisk CAS-pruning i bakgrunnen
pruning::start_pruning_loop(db.clone(), cas.clone());
@@ -189,8 +192,6 @@ async fn main() {
// Start periodisk CAS tmp-opprydding (oppgave 17.6)
cas::start_tmp_cleanup_loop(cas.clone());
-let index_cache = publishing::new_index_cache();
let dynamic_page_cache = publishing::new_dynamic_page_cache();
let metrics = metrics::MetricsCollector::new();
let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics };


@@ -0,0 +1,509 @@
// pg_writes — Jobbkø-handlere for PG-skriveoperasjoner.
//
// Erstatter fire-and-forget `tokio::spawn()` med retry via jobbkøen.
// Hver skriveoperasjon (insert/update/delete for nodes og edges) er en
// egen jobbtype som behandles av den eksisterende worker-loopen med
// eksponentiell backoff (30s × 2^n) og dead letter queue (status='error'
// etter max_attempts).
//
// Jobbtyper:
// pg_insert_node, pg_insert_edge, pg_update_node,
// pg_delete_node, pg_delete_edge
//
// Ref: docs/infra/jobbkø.md, oppgave 12.3
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
use crate::jobs::JobRow;
use crate::publishing::IndexCache;
use crate::stdb::StdbClient;
/// Prioritet for PG-skriveoperasjoner. Høy — data-konsistens er kritisk.
const PG_WRITE_PRIORITY: i16 = 8;
// =============================================================================
// Enqueue-funksjoner (erstatter spawn_pg_*)
// =============================================================================
/// Legger en insert_node-operasjon i jobbkøen.
pub fn enqueue_insert_node(
db: PgPool,
node_id: Uuid,
node_kind: String,
title: String,
content: String,
visibility: String,
metadata: serde_json::Value,
created_by: Uuid,
) {
let payload = json!({
"node_id": node_id,
"node_kind": node_kind,
"title": title,
"content": content,
"visibility": visibility,
"metadata": metadata,
"created_by": created_by,
});
tokio::spawn(async move {
if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_node", payload, None, PG_WRITE_PRIORITY).await {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_insert_node i jobbkø");
}
});
}
/// Legger en insert_edge-operasjon i jobbkøen.
pub fn enqueue_insert_edge(
db: PgPool,
edge_id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
metadata: serde_json::Value,
system: bool,
created_by: Uuid,
) {
let payload = json!({
"edge_id": edge_id,
"source_id": source_id,
"target_id": target_id,
"edge_type": edge_type,
"metadata": metadata,
"system": system,
"created_by": created_by,
});
tokio::spawn(async move {
if let Err(e) = crate::jobs::enqueue(&db, "pg_insert_edge", payload, None, PG_WRITE_PRIORITY).await {
tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_insert_edge i jobbkø");
}
});
}
/// Legger en update_node-operasjon i jobbkøen.
pub fn enqueue_update_node(
db: PgPool,
node_id: Uuid,
node_kind: String,
title: String,
content: String,
visibility: String,
metadata: serde_json::Value,
) {
let payload = json!({
"node_id": node_id,
"node_kind": node_kind,
"title": title,
"content": content,
"visibility": visibility,
"metadata": metadata,
});
tokio::spawn(async move {
if let Err(e) = crate::jobs::enqueue(&db, "pg_update_node", payload, None, PG_WRITE_PRIORITY).await {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_update_node i jobbkø");
}
});
}
/// Legger en delete_node-operasjon i jobbkøen.
pub fn enqueue_delete_node(db: PgPool, node_id: Uuid) {
let payload = json!({ "node_id": node_id });
tokio::spawn(async move {
if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_node", payload, None, PG_WRITE_PRIORITY).await {
tracing::error!(node_id = %node_id, error = %e, "Kunne ikke legge pg_delete_node i jobbkø");
}
});
}
/// Legger en delete_edge-operasjon i jobbkøen.
pub fn enqueue_delete_edge(
db: PgPool,
edge_id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
) {
let payload = json!({
"edge_id": edge_id,
"source_id": source_id,
"target_id": target_id,
"edge_type": edge_type,
});
tokio::spawn(async move {
if let Err(e) = crate::jobs::enqueue(&db, "pg_delete_edge", payload, None, PG_WRITE_PRIORITY).await {
tracing::error!(edge_id = %edge_id, error = %e, "Kunne ikke legge pg_delete_edge i jobbkø");
}
});
}
// =============================================================================
// Job-handlere (kalles fra dispatch i jobs.rs)
// =============================================================================
/// Helpers for å parse UUID fra payload.
fn uuid_from_payload(payload: &serde_json::Value, key: &str) -> Result<Uuid, String> {
payload
.get(key)
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.ok_or_else(|| format!("Mangler eller ugyldig {key} i payload"))
}
fn string_from_payload(payload: &serde_json::Value, key: &str) -> String {
payload.get(key).and_then(|v| v.as_str()).unwrap_or("").to_string()
}
/// Handler: pg_insert_node
pub async fn handle_insert_node(
job: &JobRow,
db: &PgPool,
) -> Result<serde_json::Value, String> {
let p = &job.payload;
let node_id = uuid_from_payload(p, "node_id")?;
let node_kind = string_from_payload(p, "node_kind");
let title = string_from_payload(p, "title");
let content = string_from_payload(p, "content");
let visibility = string_from_payload(p, "visibility");
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
let created_by = uuid_from_payload(p, "created_by")?;
sqlx::query(
r#"
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
VALUES ($1, $2, NULLIF($3, ''), NULLIF($4, ''), $5::visibility, $6, $7)
"#,
)
.bind(node_id)
.bind(&node_kind)
.bind(&title)
.bind(&content)
.bind(&visibility)
.bind(&metadata)
.bind(created_by)
.execute(db)
.await
.map_err(|e| format!("PG insert node {node_id}: {e}"))?;
tracing::info!(node_id = %node_id, "Node persistert til PostgreSQL (via jobbkø)");
Ok(json!({ "node_id": node_id.to_string(), "op": "insert_node" }))
}
/// Mapper edge_type til access_level for tilgangsgivende edges.
fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
match edge_type {
"owner" => Some("owner"),
"admin" => Some("admin"),
"member_of" => Some("member"),
"reader" => Some("reader"),
_ => None,
}
}
/// Handler: pg_insert_edge
///
/// Håndterer tilgangsgivende edges (owner/admin/member_of/reader) med
/// recompute_access i transaksjon, og synker til STDB. For belongs_to-edges
/// trigges artikkelrendering hvis target er en publiseringssamling.
pub async fn handle_insert_edge(
job: &JobRow,
db: &PgPool,
stdb: &StdbClient,
index_cache: &IndexCache,
) -> Result<serde_json::Value, String> {
let p = &job.payload;
let edge_id = uuid_from_payload(p, "edge_id")?;
let source_id = uuid_from_payload(p, "source_id")?;
let target_id = uuid_from_payload(p, "target_id")?;
let edge_type = string_from_payload(p, "edge_type");
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
let system = p.get("system").and_then(|v| v.as_bool()).unwrap_or(false);
let created_by = uuid_from_payload(p, "created_by")?;
let access_level = edge_type_to_access_level(&edge_type);
if let Some(level) = access_level {
// Tilgangsgivende edge: transaksjon med recompute_access
let mut tx = db.begin().await.map_err(|e| format!("PG begin: {e}"))?;
sqlx::query(
r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(edge_id)
.bind(source_id)
.bind(target_id)
.bind(&edge_type)
.bind(&metadata)
.bind(system)
.bind(created_by)
.execute(&mut *tx)
.await
.map_err(|e| format!("PG insert edge {edge_id}: {e}"))?;
sqlx::query("SELECT recompute_access($1, $2, $3::access_level, $4)")
.bind(source_id)
.bind(target_id)
.bind(level)
.bind(edge_id)
.execute(&mut *tx)
.await
.map_err(|e| format!("recompute_access: {e}"))?;
tx.commit().await.map_err(|e| format!("PG commit: {e}"))?;
tracing::info!(
edge_id = %edge_id,
edge_type = %edge_type,
access_level = %level,
"Edge + node_access persistert til PostgreSQL (via jobbkø)"
);
// Synk node_access til STDB (best-effort, feil logger men feiler ikke jobben)
sync_node_access_to_stdb(db, stdb, source_id).await;
} else {
// Vanlig edge
sqlx::query(
r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(edge_id)
.bind(source_id)
.bind(target_id)
.bind(&edge_type)
.bind(&metadata)
.bind(system)
.bind(created_by)
.execute(db)
.await
.map_err(|e| format!("PG insert edge {edge_id}: {e}"))?;
tracing::info!(edge_id = %edge_id, "Edge persistert til PostgreSQL (via jobbkø)");
// Trigger rendering ved belongs_to
if edge_type == "belongs_to" {
trigger_render_if_publishing(db, index_cache, source_id, target_id).await;
}
// A/B-test for presentasjonselement-edges
if matches!(edge_type.as_str(), "title" | "subtitle" | "summary" | "og_image") {
crate::publishing::maybe_start_ab_test(db, target_id, &edge_type).await;
}
}
Ok(json!({ "edge_id": edge_id.to_string(), "op": "insert_edge" }))
}
/// Handler: pg_update_node
pub async fn handle_update_node(
job: &JobRow,
db: &PgPool,
) -> Result<serde_json::Value, String> {
let p = &job.payload;
let node_id = uuid_from_payload(p, "node_id")?;
let node_kind = string_from_payload(p, "node_kind");
let title = string_from_payload(p, "title");
let content = string_from_payload(p, "content");
let visibility = string_from_payload(p, "visibility");
let metadata = p.get("metadata").cloned().unwrap_or(serde_json::json!({}));
sqlx::query(
r#"
UPDATE nodes
SET node_kind = $2, title = NULLIF($3, ''), content = NULLIF($4, ''),
visibility = $5::visibility, metadata = $6
WHERE id = $1
"#,
)
.bind(node_id)
.bind(&node_kind)
.bind(&title)
.bind(&content)
.bind(&visibility)
.bind(&metadata)
.execute(db)
.await
.map_err(|e| format!("PG update node {node_id}: {e}"))?;
tracing::info!(node_id = %node_id, "Node updated in PostgreSQL (via job queue)");
Ok(json!({ "node_id": node_id.to_string(), "op": "update_node" }))
}
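Worth noting: the UPDATE above routes `title` and `content` through `NULLIF($n, '')`, so an empty string in the job payload clears the column instead of storing `""`. A minimal sketch of that convention in Rust (the helper name is hypothetical, not part of this module):

```rust
/// Mirrors SQL's NULLIF(x, ''): an empty payload string means "clear the column".
fn nullif_empty(s: &str) -> Option<&str> {
    if s.is_empty() { None } else { Some(s) }
}

fn main() {
    // Empty string in the payload -> NULL in the row; anything else is kept.
    assert_eq!(nullif_empty(""), None);
    assert_eq!(nullif_empty("Ny tittel"), Some("Ny tittel"));
}
```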
/// Handler: pg_delete_node
pub async fn handle_delete_node(
job: &JobRow,
db: &PgPool,
) -> Result<serde_json::Value, String> {
let p = &job.payload;
let node_id = uuid_from_payload(p, "node_id")?;
sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(node_id)
.execute(db)
.await
.map_err(|e| format!("PG delete node {node_id}: {e}"))?;
tracing::info!(node_id = %node_id, "Node deleted from PostgreSQL (via job queue)");
Ok(json!({ "node_id": node_id.to_string(), "op": "delete_node" }))
}
/// Handler: pg_delete_edge
pub async fn handle_delete_edge(
job: &JobRow,
db: &PgPool,
index_cache: &IndexCache,
) -> Result<serde_json::Value, String> {
let p = &job.payload;
let edge_id = uuid_from_payload(p, "edge_id")?;
let target_id = uuid_from_payload(p, "target_id")?;
let edge_type = string_from_payload(p, "edge_type");
sqlx::query("DELETE FROM edges WHERE id = $1")
.bind(edge_id)
.execute(db)
.await
.map_err(|e| format!("PG delete edge {edge_id}: {e}"))?;
tracing::info!(edge_id = %edge_id, "Edge deleted from PostgreSQL (via job queue)");
// Invalidate the publishing cache when a belongs_to edge is removed
if edge_type == "belongs_to" {
trigger_index_invalidation_if_publishing(db, index_cache, target_id).await;
}
Ok(json!({ "edge_id": edge_id.to_string(), "op": "delete_edge" }))
}
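Per the commit message, the worker loop dispatches five `pg_*` job kinds to the handlers in this module. Only `pg_update_node`, `pg_delete_node`, and `pg_delete_edge` appear in the doc comments above (plus `pg_insert_node` in the registry table); `pg_insert_edge` is an inferred name. So treat this as a sketch of the routing, not the actual worker code:

```rust
/// The five PG-write job kinds routed to pg_writes handlers.
/// "pg_insert_edge" is an assumed name; the rest match the doc comments.
const PG_WRITE_KINDS: [&str; 5] = [
    "pg_insert_node",
    "pg_insert_edge",
    "pg_update_node",
    "pg_delete_node",
    "pg_delete_edge",
];

fn is_pg_write_job(kind: &str) -> bool {
    PG_WRITE_KINDS.contains(&kind)
}

fn main() {
    // In the real worker loop this check would be a match that awaits the
    // corresponding handle_* function with the job row, pool, and caches.
    assert!(is_pg_write_job("pg_update_node"));
    assert!(!is_pg_write_job("render_article"));
}
```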
// =============================================================================
// Helper functions (moved from intentions.rs for reuse)
// =============================================================================
/// Syncs node_access rows for a subject from PG to STDB.
async fn sync_node_access_to_stdb(db: &PgPool, stdb: &StdbClient, subject_id: Uuid) {
#[derive(sqlx::FromRow)]
struct NodeAccessRow {
subject_id: Uuid,
object_id: Uuid,
access: String,
via_edge: String,
}
let rows = sqlx::query_as::<_, NodeAccessRow>(
"SELECT subject_id, object_id, access::text as access, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access WHERE subject_id = $1",
)
.bind(subject_id)
.fetch_all(db)
.await;
match rows {
Ok(rows) => {
for row in &rows {
if let Err(e) = stdb
.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await
{
tracing::error!(
subject_id = %row.subject_id,
object_id = %row.object_id,
error = %e,
"Failed to sync node_access to STDB (pg_writes)"
);
}
}
tracing::info!(
subject_id = %subject_id,
count = rows.len(),
"node_access synced to STDB (via job queue)"
);
}
Err(e) => {
tracing::error!(subject_id = %subject_id, error = %e, "Failed to fetch node_access from PG");
}
}
}
/// Triggers article rendering if the target is a publishing collection.
async fn trigger_render_if_publishing(
db: &PgPool,
index_cache: &IndexCache,
source_id: Uuid,
target_id: Uuid,
) {
match crate::publishing::find_publishing_collection_by_id(db, target_id).await {
Ok(Some(config)) => {
let article_payload = serde_json::json!({
"node_id": source_id.to_string(),
"collection_id": target_id.to_string(),
});
match crate::jobs::enqueue(db, "render_article", article_payload, Some(target_id), 5).await {
Ok(job_id) => {
tracing::info!(job_id = %job_id, node_id = %source_id, collection_id = %target_id, "render_article job enqueued");
}
Err(e) => {
tracing::error!(node_id = %source_id, collection_id = %target_id, error = %e, "Failed to enqueue render_article job");
}
}
let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
if index_mode == "static" {
let index_payload = serde_json::json!({ "collection_id": target_id.to_string() });
match crate::jobs::enqueue(db, "render_index", index_payload, Some(target_id), 4).await {
Ok(job_id) => {
tracing::info!(job_id = %job_id, collection_id = %target_id, "render_index job enqueued (static mode)");
}
Err(e) => {
tracing::error!(collection_id = %target_id, error = %e, "Failed to enqueue render_index job");
}
}
} else {
crate::publishing::invalidate_index_cache(index_cache, target_id).await;
}
}
Ok(None) => {}
Err(e) => {
tracing::error!(target_id = %target_id, error = %e, "Error checking publishing collection for rendering trigger");
}
}
}
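Both trigger helpers branch on the collection's `index_mode`, defaulting to `"dynamic"` when unset: static collections get a `render_index` job enqueued, dynamic ones only get their in-memory index cache invalidated. The decision itself is small enough to sketch in isolation (hypothetical helper, not part of the module):

```rust
/// Static collections need a render_index job; dynamic ones are served
/// from cache and only need invalidation. The default mode is "dynamic".
fn needs_index_render(index_mode: Option<&str>) -> bool {
    index_mode.unwrap_or("dynamic") == "static"
}

fn main() {
    assert!(needs_index_render(Some("static")));
    assert!(!needs_index_render(Some("dynamic")));
    // Unset index_mode falls back to dynamic, so no render job.
    assert!(!needs_index_render(None));
}
```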
/// Invalidates the front-page cache when a belongs_to edge is removed from a publishing collection.
async fn trigger_index_invalidation_if_publishing(
db: &PgPool,
index_cache: &IndexCache,
collection_id: Uuid,
) {
match crate::publishing::find_publishing_collection_by_id(db, collection_id).await {
Ok(Some(config)) => {
let index_mode = config.index_mode.as_deref().unwrap_or("dynamic");
if index_mode == "static" {
let index_payload = serde_json::json!({ "collection_id": collection_id.to_string() });
match crate::jobs::enqueue(db, "render_index", index_payload, Some(collection_id), 4).await {
Ok(job_id) => {
tracing::info!(job_id = %job_id, collection_id = %collection_id, "render_index job enqueued after unpublish");
}
Err(e) => {
tracing::error!(collection_id = %collection_id, error = %e, "Failed to enqueue render_index job");
}
}
} else {
crate::publishing::invalidate_index_cache(index_cache, collection_id).await;
}
}
Ok(None) => {}
Err(e) => {
tracing::error!(collection_id = %collection_id, error = %e, "Error checking publishing collection for cache invalidation");
}
}
}
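The handlers above return `Result<_, String>`; what turns an `Err` into a retry or a dead letter lives in the job queue itself, per the commit message: exponential backoff (30s × 2^n) and `status='error'` after `max_attempts=3`. A pure sketch of that policy, assuming n counts failures so far starting at 0 (the exact off-by-one depends on how the queue counts attempts):

```rust
use std::time::Duration;

const MAX_ATTEMPTS: u32 = 3;

/// Returns the delay before the next retry, or None when the job should
/// move to the dead letter state (status='error').
/// Assumes `failed_attempts` counts failures so far, starting at 0,
/// giving delays of 30s, 60s, 120s before the dead letter kicks in.
fn next_retry(failed_attempts: u32) -> Option<Duration> {
    if failed_attempts >= MAX_ATTEMPTS {
        None
    } else {
        Some(Duration::from_secs(30 * 2u64.pow(failed_attempts)))
    }
}

fn main() {
    assert_eq!(next_retry(0), Some(Duration::from_secs(30)));
    assert_eq!(next_retry(2), Some(Duration::from_secs(120)));
    // After max_attempts, the admin API is the recovery path.
    assert_eq!(next_retry(3), None);
}
```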


@@ -269,8 +269,7 @@ calls them directly. Same tools, two users.
 - [x] 12.1 Observability: structured logging, metrics (request latency, queue depth, AI cost).
 - [x] 12.2 Backup: PG-dump routine, STDB → PG rebuild after crash.
-- [~] 12.3 Error handling: retry with backoff in the write path, dead letter queue for failed PG writes.
-> Started: 2026-03-18T11:16
+- [x] 12.3 Error handling: retry with backoff in the write path, dead letter queue for failed PG writes.
 - [ ] 12.4 Performance: profile PG queries, optimize node_access updates.
 ## Phase 22: SpacetimeDB migration — PG LISTEN/NOTIFY