synops/maskinrommet/src/jobs.rs
vegard 26f03ef21d Trigger-evaluering i portvokteren (oppgave 24.2)
Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer portvokteren nå
om noen orchestration-noder matcher triggeren. Implementert som non-blocking
async task som ikke blokkerer WebSocket-flyten.

Ny modul orchestration_trigger.rs:
- Mapper NOTIFY-events til trigger-typer (node.created, edge.created)
- Effektiv lookup via funksjonell B-tree-indeks på metadata->trigger->event
- Evaluerer observes-edges (eksplisitt) vs conditions (implisitt)
- Betingelser: node_kind, edge_type, has_trait, has_tag (AND-logikk)
- Legger matchende orkestreringer i jobbkøen som "orchestrate"-jobb

Ny migration 021: indeks for trigger-event lookup på orchestration-noder.
Jobbkø-dispatcher håndterer "orchestrate" med placeholder (24.3 implementerer utførelse).

Verifisert: content-node trigger matching orchestration, communication-node hoppes over.
2026-03-18 16:53:59 +00:00

685 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
//
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatching til handler-funksjoner basert på job_type.
// Ressursstyring: semaphore for concurrency, prioritetsregler fra PG,
// LiveKit-bevisst resource governor (oppgave 15.5).
//
// Ref: docs/infra/jobbkø.md
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use crate::agent;
use crate::ai_edges;
use crate::ai_process;
use crate::audio;
use crate::cas::CasStore;
use crate::cli_dispatch;
use crate::maintenance::MaintenanceState;
use crate::pg_writes;
use crate::publishing::IndexCache;
use crate::resources::{self, PriorityRules};
use crate::summarize;
use crate::transcribe;
use crate::tts;
/// Maximum total CPU weight that may run concurrently.
/// Default: 8 (fits an 8-vCPU host where Whisper=5 plus a few light calls at 1 each).
const MAX_TOTAL_WEIGHT: i16 = 8;
/// Row from the job_queue table (the subset of columns the worker needs).
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
    // Primary key of the queued job.
    pub id: Uuid,
    // Collection the job belongs to, if any.
    pub collection_node_id: Option<Uuid>,
    // Handler key — dispatched in `dispatch` (e.g. "whisper_transcribe").
    pub job_type: String,
    // Handler-specific JSON arguments.
    pub payload: serde_json::Value,
    // Number of attempts so far; incremented by `dequeue` when claimed.
    pub attempts: i16,
    // Attempts allowed before the job is marked permanently failed.
    pub max_attempts: i16,
}
/// Inserts a new job into the queue and returns its generated id.
///
/// `payload` carries handler-specific JSON arguments; `priority` feeds the
/// `ORDER BY priority DESC` in `dequeue`.
pub async fn enqueue(
    db: &PgPool,
    job_type: &str,
    payload: serde_json::Value,
    collection_node_id: Option<Uuid>,
    priority: i16,
) -> Result<Uuid, sqlx::Error> {
    let sql = r#"
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
VALUES ($1, $2, $3, $4)
RETURNING id
"#;
    let job_id: Uuid = sqlx::query_scalar(sql)
        .bind(job_type)
        .bind(&payload)
        .bind(collection_node_id)
        .bind(priority)
        .fetch_one(db)
        .await?;
    tracing::info!(job_id = %job_id, job_type = %job_type, "Jobb lagt i kø");
    Ok(job_id)
}
/// Atomically claims the next pending job (SELECT ... FOR UPDATE SKIP LOCKED),
/// setting its status to 'running' and bumping started_at/attempts.
///
/// Both the select and the status update happen inside one transaction so
/// that concurrent workers never claim the same row.
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
    let mut tx = db.begin().await?;
    let candidate: Option<JobRow> = sqlx::query_as(
        r#"
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
FROM job_queue
WHERE status IN ('pending', 'retry')
AND scheduled_for <= now()
ORDER BY priority DESC, scheduled_for ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
"#,
    )
    .fetch_optional(&mut *tx)
    .await?;
    match candidate {
        Some(job) => {
            sqlx::query(
                "UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
            )
            .bind(job.id)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            Ok(Some(job))
        }
        None => {
            tx.commit().await?;
            Ok(None)
        }
    }
}
/// Marks a job as completed and stores the handler's result JSON.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
    let update =
        "UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1";
    sqlx::query(update)
        .bind(job_id)
        .bind(&result)
        .execute(db)
        .await
        .map(|_| ())
}
/// Marks a job as failed. If attempts remain it is set to 'retry' with
/// exponential backoff, otherwise to the terminal 'error' state.
///
/// Note: `dequeue` already incremented `attempts`, so on the first failure
/// `job.attempts == 1`.
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
    if job.attempts < job.max_attempts {
        // Exponential backoff: 30s × 2^(attempts-1) → 30s, 60s, 120s, ...
        // (Fix: the exponent previously used `attempts` directly, which made
        // the first retry wait 60s and skipped the documented 30s base delay.)
        let backoff_secs = 30i64 * 2i64.pow((job.attempts - 1).max(0) as u32);
        sqlx::query(
            r#"
UPDATE job_queue
SET status = 'retry',
error_msg = $2,
scheduled_for = now() + ($3 || ' seconds')::interval
WHERE id = $1
"#,
        )
        .bind(job.id)
        .bind(error_msg)
        .bind(backoff_secs.to_string())
        .execute(db)
        .await?;
        tracing::warn!(
            job_id = %job.id,
            attempt = job.attempts,
            max = job.max_attempts,
            backoff_secs = backoff_secs,
            "Jobb feilet, retry planlagt"
        );
    } else {
        // No attempts left — park the job permanently with the error message.
        sqlx::query(
            "UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
        )
        .bind(job.id)
        .bind(error_msg)
        .execute(db)
        .await?;
        tracing::error!(
            job_id = %job.id,
            attempts = job.attempts,
            "Jobb permanent feilet"
        );
    }
    Ok(())
}
/// Dispatcher — invokes the right handler for the job's `job_type`.
///
/// Handlers delegate to CLI tools (Command::new("synops-X")) in line with
/// the Unix philosophy; ai_process is the exception and still runs inline
/// (no CLI tool yet, planned in a future task).
///
/// Ref: docs/retninger/unix_filosofi.md
async fn dispatch(
    job: &JobRow,
    db: &PgPool,
    cas: &CasStore,
    index_cache: &IndexCache,
    whisper_url: &str,
) -> Result<serde_json::Value, String> {
    match job.job_type.as_str() {
        // Media / AI pipeline handlers.
        "whisper_transcribe" => transcribe::handle_whisper_job(job, cas, whisper_url).await,
        "agent_respond" => agent::handle_agent_respond(job, db).await,
        "suggest_edges" => ai_edges::handle_suggest_edges(job, db).await,
        "summarize_communication" => summarize::handle_summarize_communication(job, db).await,
        "tts_generate" => tts::handle_tts_job(job, db).await,
        "audio_process" => audio::handle_audio_process_job(job, db, cas).await,
        "ai_process" => ai_process::handle_ai_process(job, db).await,
        // Publishing handlers (local, shell out to synops-render).
        "render_article" => handle_render_article(job, cas).await,
        "render_index" => handle_render_index(job, cas).await,
        // PG write operations (task 12.3): retry with backoff + dead letter queue.
        "pg_insert_node" => pg_writes::handle_insert_node(job, db).await,
        "pg_insert_edge" => pg_writes::handle_insert_edge(job, db, index_cache).await,
        "pg_update_node" => pg_writes::handle_update_node(job, db).await,
        "pg_delete_node" => pg_writes::handle_delete_node(job, db).await,
        "pg_delete_edge" => pg_writes::handle_delete_edge(job, db, index_cache).await,
        // Orchestration: trigger evaluation has already queued the job, but
        // execution lands in task 24.3 — for now we just log and report OK.
        "orchestrate" => {
            let orch_id = job
                .payload
                .get("orchestration_id")
                .and_then(serde_json::Value::as_str)
                .unwrap_or("ukjent");
            tracing::info!(
                orchestration_id = %orch_id,
                "Orchestrate-jobb mottatt (utførelse kommer i oppgave 24.3)"
            );
            Ok(serde_json::json!({
                "status": "pending_implementation",
                "orchestration_id": orch_id,
                "message": "Orchestration execution not yet implemented (task 24.3)"
            }))
        }
        other => Err(format!("Ukjent jobbtype: {other}")),
    }
}
/// Path to the synops-render binary, overridable via SYNOPS_RENDER_BIN.
fn render_bin() -> String {
    match std::env::var("SYNOPS_RENDER_BIN") {
        Ok(path) => path,
        Err(_) => "synops-render".to_string(),
    }
}
/// Handler for the `render_article` job — shells out to the synops-render CLI.
///
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
/// The CLI tool performs the Tera rendering, CAS storage and metadata update.
async fn handle_render_article(
    job: &JobRow,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    // Pull a UUID out of the payload, mapping absence/parse failure
    // to the given error message.
    let parse_uuid = |key: &str, missing: &str| -> Result<Uuid, String> {
        job.payload
            .get(key)
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse().ok())
            .ok_or_else(|| missing.to_string())
    };
    let node_id = parse_uuid("node_id", "Mangler node_id i payload")?;
    let collection_id = parse_uuid("collection_id", "Mangler collection_id i payload")?;
    let bin = render_bin();
    let mut cmd = tokio::process::Command::new(&bin);
    cmd.arg("--node-id")
        .arg(node_id.to_string())
        .arg("--collection-id")
        .arg(collection_id.to_string())
        .arg("--render-type")
        .arg("article")
        .arg("--write");
    cli_dispatch::set_database_url(&mut cmd)?;
    cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
    tracing::info!(
        node_id = %node_id,
        collection_id = %collection_id,
        bin = %bin,
        "Starter synops-render (article)"
    );
    let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
    tracing::info!(
        node_id = %node_id,
        html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
        "synops-render (article) fullført"
    );
    Ok(result)
}
/// Handler for `render_index`-jobb — delegerer til synops-render CLI.
///
/// Payload: `{ "collection_id": "..." }`
/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering.
async fn handle_render_index(
job: &JobRow,
cas: &CasStore,
) -> Result<serde_json::Value, String> {
let collection_id: Uuid = job
.payload
.get("collection_id")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.ok_or("Mangler collection_id i payload")?;
let bin = render_bin();
let mut cmd = tokio::process::Command::new(&bin);
cmd.arg("--node-id").arg(collection_id.to_string())
.arg("--render-type").arg("index")
.arg("--write");
cli_dispatch::set_database_url(&mut cmd)?;
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
tracing::info!(
collection_id = %collection_id,
bin = %bin,
"Starter synops-render (index)"
);
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
tracing::info!(
collection_id = %collection_id,
html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
"synops-render (index) fullført"
);
Ok(result)
}
// =============================================================================
// Admin-API: spørring, retry og avbryt (oppgave 15.3)
// =============================================================================
/// Full job_queue row, serialized for the admin overview API.
#[derive(sqlx::FromRow, Debug, Serialize)]
pub struct JobDetail {
    pub id: Uuid,
    // Collection the job belongs to, if any.
    pub collection_node_id: Option<Uuid>,
    // Handler key — see `dispatch` for the known values.
    pub job_type: String,
    // Handler-specific JSON arguments.
    pub payload: serde_json::Value,
    // Queue state as text ('pending', 'retry', 'running', 'completed', 'error').
    pub status: String,
    // Higher priority is dequeued first.
    pub priority: i16,
    // Result JSON from the handler, set on completion.
    pub result: Option<serde_json::Value>,
    // Last error message, set on failure/retry.
    pub error_msg: Option<String>,
    pub attempts: i16,
    pub max_attempts: i16,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    // Earliest time the job may be dequeued (used for retry backoff).
    pub scheduled_for: DateTime<Utc>,
}
/// Fetches jobs with optional filtering on status, type and collection.
/// Returns newest first, limited to `limit` rows starting at `offset`.
///
/// The filters are interpolated into the SQL text (the WHERE clause is
/// dynamic), so every string filter is sanitized to alphanumerics and
/// underscore first.
pub async fn list_jobs(
    db: &PgPool,
    status_filter: Option<&str>,
    type_filter: Option<&str>,
    collection_filter: Option<Uuid>,
    limit: i64,
    offset: i64,
) -> Result<Vec<JobDetail>, sqlx::Error> {
    // Strip everything except alphanumerics + underscore; prevents SQL
    // injection through interpolated filter values.
    fn sanitize(raw: &str) -> String {
        raw.chars().filter(|c| c.is_alphanumeric() || *c == '_').collect()
    }
    // Build the dynamic WHERE clause.
    let mut conditions: Vec<String> = Vec::new();
    if let Some(s) = status_filter {
        // Fix: this value was previously interpolated unsanitized — an
        // injection vector. Valid statuses only contain [a-z_] anyway.
        let safe = sanitize(s);
        conditions.push(format!("status = '{safe}'"));
    }
    if let Some(t) = type_filter {
        let safe = sanitize(t);
        conditions.push(format!("job_type = '{safe}'"));
    }
    if let Some(cid) = collection_filter {
        // Uuid's Display output is fixed-format hex/dashes — safe to interpolate.
        conditions.push(format!("collection_node_id = '{cid}'"));
    }
    let where_clause = if conditions.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", conditions.join(" AND "))
    };
    // limit/offset are typed i64 — safe to interpolate.
    let query = format!(
        r#"SELECT id, collection_node_id, job_type, payload,
status::text as status, priority, result, error_msg,
attempts, max_attempts, created_at, started_at,
completed_at, scheduled_for
FROM job_queue
{where_clause}
ORDER BY created_at DESC
LIMIT {limit} OFFSET {offset}"#
    );
    sqlx::query_as::<_, JobDetail>(&query)
        .fetch_all(db)
        .await
}
/// Job count per status (for the dashboard summary).
#[derive(sqlx::FromRow, Serialize, Debug)]
pub struct JobCountByStatus {
    // Status value as text (enum cast to text in the query).
    pub status: String,
    // Number of jobs currently in that status.
    pub count: i64,
}
/// Counts jobs grouped by status (dashboard summary).
pub async fn count_by_status(db: &PgPool) -> Result<Vec<JobCountByStatus>, sqlx::Error> {
    let sql = "SELECT status::text as status, count(*) as count FROM job_queue GROUP BY status";
    sqlx::query_as::<_, JobCountByStatus>(sql).fetch_all(db).await
}
/// Lists the distinct job types present in the queue (for the filter dropdown).
pub async fn distinct_job_types(db: &PgPool) -> Result<Vec<String>, sqlx::Error> {
    let sql = "SELECT DISTINCT job_type FROM job_queue ORDER BY job_type";
    sqlx::query_scalar::<_, String>(sql).fetch_all(db).await
}
/// Resets a failed job to 'pending' for a manual retry.
/// Only jobs currently in 'error' or 'retry' are eligible.
/// Returns true when a row was actually updated.
pub async fn retry_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"UPDATE job_queue
SET status = 'pending',
error_msg = NULL,
scheduled_for = now(),
attempts = 0
WHERE id = $1
AND status IN ('error', 'retry')"#;
    let outcome = sqlx::query(SQL).bind(job_id).execute(db).await?;
    Ok(outcome.rows_affected() != 0)
}
/// Cancels a pending or retry job by marking it 'error' with a message.
/// Running jobs cannot be cancelled this way (their task is already live).
/// Returns true when a row was actually updated.
pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"UPDATE job_queue
SET status = 'error',
error_msg = 'Manuelt avbrutt av admin',
completed_at = now()
WHERE id = $1
AND status IN ('pending', 'retry')"#;
    let outcome = sqlx::query(SQL).bind(job_id).execute(db).await?;
    Ok(outcome.rows_affected() != 0)
}
/// Starts the worker loop that polls job_queue.
/// Runs as a background task in tokio.
///
/// Resource management (task 15.5):
/// - Concurrency control via semaphore (max 3 simultaneous jobs)
/// - Priority rules from the job_priority_rules table
/// - LiveKit-aware resource governor: deprioritizes/blocks heavy
///   jobs while LiveKit rooms are active
/// - CPU-weight based capacity control (MAX_TOTAL_WEIGHT)
///
/// Respects maintenance mode: while `maintenance.is_active()` is true the
/// worker stops dequeuing new jobs (jobs already running complete normally).
pub fn start_worker(
    db: PgPool,
    cas: CasStore,
    index_cache: IndexCache,
    maintenance: MaintenanceState,
    priority_rules: PriorityRules,
) {
    // Whisper endpoint for transcription jobs; overridable via env.
    let whisper_url = std::env::var("WHISPER_URL")
        .unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
    // Semaphore for at most 3 simultaneous jobs (doc: --max-concurrent 3).
    let semaphore = Arc::new(Semaphore::new(3));
    // Cache the LiveKit-active flag with a 10-second TTL so we don't
    // query PG on every poll iteration.
    let livekit_cache: Arc<tokio::sync::RwLock<(bool, std::time::Instant)>> =
        Arc::new(tokio::sync::RwLock::new((false, std::time::Instant::now())));
    tokio::spawn(async move {
        tracing::info!("Jobbkø-worker startet (poll: 2s, max-concurrent: 3, max-weight: {MAX_TOTAL_WEIGHT})");
        loop {
            // Maintenance mode — do not dequeue new jobs, just idle.
            if maintenance.is_active() {
                tracing::debug!("Vedlikeholdsmodus aktiv — jobbkø pauset");
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                continue;
            }
            // Reserve a concurrency slot before touching the queue.
            let permit = match semaphore.clone().try_acquire_owned() {
                Ok(p) => p,
                Err(_) => {
                    // All 3 slots busy — back off briefly.
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    continue;
                }
            };
            // Check the aggregate CPU weight of running jobs.
            let current_weight = resources::total_running_weight(&db, &priority_rules).await;
            if current_weight >= MAX_TOTAL_WEIGHT {
                drop(permit);
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                continue;
            }
            // Refresh the LiveKit-active flag (10-second TTL).
            let livekit_active = {
                let cache = livekit_cache.read().await;
                if cache.1.elapsed() < std::time::Duration::from_secs(10) {
                    cache.0
                } else {
                    // Drop the read guard before taking the write lock.
                    drop(cache);
                    let active = resources::has_active_livekit_rooms(&db).await;
                    let mut cache = livekit_cache.write().await;
                    *cache = (active, std::time::Instant::now());
                    active
                }
            };
            match dequeue(&db).await {
                Ok(Some(job)) => {
                    // Look up the priority rules for this job type.
                    let rule = priority_rules.get(&job.job_type).await;
                    let decision = resources::evaluate_job(&rule, livekit_active);
                    // Defer if the job is blocked during an active LiveKit session.
                    if decision.should_defer {
                        tracing::info!(
                            job_id = %job.id,
                            job_type = %job.job_type,
                            "Jobb utsatt — blokkert under aktiv LiveKit-sesjon"
                        );
                        // Put back to pending with a short delay; decrement
                        // attempts so the deferral doesn't count as a try
                        // (dequeue already incremented it).
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '30 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }
                    // Enforce max_concurrent per job type (0 = unlimited).
                    if rule.max_concurrent > 0 {
                        let running = resources::count_running_of_type(&db, &job.job_type)
                            .await
                            .unwrap_or(0);
                        // `running` includes the job we just set to 'running',
                        // so compare against max_concurrent (strictly greater).
                        if running > rule.max_concurrent as i64 {
                            tracing::debug!(
                                job_type = %job.job_type,
                                running = running,
                                max = rule.max_concurrent,
                                "Max concurrent nådd for jobbtype — utsetter"
                            );
                            // Requeue with a short delay; undo the attempt bump.
                            let _ = sqlx::query(
                                "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                            )
                            .bind(job.id)
                            .execute(&db)
                            .await;
                            drop(permit);
                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                            continue;
                        }
                    }
                    // Make sure this job's weight fits under the global cap.
                    if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
                        tracing::debug!(
                            job_type = %job.job_type,
                            weight = rule.cpu_weight,
                            current = current_weight,
                            max = MAX_TOTAL_WEIGHT,
                            "CPU-vektgrense nådd — utsetter tung jobb"
                        );
                        // Requeue with a short delay; undo the attempt bump.
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }
                    tracing::info!(
                        job_id = %job.id,
                        job_type = %job.job_type,
                        attempt = job.attempts,
                        weight = rule.cpu_weight,
                        livekit = livekit_active,
                        "Behandler jobb"
                    );
                    // Run the job in its own tokio task (frees the poll loop).
                    let db2 = db.clone();
                    let cas2 = cas.clone();
                    let index_cache2 = index_cache.clone();
                    let whisper_url2 = whisper_url.clone();
                    let timeout_secs = if rule.timeout_seconds > 0 {
                        rule.timeout_seconds as u64
                    } else {
                        600 // 10 min default
                    };
                    tokio::spawn(async move {
                        // Hold the semaphore permit until the job finishes.
                        let _permit = permit;
                        let result = tokio::time::timeout(
                            std::time::Duration::from_secs(timeout_secs),
                            dispatch(&job, &db2, &cas2, &index_cache2, &whisper_url2),
                        )
                        .await;
                        match result {
                            // Handler succeeded — persist its result.
                            Ok(Ok(res)) => {
                                if let Err(e) = complete_job(&db2, job.id, res).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
                                } else {
                                    tracing::info!(job_id = %job.id, "Jobb fullført");
                                }
                            }
                            // Handler returned an error — schedule retry or park.
                            Ok(Err(err)) => {
                                tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
                                if let Err(e) = fail_job(&db2, &job, &err).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
                                }
                            }
                            // Timeout elapsed — treat it as a failure.
                            Err(_) => {
                                let msg = format!("Jobb tidsavbrutt etter {timeout_secs}s");
                                tracing::error!(job_id = %job.id, "{msg}");
                                if let Err(e) = fail_job(&db2, &job, &msg).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere tidsavbrutt jobb");
                                }
                            }
                        }
                    });
                    // Don't wait — poll again immediately for the next job.
                    continue;
                }
                Ok(None) => {
                    drop(permit);
                    // No pending jobs — idle before the next poll.
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                Err(e) => {
                    drop(permit);
                    tracing::error!(error = %e, "Feil ved polling av jobbkø");
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    });
}