// Jobbkø — PostgreSQL-basert asynkron jobbehandling. // // Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED. // Dispatching til handler-funksjoner basert på job_type. // // Ref: docs/infra/jobbkø.md use sqlx::PgPool; use uuid::Uuid; use crate::agent; use crate::ai_edges; use crate::audio; use crate::cas::CasStore; use crate::maintenance::MaintenanceState; use crate::publishing; use crate::stdb::StdbClient; use crate::summarize; use crate::transcribe; use crate::tts; /// Rad fra job_queue-tabellen. #[derive(sqlx::FromRow, Debug)] pub struct JobRow { pub id: Uuid, pub collection_node_id: Option, pub job_type: String, pub payload: serde_json::Value, pub attempts: i16, pub max_attempts: i16, } /// Legger en ny jobb i køen. pub async fn enqueue( db: &PgPool, job_type: &str, payload: serde_json::Value, collection_node_id: Option, priority: i16, ) -> Result { let row = sqlx::query_scalar::<_, Uuid>( r#" INSERT INTO job_queue (job_type, payload, collection_node_id, priority) VALUES ($1, $2, $3, $4) RETURNING id "#, ) .bind(job_type) .bind(&payload) .bind(collection_node_id) .bind(priority) .fetch_one(db) .await?; tracing::info!(job_id = %row, job_type = %job_type, "Jobb lagt i kø"); Ok(row) } /// Henter neste ventende jobb (atomisk med FOR UPDATE SKIP LOCKED). /// Setter status til 'running' og oppdaterer started_at. async fn dequeue(db: &PgPool) -> Result, sqlx::Error> { let mut tx = db.begin().await?; let job = sqlx::query_as::<_, JobRow>( r#" SELECT id, collection_node_id, job_type, payload, attempts, max_attempts FROM job_queue WHERE status IN ('pending', 'retry') AND scheduled_for <= now() ORDER BY priority DESC, scheduled_for ASC LIMIT 1 FOR UPDATE SKIP LOCKED "#, ) .fetch_optional(&mut *tx) .await?; if let Some(ref job) = job { sqlx::query( "UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1", ) .bind(job.id) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(job) } /// Markerer en jobb som fullført. 
/// Sets `status = 'completed'`, stores `result` and stamps `completed_at` on the row `job_id`.
///
/// # Errors
/// Returns the `sqlx::Error` if the `UPDATE` fails.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
    sqlx::query(
        "UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1",
    )
    .bind(job_id)
    .bind(&result)
    .execute(db)
    .await?;

    Ok(())
}

/// Delay before the next retry: 30 s × 2^`prior_attempts` (30 s, 60 s, 120 s, …).
///
/// A negative counter is treated as 0. The exponent is capped so the
/// multiplication cannot overflow `i64`; the cap puts the ceiling at
/// 30 s × 2^20 (roughly one year).
fn retry_backoff_secs(prior_attempts: i16) -> i64 {
    const BASE_SECS: i64 = 30;
    const MAX_EXPONENT: u32 = 20;

    let exponent = u32::from(prior_attempts.max(0).unsigned_abs()).min(MAX_EXPONENT);
    BASE_SECS * 2i64.pow(exponent)
}

/// Marks a job as failed. The job is set to 'retry' with exponential backoff if
/// attempts remain, otherwise to 'error'.
///
/// `job.attempts` is taken to be the counter as it was *before* the attempt that
/// just failed (`dequeue` selects the row and only afterwards increments the
/// column), so the number of attempts made so far is `job.attempts + 1`. A retry
/// is scheduled only while that number is below `job.max_attempts`, which means a
/// job runs at most `max_attempts` times.
///
/// # Errors
/// Returns the `sqlx::Error` if the status `UPDATE` fails.
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
    let attempts_made = job.attempts.saturating_add(1);

    if attempts_made < job.max_attempts {
        // Exponential backoff: 30 s × 2^(attempts_made - 1).
        let backoff_secs = retry_backoff_secs(job.attempts);

        sqlx::query(
            r#"
            UPDATE job_queue
            SET status = 'retry',
                error_msg = $2,
                scheduled_for = now() + ($3 || ' seconds')::interval
            WHERE id = $1
            "#,
        )
        .bind(job.id)
        .bind(error_msg)
        // Bound as text because the SQL concatenates it with ' seconds'.
        .bind(backoff_secs.to_string())
        .execute(db)
        .await?;

        tracing::warn!(
            job_id = %job.id,
            attempt = attempts_made,
            max = job.max_attempts,
            backoff_secs = backoff_secs,
            "Jobb feilet, retry planlagt"
        );
    } else {
        sqlx::query(
            "UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
        )
        .bind(job.id)
        .bind(error_msg)
        .execute(db)
        .await?;

        tracing::error!(
            job_id = %job.id,
            attempts = attempts_made,
            "Jobb permanent feilet"
        );
    }

    Ok(())
}

