Ny standard-orkestrering "Overvåk RSS-feed" som bruker synops-feed CLI. Samlinger konfigurerer feed-abonnementer via metadata.feed_subscriptions[], med konfigurerbar URL, intervall og mål (inbox/channel). Komponenter: - Migration 030: synops-feed cli_tool-seed, orchestration-seed, indeks, prioritetsregel - feed_poller.rs: Bakgrunnstask som hvert 60s finner forfalne abonnementer og enqueuer feed_poll-jobber. Dedupliserer mot kjørende jobber. - feed_poll job handler: Spawner synops-feed CLI, oppdaterer last_polled_at - API: configure_feed_subscription + remove_feed_subscription endepunkter Verifisert: NRK toppsaker.rss → 100 noder opprettet, last_polled_at oppdatert.
881 lines
30 KiB
Rust
// Jobbkø — PostgreSQL-basert asynkron jobbehandling.
|
||
//
|
||
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
|
||
// Dispatching til handler-funksjoner basert på job_type.
|
||
// Ressursstyring: semaphore for concurrency, prioritetsregler fra PG,
|
||
// LiveKit-bevisst resource governor (oppgave 15.5).
|
||
//
|
||
// Ref: docs/infra/jobbkø.md
|
||
|
||
use chrono::{DateTime, Utc};
|
||
use serde::Serialize;
|
||
use sqlx::PgPool;
|
||
use std::sync::Arc;
|
||
use tokio::sync::Semaphore;
|
||
use uuid::Uuid;
|
||
|
||
use crate::agent;
|
||
use crate::ai_edges;
|
||
use crate::ai_process;
|
||
use crate::audio;
|
||
use crate::cas::CasStore;
|
||
use crate::cli_dispatch;
|
||
use crate::clip;
|
||
use crate::maintenance::MaintenanceState;
|
||
use crate::pg_writes;
|
||
use crate::publishing::IndexCache;
|
||
use crate::resources::{self, PriorityRules};
|
||
use crate::script_compiler;
|
||
use crate::script_executor;
|
||
use crate::summarize;
|
||
use crate::transcribe;
|
||
use crate::tts;
|
||
|
||
/// Maximum total CPU weight that may run concurrently.
/// Default: 8 (fits an 8-vCPU host where Whisper=5 plus a few light calls at 1+1+1).
const MAX_TOTAL_WEIGHT: i16 = 8;
|
||
/// A row from the `job_queue` table, as claimed by the worker.
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
    // Primary key of the queued job.
    pub id: Uuid,
    // Collection the job belongs to, if any.
    pub collection_node_id: Option<Uuid>,
    // Handler selector — see dispatch().
    pub job_type: String,
    // Handler-specific JSON arguments.
    pub payload: serde_json::Value,
    // Attempts so far; incremented by dequeue() when the job is claimed.
    pub attempts: i16,
    // Attempt budget before the job is marked permanently failed.
    pub max_attempts: i16,
}
|
||
/// Legger en ny jobb i køen.
|
||
pub async fn enqueue(
|
||
db: &PgPool,
|
||
job_type: &str,
|
||
payload: serde_json::Value,
|
||
collection_node_id: Option<Uuid>,
|
||
priority: i16,
|
||
) -> Result<Uuid, sqlx::Error> {
|
||
let row = sqlx::query_scalar::<_, Uuid>(
|
||
r#"
|
||
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
|
||
VALUES ($1, $2, $3, $4)
|
||
RETURNING id
|
||
"#,
|
||
)
|
||
.bind(job_type)
|
||
.bind(&payload)
|
||
.bind(collection_node_id)
|
||
.bind(priority)
|
||
.fetch_one(db)
|
||
.await?;
|
||
|
||
tracing::info!(job_id = %row, job_type = %job_type, "Jobb lagt i kø");
|
||
Ok(row)
|
||
}
|
||
|
||
/// Henter neste ventende jobb (atomisk med FOR UPDATE SKIP LOCKED).
|
||
/// Setter status til 'running' og oppdaterer started_at.
|
||
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
|
||
let mut tx = db.begin().await?;
|
||
|
||
let job = sqlx::query_as::<_, JobRow>(
|
||
r#"
|
||
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
|
||
FROM job_queue
|
||
WHERE status IN ('pending', 'retry')
|
||
AND scheduled_for <= now()
|
||
ORDER BY priority DESC, scheduled_for ASC
|
||
LIMIT 1
|
||
FOR UPDATE SKIP LOCKED
|
||
"#,
|
||
)
|
||
.fetch_optional(&mut *tx)
|
||
.await?;
|
||
|
||
if let Some(ref job) = job {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
|
||
)
|
||
.bind(job.id)
|
||
.execute(&mut *tx)
|
||
.await?;
|
||
}
|
||
|
||
tx.commit().await?;
|
||
Ok(job)
|
||
}
|
||
|
||
/// Markerer en jobb som fullført.
|
||
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1",
|
||
)
|
||
.bind(job_id)
|
||
.bind(&result)
|
||
.execute(db)
|
||
.await?;
|
||
Ok(())
|
||
}
|
||
|
||
/// Markerer en jobb som feilet. Settes til 'retry' med backoff hvis forsøk gjenstår,
|
||
/// ellers 'error'.
|
||
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
|
||
if job.attempts < job.max_attempts {
|
||
// Eksponentiell backoff: 30s × 2^(attempts-1)
|
||
let backoff_secs = 30i64 * 2i64.pow((job.attempts).max(0) as u32);
|
||
sqlx::query(
|
||
r#"
|
||
UPDATE job_queue
|
||
SET status = 'retry',
|
||
error_msg = $2,
|
||
scheduled_for = now() + ($3 || ' seconds')::interval
|
||
WHERE id = $1
|
||
"#,
|
||
)
|
||
.bind(job.id)
|
||
.bind(error_msg)
|
||
.bind(backoff_secs.to_string())
|
||
.execute(db)
|
||
.await?;
|
||
|
||
tracing::warn!(
|
||
job_id = %job.id,
|
||
attempt = job.attempts,
|
||
max = job.max_attempts,
|
||
backoff_secs = backoff_secs,
|
||
"Jobb feilet, retry planlagt"
|
||
);
|
||
} else {
|
||
sqlx::query(
|
||
"UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
|
||
)
|
||
.bind(job.id)
|
||
.bind(error_msg)
|
||
.execute(db)
|
||
.await?;
|
||
|
||
tracing::error!(
|
||
job_id = %job.id,
|
||
attempts = job.attempts,
|
||
"Jobb permanent feilet"
|
||
);
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// Dispatcher — kjører riktig handler basert på job_type.
|
||
///
|
||
/// Alle handlere delegerer til CLI-verktøy (Command::new("synops-X"))
|
||
/// i tråd med Unix-filosofien. Unntaket er ai_process som fortsatt
|
||
/// kjører inline (mangler CLI-verktøy, planlagt i fremtidig oppgave).
|
||
///
|
||
/// Ref: docs/retninger/unix_filosofi.md
|
||
async fn dispatch(
|
||
job: &JobRow,
|
||
db: &PgPool,
|
||
cas: &CasStore,
|
||
index_cache: &IndexCache,
|
||
whisper_url: &str,
|
||
) -> Result<serde_json::Value, String> {
|
||
match job.job_type.as_str() {
|
||
"whisper_transcribe" => {
|
||
transcribe::handle_whisper_job(job, cas, whisper_url).await
|
||
}
|
||
"agent_respond" => {
|
||
agent::handle_agent_respond(job, db).await
|
||
}
|
||
"suggest_edges" => {
|
||
ai_edges::handle_suggest_edges(job, db).await
|
||
}
|
||
"summarize_communication" => {
|
||
summarize::handle_summarize_communication(job, db).await
|
||
}
|
||
"tts_generate" => {
|
||
tts::handle_tts_job(job, db).await
|
||
}
|
||
"audio_process" => {
|
||
audio::handle_audio_process_job(job, db, cas).await
|
||
}
|
||
"ai_process" => {
|
||
ai_process::handle_ai_process(job, db).await
|
||
}
|
||
"render_article" => {
|
||
handle_render_article(job, cas).await
|
||
}
|
||
"render_index" => {
|
||
handle_render_index(job, cas).await
|
||
}
|
||
// PG-skriveoperasjoner (oppgave 12.3): retry med backoff + dead letter queue
|
||
"pg_insert_node" => {
|
||
pg_writes::handle_insert_node(job, db).await
|
||
}
|
||
"pg_insert_edge" => {
|
||
pg_writes::handle_insert_edge(job, db, index_cache).await
|
||
}
|
||
"pg_update_node" => {
|
||
pg_writes::handle_update_node(job, db).await
|
||
}
|
||
"pg_delete_node" => {
|
||
pg_writes::handle_delete_node(job, db).await
|
||
}
|
||
"pg_delete_edge" => {
|
||
pg_writes::handle_delete_edge(job, db, index_cache).await
|
||
}
|
||
"clip_url" => {
|
||
clip::handle_clip_url(job, db).await
|
||
}
|
||
"describe_image" => {
|
||
crate::describe_image::handle_describe_image(job, db, cas).await
|
||
}
|
||
// Feed-polling: periodisk RSS/Atom-abonnement (oppgave 29.3)
|
||
"feed_poll" => {
|
||
crate::feed_poller::handle_feed_poll(job, db).await
|
||
}
|
||
// Orchestration: trigger-evaluering har lagt jobben i kø.
|
||
// Kompilatoren parser scriptet og validerer det.
|
||
// Utførelse av kompilert script kommer i oppgave 24.5.
|
||
"orchestrate" => {
|
||
handle_orchestrate(job, db).await
|
||
}
|
||
other => Err(format!("Ukjent jobbtype: {other}")),
|
||
}
|
||
}
|
||
|
||
/// Handler for the `orchestrate` job — compiles and executes an orchestration script.
///
/// Flow:
/// 1. Fetches the orchestration node's `content` (the human-written script)
/// 2. Compiles it via script_compiler → stores the pipeline in metadata
/// 3. Substitutes {event.*} variables from the trigger context
/// 4. Executes steps sequentially via generic dispatch (script_executor)
/// 5. VED_FEIL (on-error) handling: step fallback → global fallback → stop
/// 6. Logs every step in orchestration_log
async fn handle_orchestrate(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    // The orchestration node's id comes in the payload as a string UUID.
    let orch_id: Uuid = job
        .payload
        .get("orchestration_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler orchestration_id i payload")?;

    tracing::info!(orchestration_id = %orch_id, "Kompilerer orchestration-script");

    // Fetch the orchestration node's content and metadata.
    let row = sqlx::query_as::<_, (Option<String>, serde_json::Value)>(
        "SELECT content, metadata FROM nodes WHERE id = $1 AND node_kind = 'orchestration'"
    )
    .bind(orch_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("Feil ved henting av orchestration-node: {e}"))?
    .ok_or_else(|| format!("Orchestration-node {orch_id} finnes ikke"))?;

    let (content, _metadata) = row;
    let script = content.ok_or("Orchestration-node mangler content (script)")?;

    if script.trim().is_empty() {
        return Err("Orchestration-script er tomt".into());
    }

    // Parse the script text into its AST form.
    let parsed = script_compiler::parse(&script)
        .map_err(|e| format!("Parse-feil: {e}"))?;

    // Load the tool registry from cli_tool nodes in the graph.
    let registry = script_compiler::load_tool_registry(db).await?;

    // Compile — produces diagnostics plus (on success) a step pipeline.
    let result = script_compiler::compile(&parsed, &registry);

    // Log the human-readable compile report.
    let report = result.format_report();
    tracing::info!(
        orchestration_id = %orch_id,
        errors = result.has_errors(),
        "\n{report}"
    );

    if result.has_errors() {
        // Persist the error report in metadata for display in the UI,
        // and flag the node as not compiled.
        let diagnostics_json = serde_json::to_value(&result.diagnostics)
            .map_err(|e| format!("Serialiseringsfeil: {e}"))?;

        sqlx::query(
            r#"UPDATE nodes
               SET metadata = jsonb_set(
                   jsonb_set(metadata, '{compile_errors}', $2),
                   '{compiled}', 'false'
               )
               WHERE id = $1"#,
        )
        .bind(orch_id)
        .bind(&diagnostics_json)
        .execute(db)
        .await
        .map_err(|e| format!("Feil ved lagring av kompileringsfeil: {e}"))?;

        return Err(format!("Kompilering feilet:\n{report}"));
    }

    // Success — store the compiled pipeline in metadata.
    // `compiled` is Some on success by the compiler's contract (no errors).
    let compiled = result.compiled.as_ref().unwrap();
    let pipeline_json = serde_json::to_value(&compiled.steps)
        .map_err(|e| format!("Serialiseringsfeil: {e}"))?;

    let global_fb_json = compiled
        .global_fallback
        .as_ref()
        .map(|fb| serde_json::to_value(fb).unwrap_or_default())
        .unwrap_or(serde_json::Value::Null);

    // Three nested jsonb_set calls: drop stale compile_errors, write the
    // pipeline, set compiled=true, and record the global fallback.
    sqlx::query(
        r#"UPDATE nodes
           SET metadata = jsonb_set(
               jsonb_set(
                   jsonb_set(
                       metadata - 'compile_errors',
                       '{pipeline}', $2
                   ),
                   '{compiled}', 'true'
               ),
               '{global_fallback}', $3
           )
           WHERE id = $1"#,
    )
    .bind(orch_id)
    .bind(&pipeline_json)
    .bind(&global_fb_json)
    .execute(db)
    .await
    .map_err(|e| format!("Feil ved lagring av kompilert pipeline: {e}"))?;

    tracing::info!(
        orchestration_id = %orch_id,
        steps = compiled.steps.len(),
        "Orchestration-script kompilert — starter utførelse"
    );

    // === Execution (task 24.5) ===
    // Build the ExecutionContext from the trigger_context in the payload.
    let exec_ctx = script_executor::ExecutionContext::from_payload(&job.payload);

    let pipeline_result = script_executor::execute_pipeline(
        db,
        orch_id,
        job.id,
        &compiled.steps,
        compiled.global_fallback.as_ref(),
        &exec_ctx,
    )
    .await;

    tracing::info!(
        orchestration_id = %orch_id,
        steps_run = pipeline_result.steps_run,
        steps_ok = pipeline_result.steps_ok,
        steps_failed = pipeline_result.steps_failed,
        aborted = pipeline_result.aborted,
        "Pipeline utført"
    );

    if pipeline_result.aborted {
        return Err(format!(
            "Pipeline avbrutt etter {}/{} steg: {}",
            pipeline_result.steps_ok,
            pipeline_result.steps_run,
            pipeline_result.error.as_deref().unwrap_or("ukjent feil"),
        ));
    }

    let result_json = serde_json::json!({
        "status": "executed",
        "orchestration_id": orch_id.to_string(),
        "steps_compiled": compiled.steps.len(),
        "steps_run": pipeline_result.steps_run,
        "steps_ok": pipeline_result.steps_ok,
        "technical": compiled.technical,
    });

    // === Cascade: triggers-edge to downstream orchestrations (task 24.8) ===
    // cascade_chain carries the ids already visited, so enqueue_cascade can
    // break cycles; missing/invalid payload entry → empty chain.
    let cascade_chain: Vec<String> = job
        .payload
        .get("cascade_chain")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();

    match crate::orchestration_trigger::enqueue_cascade(
        db,
        orch_id,
        &cascade_chain,
        &result_json,
    )
    .await
    {
        Ok(0) => {} // No cascade targets.
        Ok(n) => {
            tracing::info!(
                orchestration_id = %orch_id,
                cascade_targets = n,
                "Kaskade: {n} nedstrøms orkestrering(er) lagt i kø"
            );
        }
        Err(e) => {
            // Cascade failure is non-fatal — the orchestration itself succeeded.
            tracing::error!(
                orchestration_id = %orch_id,
                error = %e,
                "Feil ved kaskade-evaluering (orkestreringen selv lyktes)"
            );
        }
    }

    Ok(result_json)
}
||
|
||
/// Path to the synops-render binary, overridable via SYNOPS_RENDER_BIN.
fn render_bin() -> String {
    match std::env::var("SYNOPS_RENDER_BIN") {
        Ok(path) => path,
        Err(_) => String::from("synops-render"),
    }
}
|
||
/// Handler for `render_article`-jobb — delegerer til synops-render CLI.
|
||
///
|
||
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
|
||
/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering.
|
||
async fn handle_render_article(
|
||
job: &JobRow,
|
||
cas: &CasStore,
|
||
) -> Result<serde_json::Value, String> {
|
||
let node_id: Uuid = job
|
||
.payload
|
||
.get("node_id")
|
||
.and_then(|v| v.as_str())
|
||
.and_then(|s| s.parse().ok())
|
||
.ok_or("Mangler node_id i payload")?;
|
||
|
||
let collection_id: Uuid = job
|
||
.payload
|
||
.get("collection_id")
|
||
.and_then(|v| v.as_str())
|
||
.and_then(|s| s.parse().ok())
|
||
.ok_or("Mangler collection_id i payload")?;
|
||
|
||
let bin = render_bin();
|
||
let mut cmd = tokio::process::Command::new(&bin);
|
||
|
||
cmd.arg("--node-id").arg(node_id.to_string())
|
||
.arg("--collection-id").arg(collection_id.to_string())
|
||
.arg("--render-type").arg("article")
|
||
.arg("--write");
|
||
|
||
cli_dispatch::set_database_url(&mut cmd)?;
|
||
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
|
||
|
||
tracing::info!(
|
||
node_id = %node_id,
|
||
collection_id = %collection_id,
|
||
bin = %bin,
|
||
"Starter synops-render (article)"
|
||
);
|
||
|
||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||
|
||
tracing::info!(
|
||
node_id = %node_id,
|
||
html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
|
||
"synops-render (article) fullført"
|
||
);
|
||
|
||
Ok(result)
|
||
}
|
||
|
||
/// Handler for `render_index`-jobb — delegerer til synops-render CLI.
|
||
///
|
||
/// Payload: `{ "collection_id": "..." }`
|
||
/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering.
|
||
async fn handle_render_index(
|
||
job: &JobRow,
|
||
cas: &CasStore,
|
||
) -> Result<serde_json::Value, String> {
|
||
let collection_id: Uuid = job
|
||
.payload
|
||
.get("collection_id")
|
||
.and_then(|v| v.as_str())
|
||
.and_then(|s| s.parse().ok())
|
||
.ok_or("Mangler collection_id i payload")?;
|
||
|
||
let bin = render_bin();
|
||
let mut cmd = tokio::process::Command::new(&bin);
|
||
|
||
cmd.arg("--node-id").arg(collection_id.to_string())
|
||
.arg("--render-type").arg("index")
|
||
.arg("--write");
|
||
|
||
cli_dispatch::set_database_url(&mut cmd)?;
|
||
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
|
||
|
||
tracing::info!(
|
||
collection_id = %collection_id,
|
||
bin = %bin,
|
||
"Starter synops-render (index)"
|
||
);
|
||
|
||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||
|
||
tracing::info!(
|
||
collection_id = %collection_id,
|
||
html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
|
||
"synops-render (index) fullført"
|
||
);
|
||
|
||
Ok(result)
|
||
}
|
||
|
||
// =============================================================================
|
||
// Admin-API: spørring, retry og avbryt (oppgave 15.3)
|
||
// =============================================================================
|
||
|
||
/// Full job row for the admin overview (every column of job_queue).
#[derive(sqlx::FromRow, Debug, Serialize)]
pub struct JobDetail {
    pub id: Uuid,
    pub collection_node_id: Option<Uuid>,
    pub job_type: String,
    pub payload: serde_json::Value,
    // Selected as status::text — the column itself is a PG enum.
    pub status: String,
    pub priority: i16,
    // Result JSON written by complete_job on success.
    pub result: Option<serde_json::Value>,
    // Last error message, set by fail_job / cancel_job.
    pub error_msg: Option<String>,
    pub attempts: i16,
    pub max_attempts: i16,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    // Earliest time the job may run (used for retry backoff/deferral).
    pub scheduled_for: DateTime<Utc>,
}
|
||
/// Hent jobber med valgfri filtrering på status, type og samling.
|
||
/// Returnerer nyeste først, begrenset til `limit` rader.
|
||
pub async fn list_jobs(
|
||
db: &PgPool,
|
||
status_filter: Option<&str>,
|
||
type_filter: Option<&str>,
|
||
collection_filter: Option<Uuid>,
|
||
limit: i64,
|
||
offset: i64,
|
||
) -> Result<Vec<JobDetail>, sqlx::Error> {
|
||
// Bygg dynamisk WHERE-klausul
|
||
let mut conditions: Vec<String> = Vec::new();
|
||
|
||
if let Some(s) = status_filter {
|
||
conditions.push(format!("status = '{s}'"));
|
||
}
|
||
if let Some(t) = type_filter {
|
||
// Sanitize: kun alfanumeriske + underscore
|
||
let safe: String = t.chars().filter(|c| c.is_alphanumeric() || *c == '_').collect();
|
||
conditions.push(format!("job_type = '{safe}'"));
|
||
}
|
||
if let Some(cid) = collection_filter {
|
||
conditions.push(format!("collection_node_id = '{cid}'"));
|
||
}
|
||
|
||
let where_clause = if conditions.is_empty() {
|
||
String::new()
|
||
} else {
|
||
format!("WHERE {}", conditions.join(" AND "))
|
||
};
|
||
|
||
let query = format!(
|
||
r#"SELECT id, collection_node_id, job_type, payload,
|
||
status::text as status, priority, result, error_msg,
|
||
attempts, max_attempts, created_at, started_at,
|
||
completed_at, scheduled_for
|
||
FROM job_queue
|
||
{where_clause}
|
||
ORDER BY created_at DESC
|
||
LIMIT {limit} OFFSET {offset}"#
|
||
);
|
||
|
||
sqlx::query_as::<_, JobDetail>(&query)
|
||
.fetch_all(db)
|
||
.await
|
||
}
|
||
|
||
/// Job count per status (for the dashboard summary).
#[derive(sqlx::FromRow, Serialize, Debug)]
pub struct JobCountByStatus {
    // Status name, selected as status::text.
    pub status: String,
    // Number of jobs currently in that status.
    pub count: i64,
}
|
||
pub async fn count_by_status(db: &PgPool) -> Result<Vec<JobCountByStatus>, sqlx::Error> {
|
||
sqlx::query_as::<_, JobCountByStatus>(
|
||
"SELECT status::text as status, count(*) as count FROM job_queue GROUP BY status"
|
||
)
|
||
.fetch_all(db)
|
||
.await
|
||
}
|
||
|
||
/// Hent distinkte jobbtyper (for filter-dropdown).
|
||
pub async fn distinct_job_types(db: &PgPool) -> Result<Vec<String>, sqlx::Error> {
|
||
sqlx::query_scalar::<_, String>(
|
||
"SELECT DISTINCT job_type FROM job_queue ORDER BY job_type"
|
||
)
|
||
.fetch_all(db)
|
||
.await
|
||
}
|
||
|
||
/// Sett en feilet jobb tilbake til 'pending' for manuell retry.
|
||
/// Kun jobber med status 'error' eller 'retry' kan retryes.
|
||
pub async fn retry_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
|
||
let result = sqlx::query(
|
||
r#"UPDATE job_queue
|
||
SET status = 'pending',
|
||
error_msg = NULL,
|
||
scheduled_for = now(),
|
||
attempts = 0
|
||
WHERE id = $1
|
||
AND status IN ('error', 'retry')"#
|
||
)
|
||
.bind(job_id)
|
||
.execute(db)
|
||
.await?;
|
||
|
||
Ok(result.rows_affected() > 0)
|
||
}
|
||
|
||
/// Avbryt en ventende eller retry-jobb (sett til 'error' med melding).
|
||
/// Kjørende jobber kan ikke avbrytes via dette (de kjører allerede i en task).
|
||
pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error> {
|
||
let result = sqlx::query(
|
||
r#"UPDATE job_queue
|
||
SET status = 'error',
|
||
error_msg = 'Manuelt avbrutt av admin',
|
||
completed_at = now()
|
||
WHERE id = $1
|
||
AND status IN ('pending', 'retry')"#
|
||
)
|
||
.bind(job_id)
|
||
.execute(db)
|
||
.await?;
|
||
|
||
Ok(result.rows_affected() > 0)
|
||
}
|
||
|
||
/// Starts the worker loop that polls job_queue.
/// Runs as a background task in tokio.
///
/// Resource governance (task 15.5):
/// - Concurrency control via a semaphore (max 3 simultaneous jobs)
/// - Priority rules from the job_priority_rules table
/// - LiveKit-aware resource governor: deprioritizes/blocks heavy jobs
///   while LiveKit rooms are active
/// - CPU-weight-based capacity control (MAX_TOTAL_WEIGHT)
///
/// Respects maintenance mode: while `maintenance.is_active()` is true the
/// worker stops dequeuing new jobs (jobs already running finish normally).
pub fn start_worker(
    db: PgPool,
    cas: CasStore,
    index_cache: IndexCache,
    maintenance: MaintenanceState,
    priority_rules: PriorityRules,
) {
    let whisper_url = std::env::var("WHISPER_URL")
        .unwrap_or_else(|_| "http://faster-whisper:8000".to_string());

    // Semaphore capping concurrency at 3 jobs (doc: --max-concurrent 3).
    let semaphore = Arc::new(Semaphore::new(3));

    // Cache the LiveKit status with a 10-second TTL so we don't query PG
    // on every poll iteration.
    let livekit_cache: Arc<tokio::sync::RwLock<(bool, std::time::Instant)>> =
        Arc::new(tokio::sync::RwLock::new((false, std::time::Instant::now())));

    tokio::spawn(async move {
        tracing::info!("Jobbkø-worker startet (poll: 2s, max-concurrent: 3, max-weight: {MAX_TOTAL_WEIGHT})");

        loop {
            // Maintenance mode — do not dequeue new jobs.
            if maintenance.is_active() {
                tracing::debug!("Vedlikeholdsmodus aktiv — jobbkø pauset");
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                continue;
            }

            // Check for free capacity (semaphore). The owned permit travels
            // into the spawned job task below and is released on its drop.
            let permit = match semaphore.clone().try_acquire_owned() {
                Ok(p) => p,
                Err(_) => {
                    // All 3 slots busy — back off briefly.
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    continue;
                }
            };

            // Check the total CPU weight of currently running jobs.
            let current_weight = resources::total_running_weight(&db, &priority_rules).await;
            if current_weight >= MAX_TOTAL_WEIGHT {
                drop(permit);
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                continue;
            }

            // Refresh the LiveKit cache when older than the 10s TTL.
            // Read lock first; only take the write lock on a stale entry.
            let livekit_active = {
                let cache = livekit_cache.read().await;
                if cache.1.elapsed() < std::time::Duration::from_secs(10) {
                    cache.0
                } else {
                    drop(cache);
                    let active = resources::has_active_livekit_rooms(&db).await;
                    let mut cache = livekit_cache.write().await;
                    *cache = (active, std::time::Instant::now());
                    active
                }
            };

            match dequeue(&db).await {
                Ok(Some(job)) => {
                    // Look up the priority rule for this job type and let the
                    // governor decide whether it may run now.
                    let rule = priority_rules.get(&job.job_type).await;
                    let decision = resources::evaluate_job(&rule, livekit_active);

                    // Deferred because of an active LiveKit session?
                    if decision.should_defer {
                        tracing::info!(
                            job_id = %job.id,
                            job_type = %job.job_type,
                            "Jobb utsatt — blokkert under aktiv LiveKit-sesjon"
                        );
                        // Put the job back to pending with a short delay.
                        // attempts is decremented to undo dequeue()'s bump,
                        // since no actual attempt was made.
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '30 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }

                    // Enforce max_concurrent per job type (0 = unlimited).
                    if rule.max_concurrent > 0 {
                        let running = resources::count_running_of_type(&db, &job.job_type)
                            .await
                            .unwrap_or(0);
                        // `running` includes the job we just set to 'running',
                        // hence the strict comparison against max_concurrent.
                        if running > rule.max_concurrent as i64 {
                            tracing::debug!(
                                job_type = %job.job_type,
                                running = running,
                                max = rule.max_concurrent,
                                "Max concurrent nådd for jobbtype — utsetter"
                            );
                            let _ = sqlx::query(
                                "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                            )
                            .bind(job.id)
                            .execute(&db)
                            .await;
                            drop(permit);
                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                            continue;
                        }
                    }

                    // Make sure this job would not push us past the weight cap.
                    if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
                        tracing::debug!(
                            job_type = %job.job_type,
                            weight = rule.cpu_weight,
                            current = current_weight,
                            max = MAX_TOTAL_WEIGHT,
                            "CPU-vektgrense nådd — utsetter tung jobb"
                        );
                        let _ = sqlx::query(
                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
                        )
                        .bind(job.id)
                        .execute(&db)
                        .await;
                        drop(permit);
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }

                    tracing::info!(
                        job_id = %job.id,
                        job_type = %job.job_type,
                        attempt = job.attempts,
                        weight = rule.cpu_weight,
                        livekit = livekit_active,
                        "Behandler jobb"
                    );

                    // Run the job in its own tokio task so the poll loop
                    // is free to claim the next job immediately.
                    let db2 = db.clone();
                    let cas2 = cas.clone();
                    let index_cache2 = index_cache.clone();
                    let whisper_url2 = whisper_url.clone();
                    let timeout_secs = if rule.timeout_seconds > 0 {
                        rule.timeout_seconds as u64
                    } else {
                        600 // 10 min default
                    };

                    tokio::spawn(async move {
                        // Hold the semaphore permit until the job finishes.
                        let _permit = permit;

                        let result = tokio::time::timeout(
                            std::time::Duration::from_secs(timeout_secs),
                            dispatch(&job, &db2, &cas2, &index_cache2, &whisper_url2),
                        )
                        .await;

                        match result {
                            Ok(Ok(res)) => {
                                if let Err(e) = complete_job(&db2, job.id, res).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
                                } else {
                                    tracing::info!(job_id = %job.id, "Jobb fullført");
                                }
                            }
                            Ok(Err(err)) => {
                                tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
                                if let Err(e) = fail_job(&db2, &job, &err).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
                                }
                            }
                            Err(_) => {
                                // Timeout — fail_job decides retry vs error.
                                let msg = format!("Jobb tidsavbrutt etter {timeout_secs}s");
                                tracing::error!(job_id = %job.id, "{msg}");
                                if let Err(e) = fail_job(&db2, &job, &msg).await {
                                    tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere tidsavbrutt jobb");
                                }
                            }
                        }
                    });

                    // Don't wait — poll immediately for the next job.
                    continue;
                }
                Ok(None) => {
                    drop(permit);
                    // No pending jobs — wait before the next poll.
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                Err(e) => {
                    drop(permit);
                    tracing::error!(error = %e, "Feil ved polling av jobbkø");
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    });
}
|