synops/maskinrommet/src/jobs.rs
vegard 6370b02cc7 26.7 ferdig: utgående varsler med brukerpreferanser
Vaktmesteren kan nå sende epost-varsler og WebSocket-push til brukere
via synops-notify, med respekt for brukerens preferanser.

Endringer:
- jobs.rs: send_notification jobbtype som delegerer til synops-notify CLI
- synops-notify: preferansesjekk fra metadata.preferences.notifications
  (opt-out-modell, per-kanal og per-type bryter, --skip-preferences)
- intentions.rs: POST /intentions/send_notification (admin-only)
- Dokumentasjon: docs/features/varsler.md

Preferanseskjema (i brukernodens metadata):
  preferences.notifications.email: bool (global epost-bryter)
  preferences.notifications.ws: bool (global WS-bryter)
  preferences.notifications.<type>: bool (per-type, f.eks. task_assigned)
2026-03-19 02:08:00 +00:00

918 lines
31 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
//
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatching til handler-funksjoner basert på job_type.
// Ressursstyring: semaphore for concurrency, prioritetsregler fra PG,
// LiveKit-bevisst resource governor (oppgave 15.5).
//
// Ref: docs/infra/jobbkø.md
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use crate::agent;
use crate::ai_edges;
use crate::ai_process;
use crate::audio;
use crate::cas::CasStore;
use crate::cli_dispatch;
use crate::clip;
use crate::maintenance::MaintenanceState;
use crate::pg_writes;
use crate::publishing::IndexCache;
use crate::resources::{self, PriorityRules};
use crate::script_compiler;
use crate::script_executor;
use crate::summarize;
use crate::transcribe;
use crate::tts;
/// Maximum total CPU weight allowed to run concurrently.
/// Default: 8 (fits an 8-vCPU host where Whisper=5 plus light calls at 1+1+1).
const MAX_TOTAL_WEIGHT: i16 = 8;
/// Row from the `job_queue` table, as fetched by `dequeue`.
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
    // Primary key of the job.
    pub id: Uuid,
    // Optional owning collection node (used for scoping/filtering).
    pub collection_node_id: Option<Uuid>,
    // Dispatch key — see `dispatch` for the set of known job types.
    pub job_type: String,
    // Job-type-specific JSON payload.
    pub payload: serde_json::Value,
    // Attempts made so far; incremented by `dequeue` when the job is claimed.
    pub attempts: i16,
    // Maximum attempts before the job is marked permanently failed.
    pub max_attempts: i16,
}
/// Inserts a new job into the queue and returns its generated id.
///
/// `priority` is used by the worker's ORDER BY when claiming jobs;
/// `collection_node_id` optionally scopes the job to a collection.
pub async fn enqueue(
    db: &PgPool,
    job_type: &str,
    payload: serde_json::Value,
    collection_node_id: Option<Uuid>,
    priority: i16,
) -> Result<Uuid, sqlx::Error> {
    const INSERT_SQL: &str = r#"
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
VALUES ($1, $2, $3, $4)
RETURNING id
"#;
    let job_id = sqlx::query_scalar::<_, Uuid>(INSERT_SQL)
        .bind(job_type)
        .bind(&payload)
        .bind(collection_node_id)
        .bind(priority)
        .fetch_one(db)
        .await?;
    tracing::info!(job_id = %job_id, job_type = %job_type, "Jobb lagt i kø");
    Ok(job_id)
}
/// Atomically claims the next runnable job (FOR UPDATE SKIP LOCKED).
/// Inside a single transaction the claimed row is flipped to 'running',
/// `started_at` is stamped and `attempts` is incremented.
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
    let mut tx = db.begin().await?;
    let candidate = sqlx::query_as::<_, JobRow>(
        r#"
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
FROM job_queue
WHERE status IN ('pending', 'retry')
AND scheduled_for <= now()
ORDER BY priority DESC, scheduled_for ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
"#,
    )
    .fetch_optional(&mut *tx)
    .await?;
    match candidate {
        Some(job) => {
            sqlx::query(
                "UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
            )
            .bind(job.id)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            Ok(Some(job))
        }
        None => {
            tx.commit().await?;
            Ok(None)
        }
    }
}
/// Marks a job as completed, storing its result JSON and completion time.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
    const SQL: &str =
        "UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1";
    sqlx::query(SQL)
        .bind(job_id)
        .bind(&result)
        .execute(db)
        .await
        .map(|_| ())
}
/// Marks a job as failed. If attempts remain, the job is set to 'retry'
/// with exponential backoff; otherwise it is marked permanently 'error'.
///
/// Backoff schedule: 30s × 2^attempts, where `attempts` was already
/// incremented by `dequeue` when the job was claimed (so the first failure
/// waits 60s, then 120s, …). The exponent is clamped to 0..=32 so that
/// `2i64.pow` can neither overflow nor panic on a pathological attempts
/// value; 30 × 2^32 is still far inside i64 range.
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
    if job.attempts < job.max_attempts {
        // Clamp before pow: guards against negative exponents and overflow.
        let backoff_secs = 30i64 * 2i64.pow(job.attempts.clamp(0, 32) as u32);
        sqlx::query(
            r#"
UPDATE job_queue
SET status = 'retry',
error_msg = $2,
scheduled_for = now() + ($3 || ' seconds')::interval
WHERE id = $1
"#,
        )
        .bind(job.id)
        .bind(error_msg)
        .bind(backoff_secs.to_string())
        .execute(db)
        .await?;
        tracing::warn!(
            job_id = %job.id,
            attempt = job.attempts,
            max = job.max_attempts,
            backoff_secs = backoff_secs,
            "Jobb feilet, retry planlagt"
        );
    } else {
        // Out of attempts — dead-letter the job permanently.
        sqlx::query(
            "UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
        )
        .bind(job.id)
        .bind(error_msg)
        .execute(db)
        .await?;
        tracing::error!(
            job_id = %job.id,
            attempts = job.attempts,
            "Jobb permanent feilet"
        );
    }
    Ok(())
}
/// Dispatcher — runs the correct handler based on `job_type`.
///
/// All handlers delegate to CLI tools (Command::new("synops-X"))
/// in line with the Unix philosophy. The exception is ai_process, which
/// still runs inline (no CLI tool yet; planned for a future task).
///
/// Returns the handler's result JSON on success, or an error string that
/// `fail_job` will persist in `error_msg`.
///
/// Ref: docs/retninger/unix_filosofi.md
async fn dispatch(
    job: &JobRow,
    db: &PgPool,
    cas: &CasStore,
    index_cache: &IndexCache,
    whisper_url: &str,
) -> Result<serde_json::Value, String> {
    match job.job_type.as_str() {
        // Speech-to-text via the external Whisper HTTP service.
        "whisper_transcribe" => {
            transcribe::handle_whisper_job(job, cas, whisper_url).await
        }
        "agent_respond" => {
            agent::handle_agent_respond(job, db).await
        }
        "suggest_edges" => {
            ai_edges::handle_suggest_edges(job, db).await
        }
        "summarize_communication" => {
            summarize::handle_summarize_communication(job, db).await
        }
        "tts_generate" => {
            tts::handle_tts_job(job, db).await
        }
        "audio_process" => {
            audio::handle_audio_process_job(job, db, cas).await
        }
        // Runs inline rather than via a CLI tool — see module comment above.
        "ai_process" => {
            ai_process::handle_ai_process(job, db).await
        }
        "render_article" => {
            handle_render_article(job, cas).await
        }
        "render_index" => {
            handle_render_index(job, cas).await
        }
        // PG write operations (task 12.3): retry with backoff + dead letter queue.
        "pg_insert_node" => {
            pg_writes::handle_insert_node(job, db).await
        }
        "pg_insert_edge" => {
            pg_writes::handle_insert_edge(job, db, index_cache).await
        }
        "pg_update_node" => {
            pg_writes::handle_update_node(job, db).await
        }
        "pg_delete_node" => {
            pg_writes::handle_delete_node(job, db).await
        }
        "pg_delete_edge" => {
            pg_writes::handle_delete_edge(job, db, index_cache).await
        }
        "clip_url" => {
            clip::handle_clip_url(job, db).await
        }
        "describe_image" => {
            crate::describe_image::handle_describe_image(job, db, cas).await
        }
        // Feed polling: periodic RSS/Atom subscription (task 29.3).
        "feed_poll" => {
            crate::feed_poller::handle_feed_poll(job, db).await
        }
        // Calendar polling: periodic CalDAV/ICS subscription (task 29.12).
        "calendar_poll" => {
            crate::calendar_poller::handle_calendar_poll(job, db).await
        }
        // Orchestration: trigger evaluation has enqueued this job.
        // The compiler parses and validates the script.
        // Execution of the compiled script arrives in task 24.5.
        "orchestrate" => {
            handle_orchestrate(job, db).await
        }
        // Podcast import from an RSS feed (task 30.7).
        "import_podcast" => {
            crate::podcast_import::handle_import_podcast(job).await
        }
        // Outgoing notifications (task 26.7): delegates to the synops-notify CLI.
        "send_notification" => {
            handle_send_notification(job).await
        }
        // Unknown type: fail the job so the error surfaces in the admin UI.
        other => Err(format!("Ukjent jobbtype: {other}")),
    }
}
/// Handler for the `orchestrate` job — compiles and executes an orchestration script.
///
/// Flow:
/// 1. Fetches the orchestration node's `content` (human-readable script)
/// 2. Compiles via script_compiler → stores the pipeline in metadata
/// 3. Substitutes {event.*} variables from the trigger context
/// 4. Executes steps sequentially via generic dispatch (script_executor)
/// 5. VED_FEIL (on-error) handling: step fallback → global fallback → stop
/// 6. Logs each step in orchestration_log
async fn handle_orchestrate(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    // The payload must carry the UUID of the orchestration node to run.
    let orch_id: Uuid = job
        .payload
        .get("orchestration_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler orchestration_id i payload")?;
    tracing::info!(orchestration_id = %orch_id, "Kompilerer orchestration-script");
    // Fetch the orchestration node's content and metadata.
    let row = sqlx::query_as::<_, (Option<String>, serde_json::Value)>(
        "SELECT content, metadata FROM nodes WHERE id = $1 AND node_kind = 'orchestration'"
    )
    .bind(orch_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Feil ved henting av orchestration-node: {e}"))?
    .ok_or_else(|| format!("Orchestration-node {orch_id} finnes ikke"))?;
    let (content, _metadata) = row;
    let script = content.ok_or("Orchestration-node mangler content (script)")?;
    if script.trim().is_empty() {
        return Err("Orchestration-script er tomt".into());
    }
    // Parse the script text.
    let parsed = script_compiler::parse(&script)
        .map_err(|e| format!("Parse-feil: {e}"))?;
    // Load the tool registry from cli_tool nodes.
    let registry = script_compiler::load_tool_registry(db).await?;
    // Compile the parsed script against the registry.
    let result = script_compiler::compile(&parsed, &registry);
    // Log the human-readable compile report.
    let report = result.format_report();
    tracing::info!(
        orchestration_id = %orch_id,
        errors = result.has_errors(),
        "\n{report}"
    );
    if result.has_errors() {
        // Persist the error report in metadata for display in the UI,
        // and flag the node as not compiled.
        let diagnostics_json = serde_json::to_value(&result.diagnostics)
            .map_err(|e| format!("Serialiseringsfeil: {e}"))?;
        sqlx::query(
            r#"UPDATE nodes
SET metadata = jsonb_set(
jsonb_set(metadata, '{compile_errors}', $2),
'{compiled}', 'false'
)
WHERE id = $1"#,
        )
        .bind(orch_id)
        .bind(&diagnostics_json)
        .execute(db)
        .await
        .map_err(|e| format!("Feil ved lagring av kompileringsfeil: {e}"))?;
        return Err(format!("Kompilering feilet:\n{report}"));
    }
    // Success — store the compiled pipeline in metadata.
    // `compiled` is Some here because has_errors() returned false above.
    let compiled = result.compiled.as_ref().unwrap();
    let pipeline_json = serde_json::to_value(&compiled.steps)
        .map_err(|e| format!("Serialiseringsfeil: {e}"))?;
    let global_fb_json = compiled
        .global_fallback
        .as_ref()
        .map(|fb| serde_json::to_value(fb).unwrap_or_default())
        .unwrap_or(serde_json::Value::Null);
    // Clear any stale compile_errors, write pipeline + fallback, mark compiled.
    sqlx::query(
        r#"UPDATE nodes
SET metadata = jsonb_set(
jsonb_set(
jsonb_set(
metadata - 'compile_errors',
'{pipeline}', $2
),
'{compiled}', 'true'
),
'{global_fallback}', $3
)
WHERE id = $1"#,
    )
    .bind(orch_id)
    .bind(&pipeline_json)
    .bind(&global_fb_json)
    .execute(db)
    .await
    .map_err(|e| format!("Feil ved lagring av kompilert pipeline: {e}"))?;
    tracing::info!(
        orchestration_id = %orch_id,
        steps = compiled.steps.len(),
        "Orchestration-script kompilert — starter utførelse"
    );
    // === Execution (task 24.5) ===
    // Build the ExecutionContext from trigger_context in the payload.
    let exec_ctx = script_executor::ExecutionContext::from_payload(&job.payload);
    let pipeline_result = script_executor::execute_pipeline(
        db,
        orch_id,
        job.id,
        &compiled.steps,
        compiled.global_fallback.as_ref(),
        &exec_ctx,
    )
    .await;
    tracing::info!(
        orchestration_id = %orch_id,
        steps_run = pipeline_result.steps_run,
        steps_ok = pipeline_result.steps_ok,
        steps_failed = pipeline_result.steps_failed,
        aborted = pipeline_result.aborted,
        "Pipeline utført"
    );
    if pipeline_result.aborted {
        return Err(format!(
            "Pipeline avbrutt etter {}/{} steg: {}",
            pipeline_result.steps_ok,
            pipeline_result.steps_run,
            pipeline_result.error.as_deref().unwrap_or("ukjent feil"),
        ));
    }
    let result_json = serde_json::json!({
        "status": "executed",
        "orchestration_id": orch_id.to_string(),
        "steps_compiled": compiled.steps.len(),
        "steps_run": pipeline_result.steps_run,
        "steps_ok": pipeline_result.steps_ok,
        "technical": compiled.technical,
    });
    // === Cascade: triggers edges to downstream orchestrations (task 24.8) ===
    // cascade_chain carries the ids already visited, to break trigger cycles.
    let cascade_chain: Vec<String> = job
        .payload
        .get("cascade_chain")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    match crate::orchestration_trigger::enqueue_cascade(
        db,
        orch_id,
        &cascade_chain,
        &result_json,
    )
    .await
    {
        Ok(0) => {} // No cascade targets.
        Ok(n) => {
            tracing::info!(
                orchestration_id = %orch_id,
                cascade_targets = n,
                "Kaskade: {n} nedstrøms orkestrering(er) lagt i kø"
            );
        }
        Err(e) => {
            // Cascade failures are non-fatal — the orchestration itself succeeded.
            tracing::error!(
                orchestration_id = %orch_id,
                error = %e,
                "Feil ved kaskade-evaluering (orkestreringen selv lyktes)"
            );
        }
    }
    Ok(result_json)
}
/// Path to the synops-render binary; overridable via SYNOPS_RENDER_BIN,
/// defaulting to resolving "synops-render" on $PATH.
fn render_bin() -> String {
    match std::env::var("SYNOPS_RENDER_BIN") {
        Ok(path) => path,
        Err(_) => String::from("synops-render"),
    }
}
/// Handler for the `render_article` job — delegates to the synops-render CLI.
///
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
/// The CLI tool does the Tera rendering, CAS storage and metadata update.
async fn handle_render_article(
    job: &JobRow,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    // Pull a required UUID field out of the JSON payload.
    let uuid_field = |key: &str, missing: &str| -> Result<Uuid, String> {
        job.payload
            .get(key)
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse().ok())
            .ok_or_else(|| missing.to_string())
    };
    let node_id = uuid_field("node_id", "Mangler node_id i payload")?;
    let collection_id = uuid_field("collection_id", "Mangler collection_id i payload")?;
    let bin = render_bin();
    let mut cmd = tokio::process::Command::new(&bin);
    cmd.arg("--node-id")
        .arg(node_id.to_string())
        .arg("--collection-id")
        .arg(collection_id.to_string())
        .arg("--render-type")
        .arg("article")
        .arg("--write");
    cli_dispatch::set_database_url(&mut cmd)?;
    cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
    tracing::info!(
        node_id = %node_id,
        collection_id = %collection_id,
        bin = %bin,
        "Starter synops-render (article)"
    );
    let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
    tracing::info!(
        node_id = %node_id,
        html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
        "synops-render (article) fullført"
    );
    Ok(result)
}
/// Handler for the `render_index` job — delegates to the synops-render CLI.
///
/// Payload: `{ "collection_id": "..." }`
/// The CLI tool does the Tera rendering, CAS storage and metadata update.
async fn handle_render_index(
    job: &JobRow,
    cas: &CasStore,
) -> Result<serde_json::Value, String> {
    let collection_id = job
        .payload
        .get("collection_id")
        .and_then(serde_json::Value::as_str)
        .and_then(|raw| raw.parse::<Uuid>().ok())
        .ok_or("Mangler collection_id i payload")?;
    let bin = render_bin();
    let mut cmd = tokio::process::Command::new(&bin);
    // For index renders the collection itself is the target node.
    cmd.arg("--node-id")
        .arg(collection_id.to_string())
        .arg("--render-type")
        .arg("index")
        .arg("--write");
    cli_dispatch::set_database_url(&mut cmd)?;
    cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
    tracing::info!(
        collection_id = %collection_id,
        bin = %bin,
        "Starter synops-render (index)"
    );
    let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
    tracing::info!(
        collection_id = %collection_id,
        html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
        "synops-render (index) fullført"
    );
    Ok(result)
}
/// Handler for the `send_notification` job — delegates to the synops-notify CLI.
///
/// Payload: `{ "to": "uuid", "message": "...", "subject": "...", "channel": "email|ws|both" }`
/// synops-notify checks the user's metadata.preferences.notifications before sending.
///
/// The binary path can be overridden via SYNOPS_NOTIFY_BIN, mirroring
/// SYNOPS_RENDER_BIN for the render handlers; the default remains
/// "synops-notify" so existing deployments are unaffected.
async fn handle_send_notification(
    job: &JobRow,
) -> Result<serde_json::Value, String> {
    let payload_str = serde_json::to_string(&job.payload)
        .map_err(|e| format!("Kunne ikke serialisere payload: {e}"))?;
    let bin = std::env::var("SYNOPS_NOTIFY_BIN")
        .unwrap_or_else(|_| "synops-notify".to_string());
    let mut cmd = tokio::process::Command::new(&bin);
    cmd.arg("--payload-json").arg(&payload_str);
    cli_dispatch::set_database_url(&mut cmd)?;
    tracing::info!(
        job_id = %job.id,
        to = job.payload["to"].as_str().unwrap_or("?"),
        channel = job.payload["channel"].as_str().unwrap_or("ws"),
        "Sender varsel via synops-notify"
    );
    cli_dispatch::run_cli_tool(&bin, &mut cmd).await
}
// =============================================================================
// Admin-API: spørring, retry og avbryt (oppgave 15.3)
// =============================================================================
/// Row with all columns, for the admin overview.
#[derive(sqlx::FromRow, Debug, Serialize)]
pub struct JobDetail {
    pub id: Uuid,
    pub collection_node_id: Option<Uuid>,
    pub job_type: String,
    pub payload: serde_json::Value,
    // Queried as status::text (the column is a PG enum in the queries below).
    pub status: String,
    pub priority: i16,
    // Result JSON written by complete_job; None until the job finishes.
    pub result: Option<serde_json::Value>,
    // Last error message written by fail_job/cancel_job.
    pub error_msg: Option<String>,
    pub attempts: i16,
    pub max_attempts: i16,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    // Earliest time the worker may claim the job (used for retry backoff).
    pub scheduled_for: DateTime<Utc>,
}
/// Fetch jobs with optional filtering on status, type and collection.
/// Returns newest first, limited to `limit` rows starting at `offset`.
///
/// The WHERE clause is built dynamically, so string filters are interpolated
/// into the SQL text. Every caller-supplied string is therefore sanitized to
/// alphanumerics + underscore before interpolation. (Previously only
/// `type_filter` was sanitized, leaving `status_filter` open to SQL
/// injection via a crafted status value.)
pub async fn list_jobs(
    db: &PgPool,
    status_filter: Option<&str>,
    type_filter: Option<&str>,
    collection_filter: Option<Uuid>,
    limit: i64,
    offset: i64,
) -> Result<Vec<JobDetail>, sqlx::Error> {
    // Keep only characters that can never terminate a SQL string literal.
    fn sanitize(raw: &str) -> String {
        raw.chars().filter(|c| c.is_alphanumeric() || *c == '_').collect()
    }
    // Build the dynamic WHERE clause.
    let mut conditions: Vec<String> = Vec::new();
    if let Some(s) = status_filter {
        conditions.push(format!("status = '{}'", sanitize(s)));
    }
    if let Some(t) = type_filter {
        conditions.push(format!("job_type = '{}'", sanitize(t)));
    }
    if let Some(cid) = collection_filter {
        // Uuid's Display is hex digits and dashes only — safe to interpolate.
        conditions.push(format!("collection_node_id = '{cid}'"));
    }
    let where_clause = if conditions.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", conditions.join(" AND "))
    };
    // limit/offset are typed i64 so their Display output cannot inject.
    let query = format!(
        r#"SELECT id, collection_node_id, job_type, payload,
status::text as status, priority, result, error_msg,
attempts, max_attempts, created_at, started_at,
completed_at, scheduled_for
FROM job_queue
{where_clause}
ORDER BY created_at DESC
LIMIT {limit} OFFSET {offset}"#
    );
    sqlx::query_as::<_, JobDetail>(&query)
        .fetch_all(db)
        .await
}
/// Job count per status (for the dashboard summary).
#[derive(sqlx::FromRow, Serialize, Debug)]
pub struct JobCountByStatus {
    // Status name, cast from the PG enum via status::text.
    pub status: String,
    // Number of jobs currently in that status.
    pub count: i64,
}
/// Counts jobs grouped by status (dashboard summary).
pub async fn count_by_status(db: &PgPool) -> Result<Vec<JobCountByStatus>, sqlx::Error> {
    const SQL: &str =
        "SELECT status::text as status, count(*) as count FROM job_queue GROUP BY status";
    sqlx::query_as::<_, JobCountByStatus>(SQL).fetch_all(db).await
}
/// Lists the distinct job types present in the queue (filter dropdown).
pub async fn distinct_job_types(db: &PgPool) -> Result<Vec<String>, sqlx::Error> {
    const SQL: &str = "SELECT DISTINCT job_type FROM job_queue ORDER BY job_type";
    sqlx::query_scalar::<_, String>(SQL).fetch_all(db).await
}
/// Resets a failed job back to 'pending' for a manual retry, clearing the
/// error message and attempt counter. Only jobs currently in 'error' or
/// 'retry' are eligible. Returns true if a row was actually updated.
pub async fn retry_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"UPDATE job_queue
SET status = 'pending',
error_msg = NULL,
scheduled_for = now(),
attempts = 0
WHERE id = $1
AND status IN ('error', 'retry')"#;
    let affected = sqlx::query(SQL)
        .bind(job_id)
        .execute(db)
        .await?
        .rows_affected();
    Ok(affected > 0)
}
/// Cancels a pending or retry job by setting it to 'error' with a message.
/// Running jobs cannot be cancelled this way (they already execute in a task).
/// Returns true if a row was actually updated.
pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"UPDATE job_queue
SET status = 'error',
error_msg = 'Manuelt avbrutt av admin',
completed_at = now()
WHERE id = $1
AND status IN ('pending', 'retry')"#;
    let affected = sqlx::query(SQL)
        .bind(job_id)
        .execute(db)
        .await?
        .rows_affected();
    Ok(affected > 0)
}
/// Starts the worker loop that polls job_queue.
/// Runs as a background task in tokio.
///
/// Resource management (task 15.5):
/// - Concurrency control via a semaphore (max 3 simultaneous jobs)
/// - Priority rules from the job_priority_rules table
/// - LiveKit-aware resource governor: deprioritizes/blocks heavy
///   jobs while LiveKit rooms are active
/// - CPU-weight-based capacity control (MAX_TOTAL_WEIGHT)
///
/// Respects maintenance mode: while `maintenance.is_active()` is true,
/// the worker stops dequeuing new jobs (already-running jobs finish).
pub fn start_worker(
    db: PgPool,
    cas: CasStore,
    index_cache: IndexCache,
    maintenance: MaintenanceState,
    priority_rules: PriorityRules,
) {
    let whisper_url = std::env::var("WHISPER_URL")
        .unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
    // Semaphore capping at 3 simultaneous jobs (doc: --max-concurrent 3).
    let semaphore = Arc::new(Semaphore::new(3));
    // Cache the LiveKit status with a 10-second TTL to avoid querying
    // PG on every poll iteration.
    let livekit_cache: Arc<tokio::sync::RwLock<(bool, std::time::Instant)>> =
        Arc::new(tokio::sync::RwLock::new((false, std::time::Instant::now())));
    tokio::spawn(async move {
        tracing::info!("Jobbkø-worker startet (poll: 2s, max-concurrent: 3, max-weight: {MAX_TOTAL_WEIGHT})");
        loop {
            // Maintenance mode — do not dequeue new jobs.
            if maintenance.is_active() {
                tracing::debug!("Vedlikeholdsmodus aktiv — jobbkø pauset");
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                continue;
            }
            // Check for free capacity (semaphore slot).
            let permit = match semaphore.clone().try_acquire_owned() {
                Ok(p) => p,
                Err(_) => {
                    // All 3 slots busy — back off briefly.
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    continue;
                }
            };
            // Check total running CPU weight before claiming anything.
            let current_weight = resources::total_running_weight(&db, &priority_rules).await;
            if current_weight >= MAX_TOTAL_WEIGHT {
                drop(permit);
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                continue;
            }
            // Refresh the LiveKit cache (TTL: 10 seconds).
            let livekit_active = {
                let cache = livekit_cache.read().await;
                if cache.1.elapsed() < std::time::Duration::from_secs(10) {
                    cache.0
                } else {
                    // Release the read guard before taking the write lock.
                    drop(cache);
                    let active = resources::has_active_livekit_rooms(&db).await;
                    let mut cache = livekit_cache.write().await;
                    *cache = (active, std::time::Instant::now());
                    active
                }
            };
            match dequeue(&db).await {
                Ok(Some(job)) => {
                    // Look up the priority rules for this job type.
                    let rule = priority_rules.get(&job.job_type).await;
                    let decision = resources::evaluate_job(&rule, livekit_active);
                    // Should the job be deferred (blocked by LiveKit)?
                    if decision.should_defer {
                        tracing::info!(
                            job_id = %job.id,
                            job_type = %job.job_type,
                            "Jobb utsatt — blokkert under aktiv LiveKit-sesjon"
                        );
                        // Put it back to pending with a short delay; the
                        // attempts decrement undoes dequeue's increment.
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '30 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }
                    // Enforce max_concurrent per job type.
                    if rule.max_concurrent > 0 {
                        let running = resources::count_running_of_type(&db, &job.job_type)
                            .await
                            .unwrap_or(0);
                        // `running` includes the job we just flipped to 'running',
                        // so compare against max_concurrent.
                        if running > rule.max_concurrent as i64 {
                            tracing::debug!(
                                job_type = %job.job_type,
                                running = running,
                                max = rule.max_concurrent,
                                "Max concurrent nådd for jobbtype — utsetter"
                            );
                            let _ = sqlx::query(
                                "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                            )
                            .bind(job.id)
                            .execute(&db)
                            .await;
                            drop(permit);
                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                            continue;
                        }
                    }
                    // Make sure this job would not exceed the weight cap.
                    if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
                        tracing::debug!(
                            job_type = %job.job_type,
                            weight = rule.cpu_weight,
                            current = current_weight,
                            max = MAX_TOTAL_WEIGHT,
                            "CPU-vektgrense nådd — utsetter tung jobb"
                        );
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }
                    tracing::info!(
                        job_id = %job.id,
                        job_type = %job.job_type,
                        attempt = job.attempts,
                        weight = rule.cpu_weight,
                        livekit = livekit_active,
                        "Behandler jobb"
                    );
                    // Run the job in its own tokio task (frees the poll loop).
                    let db2 = db.clone();
                    let cas2 = cas.clone();
                    let index_cache2 = index_cache.clone();
                    let whisper_url2 = whisper_url.clone();
                    let timeout_secs = if rule.timeout_seconds > 0 {
                        rule.timeout_seconds as u64
                    } else {
                        600 // 10 min default
                    };
                    tokio::spawn(async move {
                        // Hold the semaphore permit until the job finishes.
                        let _permit = permit;
                        let result = tokio::time::timeout(
                            std::time::Duration::from_secs(timeout_secs),
                            dispatch(&job, &db2, &cas2, &index_cache2, &whisper_url2),
                        )
                        .await;
                        match result {
                            Ok(Ok(res)) => {
                                if let Err(e) = complete_job(&db2, job.id, res).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
                                } else {
                                    tracing::info!(job_id = %job.id, "Jobb fullført");
                                }
                            }
                            Ok(Err(err)) => {
                                tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
                                if let Err(e) = fail_job(&db2, &job, &err).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
                                }
                            }
                            Err(_) => {
                                // Elapsed: the dispatch future was dropped by the timeout.
                                let msg = format!("Jobb tidsavbrutt etter {timeout_secs}s");
                                tracing::error!(job_id = %job.id, "{msg}");
                                if let Err(e) = fail_job(&db2, &job, &msg).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere tidsavbrutt jobb");
                                }
                            }
                        }
                    });
                    // Do not wait — poll immediately for the next job.
                    continue;
                }
                Ok(None) => {
                    drop(permit);
                    // No pending jobs — wait before the next poll.
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                Err(e) => {
                    drop(permit);
                    tracing::error!(error = %e, "Feil ved polling av jobbkø");
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    });
}