/// Dispatcher — runs the handler that matches `job_type`.
/// Returns whatever the selected handler returns: a JSON result on success or an
/// error message on failure. A `job_type` with no matching handler yields an
/// `"Ukjent jobbtype: …"` error.
async fn dispatch(
    job: &JobRow,
    db: &PgPool,
    stdb: &StdbClient,
    cas: &CasStore,
    whisper_url: &str,
) -> Result<serde_json::Value, String> {
    match job.job_type.as_str() {
        "whisper_transcribe" => transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await,
        "agent_respond" => agent::handle_agent_respond(job, db, stdb).await,
        "suggest_edges" => ai_edges::handle_suggest_edges(job, db, stdb).await,
        "summarize_communication" => summarize::handle_summarize_communication(job, db, stdb).await,
        "tts_generate" => tts::handle_tts_job(job, db, stdb, cas).await,
        "audio_process" => audio::handle_audio_process_job(job, db, stdb, cas).await,
        "render_article" => handle_render_article(job, db, cas).await,
        "render_index" => handle_render_index(job, db, cas).await,
        other => Err(format!("Ukjent jobbtype: {other}")),
    }
}

/// Reads `payload[key]` as a string and parses it as a UUID.
///
/// Fails with `"Mangler <key> i payload"` when the key is absent, is not a JSON
/// string, or does not parse as a UUID.
fn required_uuid(payload: &serde_json::Value, key: &str) -> Result<Uuid, String> {
    payload
        .get(key)
        .and_then(serde_json::Value::as_str)
        .and_then(|raw| raw.parse().ok())
        .ok_or_else(|| format!("Mangler {key} i payload"))
}

/// Handler for the `render_article` job.
///
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
/// Renders the article's metadata.document to HTML via Tera, stores it in CAS,
/// and updates the node's metadata.rendered.
///
/// Fails with an error message when `node_id` or `collection_id` is absent from
/// the payload or is not a valid UUID string; otherwise returns the result of
/// `publishing::render_article_to_cas`.
async fn handle_render_article(
    job: &JobRow,
    db: &PgPool,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    let node_id = required_uuid(&job.payload, "node_id")?;
    let collection_id = required_uuid(&job.payload, "collection_id")?;

    publishing::render_article_to_cas(db, cas, node_id, collection_id).await
}

/// Handler for the `render_index` job.
///
/// Payload: `{ "collection_id": "..." }`
/// Renders the front page to HTML via Tera, stores it in CAS,
/// and updates the collection's metadata.rendered_index.
async fn handle_render_index( job: &JobRow, db: &PgPool, cas: &CasStore, ) -> Result { let collection_id: Uuid = job .payload .get("collection_id") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or("Mangler collection_id i payload")?; publishing::render_index_to_cas(db, cas, collection_id).await } /// Starter worker-loopen som poller job_queue. /// Kjører som en bakgrunnsoppgave i tokio. /// /// Respekterer vedlikeholdsmodus: når `maintenance.is_active()` er true, /// slutter workeren å dequeue nye jobber (kjørende jobber fullføres). pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore, maintenance: MaintenanceState) { let whisper_url = std::env::var("WHISPER_URL") .unwrap_or_else(|_| "http://faster-whisper:8000".to_string()); tokio::spawn(async move { tracing::info!("Jobbkø-worker startet (poll-intervall: 2s)"); loop { // Sjekk vedlikeholdsmodus — ikke dequeue nye jobber if maintenance.is_active() { tracing::debug!("Vedlikeholdsmodus aktiv — jobbkø pauset"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } match dequeue(&db).await { Ok(Some(job)) => { tracing::info!( job_id = %job.id, job_type = %job.job_type, attempt = job.attempts, "Behandler jobb" ); match dispatch(&job, &db, &stdb, &cas, &whisper_url).await { Ok(result) => { if let Err(e) = complete_job(&db, job.id, result).await { tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført"); } else { tracing::info!(job_id = %job.id, "Jobb fullført"); } } Err(err) => { tracing::error!(job_id = %job.id, error = %err, "Jobb feilet"); if let Err(e) = fail_job(&db, &job, &err).await { tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet"); } } } } Ok(None) => { // Ingen ventende jobber — vent før neste poll tokio::time::sleep(std::time::Duration::from_secs(2)).await; } Err(e) => { tracing::error!(error = %e, "Feil ved polling av jobbkø"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } }); }