diff --git a/docs/infra/jobbkø.md b/docs/infra/jobbkø.md
index cf01547..ddcec9e 100644
--- a/docs/infra/jobbkø.md
+++ b/docs/infra/jobbkø.md
@@ -93,9 +93,50 @@ New job type = new handler function (build request, handle response, error handling
 The values are advisory — SvelteKit sets the priority at creation time based on context. A manually triggered transcription can get a higher priority than an automatic nightly job.
 
-### 4.4 Resource management
-* **Concurrency:** `--max-concurrent` caps the number of simultaneous jobs. Default 3 — suits 8 vCPUs where some slots are Whisper (CPU-heavy) and the rest are HTTP calls (mostly waiting).
-* **Resource Governor (Whisper):** When a LiveKit room is active, the worker reduces Whisper threads (`--threads 2` in the HTTP call to faster-whisper) to protect audio quality. Checked via LiveKit room status before each Whisper call.
+### 4.4 Resource management (implemented, task 15.5)
+
+#### Concurrency control
+* **Semaphore:** At most 3 concurrent jobs (tokio semaphore). Prevents the worker from overloading the server.
+* **CPU weight budget:** Each job type has a `cpu_weight` (1–5). The running total may not exceed 8 (matches 8 vCPUs). A Whisper job (weight 5) blocks other heavy jobs but still admits light HTTP calls (weight 1).
+* **Per-type max_concurrent:** Caps how many jobs of the same type may run at once (e.g. at most 1 Whisper, at most 2 render).
+
+#### Priority rules (`job_priority_rules` table)
+Configurable rules per job type, stored in PG and cached in memory:
+
+| Field | Description |
+|-------|-------------|
+| `base_priority` | Default priority (0–10, higher = more important) |
+| `livekit_priority_adj` | Priority adjustment during active LiveKit sessions (typically negative) |
+| `cpu_weight` | Resource weight 1–5 (counted against MAX_TOTAL_WEIGHT=8) |
+| `max_concurrent` | Max concurrent jobs of this type (0 = unlimited) |
+| `timeout_seconds` | Per-job timeout (0 = default 600 s) |
+| `block_during_livekit` | Whether the job is deferred entirely during LiveKit sessions |
+
+Rules can be changed via `POST /admin/resources/update_rule` without a restart.
+
+#### Resource governor (LiveKit-aware)
+The worker checks whether any LiveKit rooms are active (communication nodes with `metadata.livekit_active = true`). The status is cached for 10 seconds.
+
+While LiveKit is active:
+* Job types with `block_during_livekit = true` are deferred by 30 seconds
+* Other jobs are deprioritized by `livekit_priority_adj` (e.g. Whisper: -3)
+* The total CPU weight budget throttles heavy jobs (Whisper weight=5 + LiveKit → only light jobs in parallel)
+
+#### Timeout
+Each job has an individual timeout based on `timeout_seconds` in the priority rules. Default 600 s (10 min). On timeout the job fails and can be retried.
+
+#### Disk monitoring
+A background loop checks disk usage every 60 seconds:
+
+| Threshold | Alert level | Action |
+|-----------|-------------|--------|
+| <85% | OK | None — normal operation |
+| ≥85% | `warning` | Log message, consider pruning |
+| ≥90% | `critical` | Logged as a warning, aggressive pruning recommended |
+| ≥95% | `emergency` | Logged as an error, immediate action required |
+
+Status is stored in the `disk_status_log` table (the most recent 1000 measurements are kept). Admin API: `GET /admin/resources/disk`.
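+
+A condensed sketch of the admission logic, for orientation only (the authoritative version lives in `jobs.rs`/`resources.rs`; `Rule` here is a pared-down stand-in for the real `PriorityRule`):
+
+```rust
+struct Rule {
+    base_priority: i16,
+    livekit_priority_adj: i16,
+    cpu_weight: i16,
+    block_during_livekit: bool,
+}
+
+/// Decide whether a dequeued job may start right now.
+fn admit(rule: &Rule, current_weight: i16, livekit_active: bool) -> Result<i16, &'static str> {
+    const MAX_TOTAL_WEIGHT: i16 = 8;
+    if livekit_active && rule.block_during_livekit {
+        return Err("defer 30s: blocked during LiveKit session");
+    }
+    if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
+        return Err("defer 5s: CPU weight budget exhausted");
+    }
+    // Effective priority, clamped at 0
+    let adj = if livekit_active { rule.livekit_priority_adj } else { 0 };
+    Ok((rule.base_priority + adj).max(0))
+}
+
+// Whisper (weight 5, adj -3) while LiveKit is active and weight 4 already runs:
+// admit(&whisper, 4, true) == Err("defer 5s: CPU weight budget exhausted")
+```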
 * **Scaling later:** Two levels:
   1. **Worker splitting:** The worker is split into two binaries from the same crate (`worker-heavy`, `worker-light`) via a CLI argument (`--types whisper_transcribe,openrouter_analyze`). No code change needed — only deploy configuration.
   2. **Compute separation:** Move the Rust worker + faster-whisper to a separate Hetzner node (possibly ARM/Ampere for price/performance). LiveKit is extremely sensitive to CPU stutter — with WebRTC and Whisper on the same machine we risk audio glitches regardless of cgroups. The worker node polls the job queue in PostgreSQL over the internal network — the architecture supports this without code changes.
@@ -144,6 +185,19 @@ table with all fields, retry/cancel buttons. Polls every 5 seconds.
 
 **Code:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`), `intentions.rs` (API handlers), `frontend/src/routes/admin/jobs/+page.svelte`
 
+### Implemented (task 15.5): Resource management
+
+**API endpoints:**
+- `GET /admin/resources` — combined resource status (disk, LiveKit, weight, rules, running jobs)
+- `GET /admin/resources/disk` — disk status with the latest 60 historical measurements
+- `POST /admin/resources/update_rule` — update/create a priority rule
+
+**Code:** `resources.rs` (priority rules, governor, disk monitoring),
+`jobs.rs` (semaphore, weight-based concurrency, LiveKit check in the worker loop),
+`intentions.rs` (admin handlers for `/admin/resources/*`)
+
+**Migration:** `014_resource_governor.sql` — `job_priority_rules` + `disk_status_log`
+
 - Optional: SpacetimeDB event on status changes so the UI can show progress in real time (e.g. "Transcribing... attempt 2/3")
 
 ## 8. Instructions for Claude Code
diff --git a/maskinrommet/Cargo.lock b/maskinrommet/Cargo.lock
index 84048b7..9ce5fac 100644
--- a/maskinrommet/Cargo.lock
+++ b/maskinrommet/Cargo.lock
@@ -1083,6 +1083,7 @@ dependencies = [
  "chrono",
  "hex",
  "jsonwebtoken",
+ "libc",
  "rand 0.8.5",
  "reqwest",
  "serde",
diff --git a/maskinrommet/Cargo.toml b/maskinrommet/Cargo.toml
index 7898ed0..05cbde8 100644
--- a/maskinrommet/Cargo.toml
+++ b/maskinrommet/Cargo.toml
@@ -21,3 +21,4 @@ hex = "0.4"
 tokio-util = { version = "0.7", features = ["io"] }
 tera = "1"
 rand = "0.8"
+libc = "0.2.183"
diff --git a/maskinrommet/src/intentions.rs b/maskinrommet/src/intentions.rs
index 05e4c26..533ab53 100644
--- a/maskinrommet/src/intentions.rs
+++ b/maskinrommet/src/intentions.rs
@@ -4105,6 +4105,121 @@ pub struct JobActionResponse {
     pub success: bool,
 }
 
+// =============================================================================
+// Resource management: admin endpoints (task 15.5)
+// =============================================================================
+
+/// GET /admin/resources — combined resource status.
+pub async fn resource_status(
+    State(state): State<AppState>,
+    _user: AuthUser,
+) -> Result<Json<crate::resources::ResourceStatus>, (StatusCode, Json<ErrorResponse>)> {
+    let cas_root = std::env::var("CAS_ROOT")
+        .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
+
+    let disk = crate::resources::check_disk_usage(&cas_root)
+        .unwrap_or_else(|_| crate::resources::check_disk_usage("/").unwrap_or(
+            crate::resources::DiskStatus {
+                mount_point: "/".to_string(),
+                total_bytes: 0,
+                used_bytes: 0,
+                available_bytes: 0,
+                usage_percent: 0.0,
+                alert_level: None,
+            }
+        ));
+
+    let livekit_active = crate::resources::has_active_livekit_rooms(&state.db).await;
+    let total_weight = crate::resources::total_running_weight(&state.db, &state.priority_rules).await;
+    let rules = state.priority_rules.all().await;
+    let running = crate::resources::running_jobs_by_type(&state.db)
+        .await
+        .map_err(|e| internal_error(&format!("DB error: {e}")))?;
+
+    Ok(Json(crate::resources::ResourceStatus {
+        disk,
+        livekit_active,
+        total_running_weight: total_weight,
+        max_weight: 8, // MAX_TOTAL_WEIGHT from jobs.rs
+        priority_rules: rules,
+        running_jobs_by_type: running,
+    }))
+}
+
+/// GET /admin/resources/disk — disk status with history.
+pub async fn resource_disk(
+    State(state): State<AppState>,
+    _user: AuthUser,
+) -> Result<Json<DiskOverview>, (StatusCode, Json<ErrorResponse>)> {
+    let current = crate::resources::latest_disk_status(&state.db)
+        .await
+        .map_err(|e| internal_error(&format!("DB error: {e}")))?;
+    let history = crate::resources::disk_status_history(&state.db, 60)
+        .await
+        .map_err(|e| internal_error(&format!("DB error: {e}")))?;
+
+    Ok(Json(DiskOverview { current, history }))
+}
+
+#[derive(Serialize)]
+pub struct DiskOverview {
+    pub current: Option<crate::resources::DiskStatus>,
+    pub history: Vec<crate::resources::DiskStatusHistoryRow>,
+}
+
+/// POST /admin/resources/update_rule — update a priority rule.
+pub async fn update_priority_rule(
+    State(state): State<AppState>,
+    _user: AuthUser,
+    Json(req): Json<UpdatePriorityRuleRequest>,
+) -> Result<Json<JobActionResponse>, (StatusCode, Json<ErrorResponse>)> {
+    sqlx::query(
+        r#"INSERT INTO job_priority_rules (job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit)
+           VALUES ($1, $2, $3, $4, $5, $6, $7)
+           ON CONFLICT (job_type) DO UPDATE SET
+               base_priority = EXCLUDED.base_priority,
+               livekit_priority_adj = EXCLUDED.livekit_priority_adj,
+               cpu_weight = EXCLUDED.cpu_weight,
+               max_concurrent = EXCLUDED.max_concurrent,
+               timeout_seconds = EXCLUDED.timeout_seconds,
+               block_during_livekit = EXCLUDED.block_during_livekit"#,
+    )
+    .bind(&req.job_type)
+    .bind(req.base_priority)
+    .bind(req.livekit_priority_adj)
+    .bind(req.cpu_weight)
+    .bind(req.max_concurrent)
+    .bind(req.timeout_seconds)
+    .bind(req.block_during_livekit)
+    .execute(&state.db)
+    .await
+    .map_err(|e| internal_error(&format!("DB error: {e}")))?;
+
+    // Refresh the cache
+    if let Err(e) = state.priority_rules.refresh(&state.db).await {
+        tracing::warn!(error = %e, "Could not refresh the priority-rules cache");
+    }
+
+    tracing::info!(
+        job_type = %req.job_type,
+        user = %_user.node_id,
+        "Admin: priority rule updated"
+    );
+
+    Ok(Json(JobActionResponse { success: true }))
+}
+
+#[derive(Deserialize)]
+pub struct UpdatePriorityRuleRequest {
+    pub job_type: String,
+    pub base_priority: i16,
+    pub livekit_priority_adj: i16,
+    pub cpu_weight: i16,
+    pub max_concurrent: i16,
+    pub timeout_seconds: i32,
+    pub block_during_livekit: bool,
+}
+
 // =============================================================================
 // Tests
 // =============================================================================
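For reference, a request against the new rule endpoint carries a JSON body matching `UpdatePriorityRuleRequest`. A minimal sketch (the values shown are the seeded defaults for `whisper_transcribe` from the migration, not recommendations):

```rust
use serde_json::json;

// Body for POST /admin/resources/update_rule.
fn example_update_rule_body() -> serde_json::Value {
    json!({
        "job_type": "whisper_transcribe",
        "base_priority": 5,
        "livekit_priority_adj": -3,
        "cpu_weight": 5,
        "max_concurrent": 1,
        "timeout_seconds": 600,
        "block_during_livekit": false
    })
}
```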
diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs
index 5933f3b..7624762 100644
--- a/maskinrommet/src/jobs.rs
+++ b/maskinrommet/src/jobs.rs
@@ -2,12 +2,16 @@
 //
 // Simple polling loop using SELECT ... FOR UPDATE SKIP LOCKED.
 // Dispatching to handler functions based on job_type.
+// Resource management: semaphore for concurrency, priority rules from PG,
+// LiveKit-aware resource governor (task 15.5).
 //
 // Ref: docs/infra/jobbkø.md
 
 use chrono::{DateTime, Utc};
 use serde::Serialize;
 use sqlx::PgPool;
+use std::sync::Arc;
+use tokio::sync::Semaphore;
 use uuid::Uuid;
 
 use crate::agent;
@@ -16,11 +20,16 @@ use crate::audio;
 use crate::cas::CasStore;
 use crate::maintenance::MaintenanceState;
 use crate::publishing;
+use crate::resources::{self, PriorityRules};
 use crate::stdb::StdbClient;
 use crate::summarize;
 use crate::transcribe;
 use crate::tts;
 
+/// Max total CPU weight that may run concurrently.
+/// Default: 8 (suits 8 vCPUs where Whisper=5 plus a few light calls at 1 each).
+const MAX_TOTAL_WEIGHT: i16 = 8;
+
 /// Row from the job_queue table.
 #[derive(sqlx::FromRow, Debug)]
 pub struct JobRow {
@@ -366,14 +375,35 @@ pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result
 /// Starts the worker loop that polls job_queue.
 /// Runs as a background task in tokio.
 ///
+/// Resource management (task 15.5):
+/// - Concurrency control via a semaphore (max 3 concurrent jobs)
+/// - Priority rules from the job_priority_rules table
+/// - LiveKit-aware resource governor: deprioritizes/blocks heavy
+///   jobs while LiveKit rooms are active
+/// - CPU-weight-based capacity control (MAX_TOTAL_WEIGHT)
+///
 /// Respects maintenance mode: when `maintenance.is_active()` is true,
 /// the worker stops dequeuing new jobs (running jobs are allowed to finish).
-pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore, maintenance: MaintenanceState) {
+pub fn start_worker(
+    db: PgPool,
+    stdb: StdbClient,
+    cas: CasStore,
+    maintenance: MaintenanceState,
+    priority_rules: PriorityRules,
+) {
     let whisper_url = std::env::var("WHISPER_URL")
         .unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
 
+    // Semaphore for at most 3 concurrent jobs (doc: --max-concurrent 3)
+    let semaphore = Arc::new(Semaphore::new(3));
+
+    // Cache the LiveKit status with a 10-second TTL to avoid
+    // querying PG on every poll iteration
+    let livekit_cache: Arc<tokio::sync::RwLock<(bool, std::time::Instant)>> =
+        Arc::new(tokio::sync::RwLock::new((false, std::time::Instant::now())));
+
     tokio::spawn(async move {
-        tracing::info!("Job queue worker started (poll interval: 2s)");
+        tracing::info!("Job queue worker started (poll: 2s, max-concurrent: 3, max-weight: {MAX_TOTAL_WEIGHT})");
 
         loop {
             // Check maintenance mode — do not dequeue new jobs
@@ -383,36 +413,173 @@
                 continue;
             }
 
+            // Check whether we have free capacity (semaphore)
+            let permit = match semaphore.clone().try_acquire_owned() {
+                Ok(p) => p,
+                Err(_) => {
+                    // All 3 slots are busy — wait a bit
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                    continue;
+                }
+            };
+
+            // Check the total CPU weight
+            let current_weight = resources::total_running_weight(&db, &priority_rules).await;
+            if current_weight >= MAX_TOTAL_WEIGHT {
+                drop(permit);
+                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+                continue;
+            }
+
+            // Refresh the LiveKit cache (TTL: 10 seconds)
+            let livekit_active = {
+                let cache = livekit_cache.read().await;
+                if cache.1.elapsed() < std::time::Duration::from_secs(10) {
+                    cache.0
+                } else {
+                    drop(cache);
+                    let active = resources::has_active_livekit_rooms(&db).await;
+                    let mut cache = livekit_cache.write().await;
+                    *cache = (active, std::time::Instant::now());
+                    active
+                }
+            };
+
             match dequeue(&db).await {
                 Ok(Some(job)) => {
+                    // Look up the priority rule for this job type
+                    let rule = priority_rules.get(&job.job_type).await;
+                    let decision = resources::evaluate_job(&rule, livekit_active);
+
+                    // Check whether the job must be deferred (LiveKit blocking)
+                    if decision.should_defer {
+                        tracing::info!(
+                            job_id = %job.id,
+                            job_type = %job.job_type,
+                            "Job deferred — blocked during an active LiveKit session"
+                        );
+                        // Put it back to pending with a short delay
+                        let _ = sqlx::query(
+                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '30 seconds' WHERE id = $1"
+                        )
+                        .bind(job.id)
+                        .execute(&db)
+                        .await;
+                        drop(permit);
+                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+                        continue;
+                    }
+
+                    // Enforce max_concurrent per type
+                    if rule.max_concurrent > 0 {
+                        let running = resources::count_running_of_type(&db, &job.job_type)
+                            .await
+                            .unwrap_or(0);
+                        // `running` includes the job we just set to 'running',
+                        // so compare against max_concurrent accordingly
+                        if running > rule.max_concurrent as i64 {
+                            tracing::debug!(
+                                job_type = %job.job_type,
+                                running = running,
+                                max = rule.max_concurrent,
+                                "Max concurrent reached for job type — deferring"
+                            );
+                            let _ = sqlx::query(
+                                "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
+                            )
+                            .bind(job.id)
+                            .execute(&db)
+                            .await;
+                            drop(permit);
+                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+                            continue;
+                        }
+                    }
+
+                    // Make sure this job does not blow the weight budget
+                    if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
+                        tracing::debug!(
+                            job_type = %job.job_type,
+                            weight = rule.cpu_weight,
+                            current = current_weight,
+                            max = MAX_TOTAL_WEIGHT,
+                            "CPU weight budget reached — deferring heavy job"
+                        );
+                        let _ = sqlx::query(
+                            "UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
+                        )
+                        .bind(job.id)
+                        .execute(&db)
+                        .await;
+                        drop(permit);
+                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+                        continue;
+                    }
+
                     tracing::info!(
                         job_id = %job.id,
                         job_type = %job.job_type,
                         attempt = job.attempts,
+                        weight = rule.cpu_weight,
+                        livekit = livekit_active,
                         "Processing job"
                     );
 
-                    match dispatch(&job, &db, &stdb, &cas, &whisper_url).await {
-                        Ok(result) => {
-                            if let Err(e) = complete_job(&db, job.id, result).await {
-                                tracing::error!(job_id = %job.id, error = %e, "Could not mark job as completed");
-                            } else {
-                                tracing::info!(job_id = %job.id, "Job completed");
+                    // Run the job in its own tokio task (frees up the poll loop)
+                    let db2 = db.clone();
+                    let stdb2 = stdb.clone();
+                    let cas2 = cas.clone();
+                    let whisper_url2 = whisper_url.clone();
+                    let timeout_secs = if rule.timeout_seconds > 0 {
+                        rule.timeout_seconds as u64
+                    } else {
+                        600 // 10 min default
+                    };
+
+                    tokio::spawn(async move {
+                        // Hold the semaphore permit until the job is done
+                        let _permit = permit;
+
+                        let result = tokio::time::timeout(
+                            std::time::Duration::from_secs(timeout_secs),
+                            dispatch(&job, &db2, &stdb2, &cas2, &whisper_url2),
+                        )
+                        .await;
+
+                        match result {
+                            Ok(Ok(res)) => {
+                                if let Err(e) = complete_job(&db2, job.id, res).await {
+                                    tracing::error!(job_id = %job.id, error = %e, "Could not mark job as completed");
+                                } else {
+                                    tracing::info!(job_id = %job.id, "Job completed");
+                                }
+                            }
+                            Ok(Err(err)) => {
+                                tracing::error!(job_id = %job.id, error = %err, "Job failed");
+                                if let Err(e) = fail_job(&db2, &job, &err).await {
+                                    tracing::error!(job_id = %job.id, error = %e, "Could not mark job as failed");
+                                }
+                            }
+                            Err(_) => {
+                                let msg = format!("Job timed out after {timeout_secs}s");
+                                tracing::error!(job_id = %job.id, "{msg}");
+                                if let Err(e) = fail_job(&db2, &job, &msg).await {
+                                    tracing::error!(job_id = %job.id, error = %e, "Could not mark timed-out job");
+                                }
                             }
                         }
-                        Err(err) => {
-                            tracing::error!(job_id = %job.id, error = %err, "Job failed");
-                            if let Err(e) = fail_job(&db, &job, &err).await {
-                                tracing::error!(job_id = %job.id, error = %e, "Could not mark job as failed");
-                            }
-                        }
-                    }
+                    });
+
+                    // Do not wait — poll immediately for the next job
+                    continue;
                 }
                 Ok(None) => {
+                    drop(permit);
                     // No pending jobs — wait before the next poll
                     tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                 }
                 Err(e) => {
+                    drop(permit);
                     tracing::error!(error = %e, "Error while polling the job queue");
                     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                 }
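One pattern worth calling out from the loop above: the owned semaphore permit is moved into the spawned task, so the slot is released only when the job (or its timeout) finishes, not when the poll iteration ends. A self-contained sketch of that mechanism (standalone toy program; the names and timings are illustrative, not from the codebase):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3)); // at most 3 concurrent "jobs"

    for i in 0..5 {
        // try_acquire_owned yields a permit not tied to a borrow,
        // so it can be moved into a spawned task.
        match semaphore.clone().try_acquire_owned() {
            Ok(permit) => {
                tokio::spawn(async move {
                    let _permit = permit; // held for the task's whole lifetime
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                    println!("job {i} done, slot freed on drop");
                });
            }
            Err(_) => println!("job {i} must wait, all slots busy"),
        }
    }
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
```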
diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs
index d89c32d..5876b24 100644
--- a/maskinrommet/src/main.rs
+++ b/maskinrommet/src/main.rs
@@ -12,6 +12,7 @@ pub mod maintenance;
 pub mod pruning;
 mod queries;
 pub mod publishing;
+pub mod resources;
 mod rss;
 mod serving;
 mod stdb;
@@ -41,6 +42,7 @@ pub struct AppState {
     pub index_cache: publishing::IndexCache,
     pub dynamic_page_cache: publishing::DynamicPageCache,
     pub maintenance: maintenance::MaintenanceState,
+    pub priority_rules: resources::PriorityRules,
 }
 
 #[derive(Serialize)]
@@ -141,12 +143,20 @@ async fn main() {
     // Maintenance state (task 15.2)
     let maintenance = maintenance::MaintenanceState::new();
 
-    // Start the job queue worker in the background
-    jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone());
+    // Load priority rules from PG (task 15.5)
+    let priority_rules = resources::PriorityRules::load(&db)
+        .await
+        .expect("Could not load priority rules from PG");
+
+    // Start the job queue worker in the background (with resource management, task 15.5)
+    jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone(), priority_rules.clone());
 
     // Start periodic CAS pruning in the background
     pruning::start_pruning_loop(db.clone(), cas.clone());
 
+    // Start disk monitoring in the background (task 15.5)
+    resources::start_disk_monitor(db.clone());
+
     // Start the scheduled-publishing scheduler in the background
     publishing::start_publish_scheduler(db.clone());
 
@@ -155,7 +165,7 @@ async fn main() {
     let index_cache = publishing::new_index_cache();
     let dynamic_page_cache = publishing::new_dynamic_page_cache();
 
-    let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance };
+    let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules };
 
     // Routes: /health is public, /me requires a valid JWT
     let app = Router::new()
@@ -204,6 +214,10 @@
         .route("/admin/jobs", get(intentions::list_jobs))
         .route("/intentions/retry_job", post(intentions::retry_job))
         .route("/intentions/cancel_job", post(intentions::cancel_job))
+        // Resource management (task 15.5)
+        .route("/admin/resources", get(intentions::resource_status))
+        .route("/admin/resources/disk", get(intentions::resource_disk))
+        .route("/admin/resources/update_rule", post(intentions::update_priority_rule))
         // AI Gateway configuration (task 15.4)
         .route("/admin/ai", get(ai_admin::ai_overview))
         .route("/admin/ai/usage", get(ai_admin::ai_usage))
diff --git a/maskinrommet/src/resources.rs b/maskinrommet/src/resources.rs
new file mode 100644
index 0000000..4a94502
--- /dev/null
+++ b/maskinrommet/src/resources.rs
@@ -0,0 +1,443 @@
+// Resource management — priority rules, resource governor and disk monitoring.
+//
+// Three main parts:
+// 1. PriorityRules: reads job_priority_rules from PG; used by the job queue
+//    worker to adjust priority and concurrency based on job type and LiveKit status.
+// 2. Resource governor: checks whether LiveKit rooms are active and adapts
+//    worker behaviour (blocking, deprioritization).
+// 3. Disk monitor: periodic disk usage check with alerts on threshold breaches.
+//
+// Ref: docs/infra/jobbkø.md § 4.4, task 15.5
+
+use serde::Serialize;
+use sqlx::PgPool;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+// =============================================================================
+// Priority rules
+// =============================================================================
+
+/// A row from the job_priority_rules table.
+#[derive(sqlx::FromRow, Debug, Clone, Serialize)]
+pub struct PriorityRule {
+    pub job_type: String,
+    pub base_priority: i16,
+    pub livekit_priority_adj: i16,
+    pub cpu_weight: i16,
+    pub max_concurrent: i16,
+    pub timeout_seconds: i32,
+    pub block_during_livekit: bool,
+}
+
+/// Cache of priority rules — loaded from PG at startup; can be refreshed.
+#[derive(Clone)]
+pub struct PriorityRules {
+    rules: Arc<RwLock<HashMap<String, PriorityRule>>>,
+}
+
+impl PriorityRules {
+    pub async fn load(db: &PgPool) -> Result<Self, sqlx::Error> {
+        let rows = sqlx::query_as::<_, PriorityRule>(
+            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
+        )
+        .fetch_all(db)
+        .await?;
+
+        let mut map = HashMap::new();
+        for rule in rows {
+            map.insert(rule.job_type.clone(), rule);
+        }
+
+        tracing::info!(rules = map.len(), "Priority rules loaded");
+        Ok(Self {
+            rules: Arc::new(RwLock::new(map)),
+        })
+    }
+
+    /// Get the rule for a job type. Returns a default if not found.
+    pub async fn get(&self, job_type: &str) -> PriorityRule {
+        let rules = self.rules.read().await;
+        rules.get(job_type).cloned().unwrap_or(PriorityRule {
+            job_type: job_type.to_string(),
+            base_priority: 5,
+            livekit_priority_adj: 0,
+            cpu_weight: 1,
+            max_concurrent: 0,
+            timeout_seconds: 0,
+            block_during_livekit: false,
+        })
+    }
+
+    /// Get all rules (for the admin API).
+    pub async fn all(&self) -> Vec<PriorityRule> {
+        let rules = self.rules.read().await;
+        rules.values().cloned().collect()
+    }
+
+    /// Refresh the cache from PG.
+    pub async fn refresh(&self, db: &PgPool) -> Result<(), sqlx::Error> {
+        let rows = sqlx::query_as::<_, PriorityRule>(
+            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
+        )
+        .fetch_all(db)
+        .await?;
+
+        let mut map = HashMap::new();
+        for rule in rows {
+            map.insert(rule.job_type.clone(), rule);
+        }
+
+        let mut rules = self.rules.write().await;
+        *rules = map;
+        Ok(())
+    }
+}
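+
+// Usage sketch (illustrative, not called anywhere): rules are loaded once at
+// startup and shared via Clone; unknown job types fall back to a safe default
+// (priority 5, weight 1, no per-type limit):
+//
+//     let rules = PriorityRules::load(&db).await?;
+//     let rule = rules.get("some_future_job_type").await;
+//     assert_eq!(rule.cpu_weight, 1); // default when no row exists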
Vi sjekker edges tabellen for aktive +/// member_of-edges til kommunikasjonsnoder som har livekit_room i metadata. +/// +/// Enkel tilnærming: sjekk om noen kommunikasjonsnoder har +/// metadata.livekit_active = true. +pub async fn has_active_livekit_rooms(db: &PgPool) -> bool { + // Sjekk om det finnes kommunikasjonsnoder med aktiv LiveKit-sesjon. + // Kommunikasjonsnoder som er "open" med livekit_active i metadata + // indikerer en pågående lydøkt. + let result = sqlx::query_scalar::<_, i64>( + r#"SELECT COUNT(*) FROM nodes + WHERE node_kind = 'communication' + AND metadata->>'livekit_active' = 'true'"#, + ) + .fetch_one(db) + .await; + + match result { + Ok(count) => { + if count > 0 { + tracing::debug!(active_rooms = count, "Aktive LiveKit-rom funnet"); + } + count > 0 + } + Err(e) => { + tracing::warn!(error = %e, "Kunne ikke sjekke LiveKit-status — antar ingen aktive rom"); + false + } + } +} + +/// Sjekk om en jobb av gitt type skal blokkeres/nedprioriteres. +pub struct GovernorDecision { + /// Justert prioritet (kan være lavere enn base under LiveKit-last) + pub effective_priority: i16, + /// Om jobben skal utsettes (blokkert pga LiveKit) + pub should_defer: bool, +} + +/// Evaluer en jobbtype gitt gjeldende LiveKit-status. +pub fn evaluate_job(rule: &PriorityRule, livekit_active: bool) -> GovernorDecision { + if livekit_active && rule.block_during_livekit { + return GovernorDecision { + effective_priority: 0, + should_defer: true, + }; + } + + let adj = if livekit_active { + rule.livekit_priority_adj + } else { + 0 + }; + + GovernorDecision { + effective_priority: (rule.base_priority + adj).max(0), + should_defer: false, + } +} + +/// Tell kjørende jobber av en gitt type. +pub async fn count_running_of_type(db: &PgPool, job_type: &str) -> Result { + sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM job_queue WHERE status = 'running' AND job_type = $1", + ) + .bind(job_type) + .fetch_one(db) + .await +} + +/// Total CPU-vekt av alle kjørende jobber. +pub async fn total_running_weight(db: &PgPool, rules: &PriorityRules) -> i16 { + let running_types = sqlx::query_scalar::<_, String>( + "SELECT job_type FROM job_queue WHERE status = 'running'", + ) + .fetch_all(db) + .await + .unwrap_or_default(); + + let mut total: i16 = 0; + for jt in &running_types { + let rule = rules.get(jt).await; + total = total.saturating_add(rule.cpu_weight); + } + total +} + +// ============================================================================= +// Disk-overvåking +// ============================================================================= + +/// Disk-status for et monteringspunkt. +#[derive(Debug, Clone, Serialize)] +pub struct DiskStatus { + pub mount_point: String, + pub total_bytes: u64, + pub used_bytes: u64, + pub available_bytes: u64, + pub usage_percent: f32, + pub alert_level: Option, +} + +/// Hent diskbruk for et gitt monteringspunkt via statvfs. 
+
+// =============================================================================
+// Disk monitoring
+// =============================================================================
+
+/// Disk status for a mount point.
+#[derive(Debug, Clone, Serialize)]
+pub struct DiskStatus {
+    pub mount_point: String,
+    pub total_bytes: u64,
+    pub used_bytes: u64,
+    pub available_bytes: u64,
+    pub usage_percent: f32,
+    pub alert_level: Option<String>,
+}
+
+/// Fetch disk usage for a given mount point via statvfs.
+pub fn check_disk_usage(path: &str) -> Result<DiskStatus, String> {
+    // Use libc::statvfs to fetch filesystem info
+    use std::ffi::CString;
+
+    let c_path =
+        CString::new(path).map_err(|e| format!("Invalid path for statvfs: {e}"))?;
+
+    // Direct (unsafe) libc::statvfs call; the struct is zero-initialized first
+    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
+    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
+
+    if ret != 0 {
+        return Err(format!(
+            "statvfs failed for {path}: {}",
+            std::io::Error::last_os_error()
+        ));
+    }
+
+    let block_size = stat.f_frsize as u64;
+    let total_bytes = stat.f_blocks as u64 * block_size;
+    let available_bytes = stat.f_bavail as u64 * block_size;
+    let used_bytes = total_bytes - (stat.f_bfree as u64 * block_size);
+
+    let usage_percent = if total_bytes > 0 {
+        (used_bytes as f64 / total_bytes as f64 * 100.0) as f32
+    } else {
+        0.0
+    };
+
+    let alert_level = if usage_percent >= 95.0 {
+        Some("emergency".to_string())
+    } else if usage_percent >= 90.0 {
+        Some("critical".to_string())
+    } else if usage_percent >= 85.0 {
+        Some("warning".to_string())
+    } else {
+        None
+    };
+
+    Ok(DiskStatus {
+        mount_point: path.to_string(),
+        total_bytes,
+        used_bytes,
+        available_bytes,
+        usage_percent,
+        alert_level,
+    })
+}
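+
+// Worked example: on a 200 GB volume with 178 GB used, usage_percent ≈ 89.0,
+// so alert_level = Some("warning"); at 181 GB used (90.5%) it becomes
+// Some("critical"). The thresholds are evaluated top-down, so the highest
+// matching level wins.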
+
+/// Log disk status to the disk_status_log table.
+async fn log_disk_status(db: &PgPool, status: &DiskStatus) -> Result<(), sqlx::Error> {
+    sqlx::query(
+        r#"INSERT INTO disk_status_log (mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level)
+           VALUES ($1, $2, $3, $4, $5, $6)"#,
+    )
+    .bind(&status.mount_point)
+    .bind(status.total_bytes as i64)
+    .bind(status.used_bytes as i64)
+    .bind(status.available_bytes as i64)
+    .bind(status.usage_percent)
+    .bind(&status.alert_level)
+    .execute(db)
+    .await?;
+    Ok(())
+}
+
+/// Clean up old disk_status_log rows (keep the most recent 1000).
+async fn cleanup_disk_log(db: &PgPool) -> Result<(), sqlx::Error> {
+    sqlx::query(
+        r#"DELETE FROM disk_status_log
+           WHERE id NOT IN (
+               SELECT id FROM disk_status_log ORDER BY checked_at DESC LIMIT 1000
+           )"#,
+    )
+    .execute(db)
+    .await?;
+    Ok(())
+}
+
+/// Starts the disk monitoring loop that checks disk usage every 60 seconds.
+/// Logs to PG and emits a warning when a threshold is breached.
+pub fn start_disk_monitor(db: PgPool) {
+    // CAS root and data partition
+    let cas_root = std::env::var("CAS_ROOT")
+        .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
+    // Check the partition where the CAS lives, falling back to the root
+    let check_path = if std::path::Path::new(&cas_root).exists() {
+        cas_root.clone()
+    } else {
+        "/".to_string()
+    };
+
+    tokio::spawn(async move {
+        // Wait a little after startup
+        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
+        tracing::info!(path = %check_path, "Disk monitoring started (interval: 60s)");
+
+        let mut check_count: u64 = 0;
+
+        loop {
+            match check_disk_usage(&check_path) {
+                Ok(status) => {
+                    // Log to PG
+                    if let Err(e) = log_disk_status(&db, &status).await {
+                        tracing::warn!(error = %e, "Could not log disk status");
+                    }
+
+                    // Alert on threshold breaches
+                    match status.alert_level.as_deref() {
+                        Some("emergency") => {
+                            tracing::error!(
+                                usage = format!("{:.1}%", status.usage_percent),
+                                available_mb = status.available_bytes / 1_048_576,
+                                "DISK EMERGENCY: >95% used — immediate action required!"
+                            );
+                        }
+                        Some("critical") => {
+                            tracing::warn!(
+                                usage = format!("{:.1}%", status.usage_percent),
+                                available_mb = status.available_bytes / 1_048_576,
+                                "Disk critical: >90% used — aggressive pruning recommended"
+                            );
+                        }
+                        Some("warning") => {
+                            tracing::warn!(
+                                usage = format!("{:.1}%", status.usage_percent),
+                                available_mb = status.available_bytes / 1_048_576,
+                                "Disk warning: >85% used"
+                            );
+                        }
+                        _ => {
+                            // Normal — only log every 10 minutes (every 10th check)
+                            if check_count % 10 == 0 {
+                                tracing::debug!(
+                                    usage = format!("{:.1}%", status.usage_percent),
+                                    available_gb = status.available_bytes / 1_073_741_824,
+                                    "Disk status OK"
+                                );
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    tracing::error!(error = %e, "Disk check failed");
+                }
+            }
+
+            check_count += 1;
+
+            // Clean up old log rows every two hours (120 checks at 60 s)
+            if check_count % 120 == 0 {
+                if let Err(e) = cleanup_disk_log(&db).await {
+                    tracing::warn!(error = %e, "Could not clean up disk_status_log");
+                }
+            }
+
+            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
+        }
+    });
+}
+
+// =============================================================================
+// Admin API responses
+// =============================================================================
+
+/// Combined resource status for the admin panel.
+#[derive(Serialize)]
+pub struct ResourceStatus {
+    pub disk: DiskStatus,
+    pub livekit_active: bool,
+    pub total_running_weight: i16,
+    pub max_weight: i16,
+    pub priority_rules: Vec<PriorityRule>,
+    pub running_jobs_by_type: Vec<RunningJobCount>,
+}
+
+#[derive(Serialize, sqlx::FromRow)]
+pub struct RunningJobCount {
+    pub job_type: String,
+    pub count: i64,
+}
+
+/// Count running jobs per type.
+pub async fn running_jobs_by_type(db: &PgPool) -> Result<Vec<RunningJobCount>, sqlx::Error> {
+    sqlx::query_as::<_, RunningJobCount>(
+        "SELECT job_type, COUNT(*) as count FROM job_queue WHERE status = 'running' GROUP BY job_type",
+    )
+    .fetch_all(db)
+    .await
+}
+
+/// Fetch the latest disk status from the log.
+pub async fn latest_disk_status(db: &PgPool) -> Result<Option<DiskStatus>, sqlx::Error> {
+    let row = sqlx::query_as::<_, DiskStatusRow>(
+        "SELECT mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT 1",
+    )
+    .fetch_optional(db)
+    .await?;
+
+    Ok(row.map(|r| DiskStatus {
+        mount_point: r.mount_point,
+        total_bytes: r.total_bytes as u64,
+        used_bytes: r.used_bytes as u64,
+        available_bytes: r.available_bytes as u64,
+        usage_percent: r.usage_percent,
+        alert_level: r.alert_level,
+    }))
+}
+
+#[derive(sqlx::FromRow)]
+struct DiskStatusRow {
+    mount_point: String,
+    total_bytes: i64,
+    used_bytes: i64,
+    available_bytes: i64,
+    usage_percent: f32,
+    alert_level: Option<String>,
+}
+
+/// Fetch disk status history (latest N measurements).
+pub async fn disk_status_history(
+    db: &PgPool,
+    limit: i64,
+) -> Result<Vec<DiskStatusHistoryRow>, sqlx::Error> {
+    sqlx::query_as::<_, DiskStatusHistoryRow>(
+        "SELECT checked_at, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT $1",
+    )
+    .bind(limit)
+    .fetch_all(db)
+    .await
+}
+
+#[derive(sqlx::FromRow, Serialize)]
+pub struct DiskStatusHistoryRow {
+    pub checked_at: chrono::DateTime<chrono::Utc>,
+    pub usage_percent: f32,
+    pub alert_level: Option<String>,
+}
diff --git a/migrations/014_resource_governor.sql b/migrations/014_resource_governor.sql
new file mode 100644
index 0000000..d049fb4
--- /dev/null
+++ b/migrations/014_resource_governor.sql
@@ -0,0 +1,46 @@
+-- Task 15.5: Resource management — priority rules, resource limits, disk status
+--
+-- Creates the table of per-job-type priority rules used by the worker
+-- to control concurrency, priority and LiveKit adaptation.
+
+CREATE TABLE job_priority_rules (
+    job_type TEXT PRIMARY KEY,
+    base_priority SMALLINT NOT NULL DEFAULT 5,
+    -- How much the priority is reduced while LiveKit rooms are active
+    livekit_priority_adj SMALLINT NOT NULL DEFAULT 0,
+    -- Resource weight (1 = light HTTP call, 5 = heavy CPU such as Whisper)
+    cpu_weight SMALLINT NOT NULL DEFAULT 1,
+    -- Max concurrent jobs of this type (0 = unlimited)
+    max_concurrent SMALLINT NOT NULL DEFAULT 0,
+    -- Timeout in seconds (0 = use the default)
+    timeout_seconds INT NOT NULL DEFAULT 0,
+    -- Whether the job is blocked entirely during active LiveKit sessions
+    block_during_livekit BOOLEAN NOT NULL DEFAULT false
+);
+
+-- Seed with known job types and sensible defaults
+INSERT INTO job_priority_rules (job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit)
+VALUES
+    ('whisper_transcribe', 5, -3, 5, 1, 600, false),
+    ('agent_respond', 10, 0, 1, 2, 120, false),
+    ('suggest_edges', 3, 0, 1, 2, 120, false),
+    ('summarize_communication', 5, 0, 1, 1, 120, false),
+    ('tts_generate', 5, -1, 2, 1, 300, false),
+    ('audio_process', 5, -2, 3, 1, 600, false),
+    ('render_article', 7, 0, 1, 2, 60, false),
+    ('render_index', 7, 0, 1, 2, 60, false);
+
+-- Table of disk status snapshots (the latest values are used for alerting)
+CREATE TABLE disk_status_log (
+    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+    checked_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+    mount_point TEXT NOT NULL DEFAULT '/',
+    total_bytes BIGINT NOT NULL,
+    used_bytes BIGINT NOT NULL,
+    available_bytes BIGINT NOT NULL,
+    usage_percent REAL NOT NULL,
+    alert_level TEXT -- NULL = ok, 'warning' = 85%+, 'critical' = 90%+, 'emergency' = 95%+
+);
+
+-- Only the most recent 1000 measurements are kept (pruned by the worker);
+-- this index supports the latest-first queries
+CREATE INDEX idx_disk_status_latest ON disk_status_log (checked_at DESC);
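To make the admin payloads concrete: serialized from `ResourceStatus`, a `GET /admin/resources` response has roughly this shape (a sketch with illustrative values; the `priority_rules` array, one object per `job_priority_rules` row, is elided):

```rust
use serde_json::json;

// Approximate shape of the GET /admin/resources payload.
fn example_resource_status() -> serde_json::Value {
    json!({
        "disk": {
            "mount_point": "/srv/synops/media/cas",
            "total_bytes": 200_000_000_000u64,
            "used_bytes": 170_000_000_000u64,
            "available_bytes": 30_000_000_000u64,
            "usage_percent": 85.0,
            "alert_level": "warning"
        },
        "livekit_active": false,
        "total_running_weight": 6,
        "max_weight": 8,
        "priority_rules": [],
        "running_jobs_by_type": [{ "job_type": "whisper_transcribe", "count": 1 }]
    })
}
```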
diff --git a/tasks.md b/tasks.md
index 56d097c..943aeea 100644
--- a/tasks.md
+++ b/tasks.md
@@ -167,8 +167,7 @@ Independent phases can still be picked up.
 - [x] 15.2 Graceful shutdown: admin sets a maintenance time → countdown in the frontend → new LiveKit rooms are blocked → job queue stops → wait for active jobs → restart. Show active sessions before confirmation.
 - [x] 15.3 Job queue overview: admin UI for active, pending and failed jobs. Filter by type/collection/status. Manual retry and cancel.
 - [x] 15.4 AI Gateway configuration: admin UI for model overview, API keys (encrypted), routing rules per job type, fallback chains, usage overview per collection. Ref: `docs/infra/ai_gateway.md`.
-- [~] 15.5 Resource management: priority rules between job types, resource limits per worker, a resource governor for automatic deprioritization during active LiveKit sessions, disk status with alerting.
-  > Started: 2026-03-18T03:53
+- [x] 15.5 Resource management: priority rules between job types, resource limits per worker, a resource governor for automatic deprioritization during active LiveKit sessions, disk status with alerting.
 - [ ] 15.6 Server health dashboard: service status (PG, STDB, Caddy, Authentik, LiteLLM, Whisper, LiveKit), metrics (CPU, memory, disk), backup status, log access.
 - [ ] 15.7 Resource usage logging: `resource_usage_log` table in PG. Maskinrommet logs AI tokens (in/out, per model tier), Whisper time (sec), TTS characters, CAS storage (bytes), LiveKit time (participant-min). Bandwidth via Caddy log parsing. Ref: `docs/features/ressursforbruk.md`.
 - [ ] 15.8 Usage overview in admin: aggregated view per collection, per resource type, per time period. Drill-down to job type and model tier.