synops/maskinrommet/src/jobs.rs
vegard f537f3dcf3 Forside-rendering med to moduser: statisk CAS og dynamisk in-memory cache (oppgave 14.3)
serve_index støtter nå index_mode fra publishing-trait:
- "static": render_index-jobb rendrer forsiden til CAS ved publisering,
  samlingens metadata.rendered_index.index_hash peker til CAS-fil,
  serveres med Cache-Control: immutable
- "dynamic" (default): in-memory cache med konfigurerbar TTL
  (index_cache_ttl, default 300s), invalidert ved belongs_to-endringer

Tre separate indekserte PG-spørringer erstatter den gamle
alt-i-ett-spørringen — filtrerer på slot i edge-metadata
(hero/featured/strøm) med LIMIT, bruker GIN-indeks.

Trigger-logikk utvidet: belongs_to-edge-opprettelse legger
render_index-jobb i kø (statisk) eller invaliderer cache (dynamisk).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 01:04:31 +00:00

278 lines
8.4 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
//
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatching til handler-funksjoner basert på job_type.
//
// Ref: docs/infra/jobbkø.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::agent;
use crate::ai_edges;
use crate::audio;
use crate::cas::CasStore;
use crate::publishing;
use crate::stdb::StdbClient;
use crate::summarize;
use crate::transcribe;
use crate::tts;
/// Row from the job_queue table, as claimed by `dequeue`.
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
    /// Primary key of the queue row.
    pub id: Uuid,
    /// Optional collection this job is associated with.
    pub collection_node_id: Option<Uuid>,
    /// Dispatch key — see `dispatch` for the recognized values.
    pub job_type: String,
    /// Handler-specific JSON arguments.
    pub payload: serde_json::Value,
    /// Attempt counter as read at SELECT time, *before* `dequeue`'s UPDATE
    /// increments it — i.e. the number of runs completed prior to this one.
    pub attempts: i16,
    /// Upper bound on runs before the job is marked 'error'.
    pub max_attempts: i16,
}
/// Inserts a new job into the queue and returns its generated id.
///
/// `priority` orders dequeueing (higher first); `collection_node_id` is an
/// optional association carried through to the handler.
///
/// # Errors
/// Propagates any database error from the INSERT.
pub async fn enqueue(
    db: &PgPool,
    job_type: &str,
    payload: serde_json::Value,
    collection_node_id: Option<Uuid>,
    priority: i16,
) -> Result<Uuid, sqlx::Error> {
    let job_id: Uuid = sqlx::query_scalar(
        r#"
        INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
        VALUES ($1, $2, $3, $4)
        RETURNING id
        "#,
    )
    .bind(job_type)
    .bind(&payload)
    .bind(collection_node_id)
    .bind(priority)
    .fetch_one(db)
    .await?;

    tracing::info!(job_id = %job_id, job_type = %job_type, "Jobb lagt i kø");
    Ok(job_id)
}
/// Atomically claims the next pending job using FOR UPDATE SKIP LOCKED,
/// so concurrent workers never grab the same row.
///
/// The claimed row is flipped to 'running' with started_at set and its
/// attempt counter bumped, all inside one transaction. Returns `None`
/// when nothing is due.
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
    let mut tx = db.begin().await?;

    // Highest priority first, oldest schedule first within a priority.
    let claimed = sqlx::query_as::<_, JobRow>(
        r#"
        SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
        FROM job_queue
        WHERE status IN ('pending', 'retry')
          AND scheduled_for <= now()
        ORDER BY priority DESC, scheduled_for ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED
        "#,
    )
    .fetch_optional(&mut *tx)
    .await?;

    if let Some(job) = claimed.as_ref() {
        sqlx::query(
            "UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
        )
        .bind(job.id)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;
    Ok(claimed)
}
/// Records a successful run: status → 'completed', result JSON stored,
/// completed_at stamped.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
    const UPDATE_SQL: &str =
        "UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1";

    sqlx::query(UPDATE_SQL)
        .bind(job_id)
        .bind(&result)
        .execute(db)
        .await
        .map(|_| ())
}
/// Marks a job as failed. Schedules a 'retry' with exponential backoff if
/// attempts remain, otherwise marks it 'error' permanently.
///
/// Note: `job.attempts` holds the value SELECTed *before* `dequeue`'s UPDATE
/// incremented the counter, so the run that just failed is attempt number
/// `job.attempts + 1`. The retry decision must use that corrected count —
/// comparing the stale value against `max_attempts` would allow one run too
/// many (e.g. max_attempts = 3 would yield 4 runs).
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
    // Attempt number of the run that just failed (JobRow.attempts is pre-increment).
    let attempts_used = i32::from(job.attempts) + 1;

    if attempts_used < i32::from(job.max_attempts) {
        // Exponential backoff: 30s × 2^(attempts_used - 1).
        // Clamp the exponent so the shift cannot overflow i64 even if
        // max_attempts is configured absurdly high.
        let exponent = (attempts_used - 1).clamp(0, 30) as u32;
        let backoff_secs = 30i64 << exponent;

        sqlx::query(
            r#"
            UPDATE job_queue
            SET status = 'retry',
                error_msg = $2,
                scheduled_for = now() + ($3 || ' seconds')::interval
            WHERE id = $1
            "#,
        )
        .bind(job.id)
        .bind(error_msg)
        .bind(backoff_secs.to_string())
        .execute(db)
        .await?;

        tracing::warn!(
            job_id = %job.id,
            attempt = attempts_used,
            max = job.max_attempts,
            backoff_secs = backoff_secs,
            "Jobb feilet, retry planlagt"
        );
    } else {
        // Out of attempts — terminal failure.
        sqlx::query(
            "UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
        )
        .bind(job.id)
        .bind(error_msg)
        .execute(db)
        .await?;

        tracing::error!(
            job_id = %job.id,
            attempts = attempts_used,
            "Jobb permanent feilet"
        );
    }
    Ok(())
}
/// Dispatcher — runs the matching handler based on job_type.
///
/// Returns the handler's JSON result on success, or an error string that
/// `fail_job` persists into error_msg. An unrecognized job_type fails
/// immediately without retry-specific handling.
async fn dispatch(
    job: &JobRow,
    db: &PgPool,
    stdb: &StdbClient,
    cas: &CasStore,
    whisper_url: &str,
) -> Result<serde_json::Value, String> {
    match job.job_type.as_str() {
        // Speech-to-text via the external whisper service at `whisper_url`.
        "whisper_transcribe" => {
            transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await
        }
        "agent_respond" => {
            agent::handle_agent_respond(job, db, stdb).await
        }
        "suggest_edges" => {
            ai_edges::handle_suggest_edges(job, db, stdb).await
        }
        "summarize_communication" => {
            summarize::handle_summarize_communication(job, db, stdb).await
        }
        "tts_generate" => {
            tts::handle_tts_job(job, db, stdb, cas).await
        }
        "audio_process" => {
            audio::handle_audio_process_job(job, db, stdb, cas).await
        }
        // Publishing: local handlers below delegate to the publishing module.
        "render_article" => {
            handle_render_article(job, db, cas).await
        }
        "render_index" => {
            handle_render_index(job, db, cas).await
        }
        other => Err(format!("Ukjent jobbtype: {other}")),
    }
}
/// Handler for the `render_article` job.
///
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
/// Renders the article's metadata.document to HTML via Tera, stores it in
/// CAS, and updates the node's metadata.rendered.
async fn handle_render_article(
    job: &JobRow,
    db: &PgPool,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    // Extract a UUID field from the JSON payload, or fail with `missing`.
    let uuid_field = |key: &str, missing: &str| -> Result<Uuid, String> {
        job.payload
            .get(key)
            .and_then(serde_json::Value::as_str)
            .and_then(|raw| raw.parse().ok())
            .ok_or_else(|| missing.to_string())
    };

    let node_id = uuid_field("node_id", "Mangler node_id i payload")?;
    let collection_id = uuid_field("collection_id", "Mangler collection_id i payload")?;

    publishing::render_article_to_cas(db, cas, node_id, collection_id).await
}
/// Handler for the `render_index` job.
///
/// Payload: `{ "collection_id": "..." }`
/// Renders the front page to HTML via Tera, stores it in CAS, and updates
/// the collection's metadata.rendered_index.
async fn handle_render_index(
    job: &JobRow,
    db: &PgPool,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    let raw = job.payload.get("collection_id").and_then(|v| v.as_str());

    let collection_id = match raw.map(str::parse::<Uuid>) {
        Some(Ok(id)) => id,
        // Field absent, not a string, or not a valid UUID.
        _ => return Err("Mangler collection_id i payload".to_string()),
    };

    publishing::render_index_to_cas(db, cas, collection_id).await
}
/// Starter worker-loopen som poller job_queue.
/// Kjører som en bakgrunnsoppgave i tokio.
pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore) {
let whisper_url = std::env::var("WHISPER_URL")
.unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
tokio::spawn(async move {
tracing::info!("Jobbkø-worker startet (poll-intervall: 2s)");
loop {
match dequeue(&db).await {
Ok(Some(job)) => {
tracing::info!(
job_id = %job.id,
job_type = %job.job_type,
attempt = job.attempts,
"Behandler jobb"
);
match dispatch(&job, &db, &stdb, &cas, &whisper_url).await {
Ok(result) => {
if let Err(e) = complete_job(&db, job.id, result).await {
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
} else {
tracing::info!(job_id = %job.id, "Jobb fullført");
}
}
Err(err) => {
tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
if let Err(e) = fail_job(&db, &job, &err).await {
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
}
}
}
}
Ok(None) => {
// Ingen ventende jobber — vent før neste poll
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
Err(e) => {
tracing::error!(error = %e, "Feil ved polling av jobbkø");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
});
}