synops/maskinrommet/src/jobs.rs
vegard 3c1d85026b AI-oppsummering av kommunikasjonsnoder (oppgave 10.3)
Ny jobbtype `summarize_communication` som henter alle meldinger fra
en kommunikasjonsnode, sender dem til LiteLLM for oppsummering, og
oppretter en content-node med sammendraget. Sammendraget knyttes til
kommunikasjonsnoden med `belongs_to`-edge (del av samtalen) og
`summary`-edge (lett å finne sammendrag for en gitt samtale).

API-endepunkt: POST /intentions/summarize { communication_id }
Verifiserer at brukeren er deltaker i samtalen. Jobbprioritet 3
(bakgrunn). Modell konfigurerbar via AI_SUMMARY_MODEL env-variabel.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:31:16 +00:00

216 lines
6.6 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
//
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatching til handler-funksjoner basert på job_type.
//
// Ref: docs/infra/jobbkø.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::agent;
use crate::ai_edges;
use crate::cas::CasStore;
use crate::stdb::StdbClient;
use crate::summarize;
use crate::transcribe;
/// A row read from the `job_queue` table.
///
/// Maps exactly the six columns that `dequeue` selects; other columns of the
/// table (status, timestamps, result, ...) are not loaded into this struct.
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
/// Value of the row's `id` column; used as the key in every later UPDATE.
pub id: Uuid,
/// Optional node id stored with the job.
/// NOTE(review): presumably the collection the work belongs to — confirm against the schema docs.
pub collection_node_id: Option<Uuid>,
/// Name of the job kind; `dispatch` matches on this string to pick a handler.
pub job_type: String,
/// JSON stored with the job when it was enqueued. Its shape is defined by the
/// handler for `job_type`, which is not visible in this file.
pub payload: serde_json::Value,
/// Attempt counter as read by the SELECT in `dequeue`. That SELECT runs before
/// the `attempts = attempts + 1` update, so this is the number of attempts made
/// *before* the current run, not including it.
pub attempts: i16,
/// Attempt limit stored on the row; `fail_job` compares it with `attempts` to
/// choose between scheduling a retry and marking the job as failed for good.
pub max_attempts: i16,
}
/// Inserts a new job into `job_queue` and returns the id assigned by the database.
///
/// # Parameters
/// * `db` — connection pool the insert runs on.
/// * `job_type` — name of the job kind; the worker's `dispatch` rejects names it
///   does not know.
/// * `payload` — JSON stored with the job.
/// * `collection_node_id` — optional node id stored on the row.
/// * `priority` — stored on the row; `dequeue` orders by `priority DESC`, so
///   larger values are picked first.
///
/// # Errors
/// Returns the `sqlx::Error` produced by the insert.
pub async fn enqueue(
    db: &PgPool,
    job_type: &str,
    payload: serde_json::Value,
    collection_node_id: Option<Uuid>,
    priority: i16,
) -> Result<Uuid, sqlx::Error> {
    const INSERT_JOB: &str = r#"
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
VALUES ($1, $2, $3, $4)
RETURNING id
"#;

    // Columns not listed in the INSERT are left to the table's defaults.
    let insert = sqlx::query_scalar::<_, Uuid>(INSERT_JOB)
        .bind(job_type)
        .bind(&payload)
        .bind(collection_node_id)
        .bind(priority);
    let job_id = insert.fetch_one(db).await?;

    tracing::info!(%job_id, %job_type, "Jobb lagt i kø");
    Ok(job_id)
}
/// Henter neste ventende jobb (atomisk med FOR UPDATE SKIP LOCKED).
/// Setter status til 'running' og oppdaterer started_at.
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
let mut tx = db.begin().await?;
let job = sqlx::query_as::<_, JobRow>(
r#"
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
FROM job_queue
WHERE status IN ('pending', 'retry')
AND scheduled_for <= now()
ORDER BY priority DESC, scheduled_for ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
"#,
)
.fetch_optional(&mut *tx)
.await?;
if let Some(ref job) = job {
sqlx::query(
"UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
)
.bind(job.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(job)
}
/// Marks the job `job_id` as `completed`, storing `result` and stamping
/// `completed_at` with the database's current time.
///
/// # Errors
/// Returns the `sqlx::Error` from the update. The number of affected rows is
/// not checked, so an unknown `job_id` is not reported as an error.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
    const MARK_COMPLETED: &str =
        "UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1";

    let outcome = sqlx::query(MARK_COMPLETED)
        .bind(job_id)
        .bind(&result)
        .execute(db)
        .await;

    // Callers only care whether the statement succeeded.
    outcome.map(|_| ())
}
/// Returns the delay in seconds before the next retry, given how many attempts
/// had been made *before* the run that just failed.
///
/// The delay is `30 × 2^prior_attempts`: 30 s after the first failure, 60 s
/// after the second, and so on. Negative counters are treated as zero, and the
/// exponent is capped so the multiplication can never overflow `i64`.
fn retry_backoff_secs(prior_attempts: i16) -> i64 {
    const BASE_SECS: i64 = 30;
    // 30 × 2^20 s is roughly a year; anything beyond that is not a meaningful
    // retry delay and only risks overflow.
    const MAX_EXPONENT: u32 = 20;

    let exponent = (prior_attempts.max(0) as u32).min(MAX_EXPONENT);
    BASE_SECS * 2i64.pow(exponent)
}

