Ny jobbtype `suggest_edges` som automatisk trigges ved opprettelse av content-noder med tilstrekkelig tekst (≥20 tegn). Sender innholdet til LiteLLM (sidelinja/rutine) via AI Gateway, parser JSON-respons med topics og mentions, og oppretter topic-noder + mentions-edges i grafen. Flyten: 1. create_node oppdager content-node med nok tekst → enqueue suggest_edges 2. Worker henter node-innhold og eksisterende topics fra PG 3. LLM analyserer tekst og returnerer foreslåtte topics/mentions 4. Nye topic-noder opprettes (med ai_generated-flagg i metadata) 5. mentions-edges opprettes fra innholdsnode til topic/entitet-noder 6. Deduplisering: gjenbruker eksisterende topics ved case-insensitivt match Filer: - maskinrommet/src/ai_edges.rs: Ny modul med LLM-kall og edge-opprettelse - maskinrommet/src/jobs.rs: suggest_edges registrert i dispatcher - maskinrommet/src/intentions.rs: Trigger i create_node - docs/: Oppdatert jobbkø og AI gateway-docs med ny jobbtype NB: Krever gyldig API-nøkkel i LiteLLM (OpenRouter/Gemini/Anthropic). Jobben feiler gracefully med retry+backoff ved manglende nøkkel. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
212 lines
6.5 KiB
Rust
212 lines
6.5 KiB
Rust
// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
|
||
//
|
||
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
|
||
// Dispatching til handler-funksjoner basert på job_type.
|
||
//
|
||
// Ref: docs/infra/jobbkø.md
|
||
|
||
use sqlx::PgPool;
|
||
use uuid::Uuid;
|
||
|
||
use crate::agent;
|
||
use crate::ai_edges;
|
||
use crate::cas::CasStore;
|
||
use crate::stdb::StdbClient;
|
||
use crate::transcribe;
|
||
|
||
/// Rad fra job_queue-tabellen.
|
||
#[derive(sqlx::FromRow, Debug)]
|
||
pub struct JobRow {
|
||
pub id: Uuid,
|
||
pub collection_node_id: Option<Uuid>,
|
||
pub job_type: String,
|
||
pub payload: serde_json::Value,
|
||
pub attempts: i16,
|
||
pub max_attempts: i16,
|
||
}
|
||
|
||
/// Legger en ny jobb i køen.
|
||
pub async fn enqueue(
|
||
db: &PgPool,
|
||
job_type: &str,
|
||
payload: serde_json::Value,
|
||
collection_node_id: Option<Uuid>,
|
||
priority: i16,
|
||
) -> Result<Uuid, sqlx::Error> {
|
||
let row = sqlx::query_scalar::<_, Uuid>(
|
||
r#"
|
||
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
|
||
VALUES ($1, $2, $3, $4)
|
||
RETURNING id
|
||
"#,
|
||
)
|
||
.bind(job_type)
|
||
.bind(&payload)
|
||
.bind(collection_node_id)
|
||
.bind(priority)
|
||
.fetch_one(db)
|
||
.await?;
|
||
|
||
tracing::info!(job_id = %row, job_type = %job_type, "Jobb lagt i kø");
|
||
Ok(row)
|
||
}
|
||
|
||
/// Henter neste ventende jobb (atomisk med FOR UPDATE SKIP LOCKED).
|
||
/// Setter status til 'running' og oppdaterer started_at.
|
||
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
|
||
let mut tx = db.begin().await?;
|
||
|
||
let job = sqlx::query_as::<_, JobRow>(
|
||
r#"
|
||
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
|
||
FROM job_queue
|
||
WHERE status IN ('pending', 'retry')
|
||
AND scheduled_for <= now()
|
||
ORDER BY priority DESC, scheduled_for ASC
|
||
LIMIT 1
|
||
FOR UPDATE SKIP LOCKED
|
||
"#,
|
||
)
|
||
.fetch_optional(&mut *tx)
|
||
.await?;
|
||
|
||
if let Some(ref job) = job {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
|
||
)
|
||
.bind(job.id)
|
||
.execute(&mut *tx)
|
||
.await?;
|
||
}
|
||
|
||
tx.commit().await?;
|
||
Ok(job)
|
||
}
|
||
|
||
/// Markerer en jobb som fullført.
|
||
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1",
|
||
)
|
||
.bind(job_id)
|
||
.bind(&result)
|
||
.execute(db)
|
||
.await?;
|
||
Ok(())
|
||
}
|
||
|
||
/// Markerer en jobb som feilet. Settes til 'retry' med backoff hvis forsøk gjenstår,
|
||
/// ellers 'error'.
|
||
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
|
||
if job.attempts < job.max_attempts {
|
||
// Eksponentiell backoff: 30s × 2^(attempts-1)
|
||
let backoff_secs = 30i64 * 2i64.pow((job.attempts).max(0) as u32);
|
||
sqlx::query(
|
||
r#"
|
||
UPDATE job_queue
|
||
SET status = 'retry',
|
||
error_msg = $2,
|
||
scheduled_for = now() + ($3 || ' seconds')::interval
|
||
WHERE id = $1
|
||
"#,
|
||
)
|
||
.bind(job.id)
|
||
.bind(error_msg)
|
||
.bind(backoff_secs.to_string())
|
||
.execute(db)
|
||
.await?;
|
||
|
||
tracing::warn!(
|
||
job_id = %job.id,
|
||
attempt = job.attempts,
|
||
max = job.max_attempts,
|
||
backoff_secs = backoff_secs,
|
||
"Jobb feilet, retry planlagt"
|
||
);
|
||
} else {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
|
||
)
|
||
.bind(job.id)
|
||
.bind(error_msg)
|
||
.execute(db)
|
||
.await?;
|
||
|
||
tracing::error!(
|
||
job_id = %job.id,
|
||
attempts = job.attempts,
|
||
"Jobb permanent feilet"
|
||
);
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// Dispatcher — kjører riktig handler basert på job_type.
|
||
async fn dispatch(
|
||
job: &JobRow,
|
||
db: &PgPool,
|
||
stdb: &StdbClient,
|
||
cas: &CasStore,
|
||
whisper_url: &str,
|
||
) -> Result<serde_json::Value, String> {
|
||
match job.job_type.as_str() {
|
||
"whisper_transcribe" => {
|
||
transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await
|
||
}
|
||
"agent_respond" => {
|
||
agent::handle_agent_respond(job, db, stdb).await
|
||
}
|
||
"suggest_edges" => {
|
||
ai_edges::handle_suggest_edges(job, db, stdb).await
|
||
}
|
||
other => Err(format!("Ukjent jobbtype: {other}")),
|
||
}
|
||
}
|
||
|
||
/// Starter worker-loopen som poller job_queue.
|
||
/// Kjører som en bakgrunnsoppgave i tokio.
|
||
pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore) {
|
||
let whisper_url = std::env::var("WHISPER_URL")
|
||
.unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
|
||
|
||
tokio::spawn(async move {
|
||
tracing::info!("Jobbkø-worker startet (poll-intervall: 2s)");
|
||
|
||
loop {
|
||
match dequeue(&db).await {
|
||
Ok(Some(job)) => {
|
||
tracing::info!(
|
||
job_id = %job.id,
|
||
job_type = %job.job_type,
|
||
attempt = job.attempts,
|
||
"Behandler jobb"
|
||
);
|
||
|
||
match dispatch(&job, &db, &stdb, &cas, &whisper_url).await {
|
||
Ok(result) => {
|
||
if let Err(e) = complete_job(&db, job.id, result).await {
|
||
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
|
||
} else {
|
||
tracing::info!(job_id = %job.id, "Jobb fullført");
|
||
}
|
||
}
|
||
Err(err) => {
|
||
tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
|
||
if let Err(e) = fail_job(&db, &job, &err).await {
|
||
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
|
||
}
|
||
}
|
||
}
|
||
}
|
||
Ok(None) => {
|
||
// Ingen ventende jobber — vent før neste poll
|
||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||
}
|
||
Err(e) => {
|
||
tracing::error!(error = %e, "Feil ved polling av jobbkø");
|
||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||
}
|
||
}
|
||
}
|
||
});
|
||
}
|