/// Records a failed run of `job`.
///
/// If the job has attempts left it is set to `retry`, with `scheduled_for`
/// pushed into the future by an exponential backoff; otherwise it is set to
/// `error` and `completed_at` is stamped. `error_msg` is stored in both cases.
///
/// `job.attempts` is the counter as `dequeue` read it, i.e. before the increment
/// for the current run. The run that just failed is therefore attempt number
/// `job.attempts + 1`, and that number is what gets compared with
/// `job.max_attempts`, so a job runs at most `max_attempts` times.
///
/// # Errors
/// Returns the `sqlx::Error` from the update statement.
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
    // Number of attempts made so far, including the one that just failed.
    let attempts_made = job.attempts.saturating_add(1);

    if attempts_made < job.max_attempts {
        // Exponential backoff: 30s × 2^(attempts_made - 1)
        let backoff_secs = retry_backoff_secs(job.attempts);
        sqlx::query(
            r#"
UPDATE job_queue
SET status = 'retry',
error_msg = $2,
scheduled_for = now() + ($3 || ' seconds')::interval
WHERE id = $1
"#,
        )
        .bind(job.id)
        .bind(error_msg)
        // Bound as text because the SQL concatenates it with ' seconds'.
        .bind(backoff_secs.to_string())
        .execute(db)
        .await?;
        tracing::warn!(
            job_id = %job.id,
            attempt = attempts_made,
            max = job.max_attempts,
            backoff_secs = backoff_secs,
            "Jobb feilet, retry planlagt"
        );
    } else {
        sqlx::query(
            "UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
        )
        .bind(job.id)
        .bind(error_msg)
        .execute(db)
        .await?;
        tracing::error!(
            job_id = %job.id,
            attempts = attempts_made,
            "Jobb permanent feilet"
        );
    }
    Ok(())
}
/// Routes `job` to the handler registered for its `job_type` and returns that
/// handler's result unchanged.
///
/// Known job types: `whisper_transcribe`, `agent_respond`, `suggest_edges` and
/// `summarize_communication`. Only the transcription handler receives `cas`
/// and `whisper_url`; the others get `job`, `db` and `stdb`.
///
/// # Errors
/// Returns the handler's error string, or an "unknown job type" message when
/// `job_type` matches none of the arms.
async fn dispatch(
    job: &JobRow,
    db: &PgPool,
    stdb: &StdbClient,
    cas: &CasStore,
    whisper_url: &str,
) -> Result<serde_json::Value, String> {
    let kind: &str = &job.job_type;
    match kind {
        "whisper_transcribe" => transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await,
        "agent_respond" => agent::handle_agent_respond(job, db, stdb).await,
        "suggest_edges" => ai_edges::handle_suggest_edges(job, db, stdb).await,
        "summarize_communication" => summarize::handle_summarize_communication(job, db, stdb).await,
        other => Err(format!("Ukjent jobbtype: {other}")),
    }
}
/// Spawns the background task that polls `job_queue` and runs jobs one at a time.
///
/// The Whisper endpoint is read once from the `WHISPER_URL` environment
/// variable, falling back to `http://faster-whisper:8000`. The spawned task
/// loops forever:
/// * a claimed job is dispatched and then marked completed or failed;
/// * an empty queue makes the loop sleep 2 seconds;
/// * a polling error is logged and followed by a 5 second sleep.
///
/// Failures to record a job's outcome are logged and otherwise ignored. The
/// task's join handle is dropped, so the task is detached.
pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore) {
    const IDLE_DELAY: std::time::Duration = std::time::Duration::from_secs(2);
    const POLL_ERROR_DELAY: std::time::Duration = std::time::Duration::from_secs(5);

    let whisper_url = match std::env::var("WHISPER_URL") {
        Ok(url) => url,
        Err(_) => "http://faster-whisper:8000".to_string(),
    };

    tokio::spawn(async move {
        tracing::info!("Jobbkø-worker startet (poll-intervall: 2s)");

        loop {
            // Claim a job, or back off and poll again.
            let job = match dequeue(&db).await {
                Ok(Some(job)) => job,
                Ok(None) => {
                    // Nothing runnable right now.
                    tokio::time::sleep(IDLE_DELAY).await;
                    continue;
                }
                Err(e) => {
                    tracing::error!(error = %e, "Feil ved polling av jobbkø");
                    tokio::time::sleep(POLL_ERROR_DELAY).await;
                    continue;
                }
            };

            tracing::info!(
                job_id = %job.id,
                job_type = %job.job_type,
                attempt = job.attempts,
                "Behandler jobb"
            );

            let outcome = dispatch(&job, &db, &stdb, &cas, &whisper_url).await;
            match outcome {
                Ok(result) => match complete_job(&db, job.id, result).await {
                    Ok(()) => {
                        tracing::info!(job_id = %job.id, "Jobb fullført");
                    }
                    Err(e) => {
                        tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
                    }
                },
                Err(err) => {
                    tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
                    if let Err(e) = fail_job(&db, &job, &err).await {
                        tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
                    }
                }
            }
        }
    });
}