Fullfører oppgave 15.5: Ressursstyring for jobbkø

Implementerer prioritetsregler, ressursgrenser og LiveKit-bevisst
resource governor for jobbkø-workeren, pluss disk-overvåking med varsling.

Hovedkomponenter:

1. Prioritetsregler (job_priority_rules-tabell):
   - Konfigurerbar base_priority, cpu_weight, max_concurrent per jobbtype
   - LiveKit-justering: livekit_priority_adj og block_during_livekit
   - Timeout per jobbtype
   - Admin-API for å endre regler uten restart

2. Ressurs-governor i worker-loopen:
   - Semaphore: maks 3 samtidige jobber
   - CPU-vektgrense: total vekt maks 8 (Whisper=5, render=1, etc.)
   - Per-type concurrency-grense
   - LiveKit-status sjekkes med 10s cache-TTL
   - Jobber utsettes/nedprioriteres ved aktive LiveKit-rom
   - Individuell timeout per jobb (default 600s)
   - Jobber kjøres i egne tokio-tasks (parallell dispatch)

3. Disk-overvåking:
   - Sjekker diskbruk hvert 60. sekund via statvfs
   - Terskler: 85% warning, 90% critical, 95% emergency
   - Logger til disk_status_log (siste 1000 målinger beholdes)
   - Admin-API: GET /admin/resources/disk med historikk

4. Admin-API:
   - GET /admin/resources — samlet ressursstatus
   - GET /admin/resources/disk — diskstatus med historikk
   - POST /admin/resources/update_rule — oppdater prioritetsregel

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
vegard 2026-03-18 04:02:11 +00:00
parent d57f3d1324
commit b64f217637
9 changed files with 863 additions and 23 deletions

View file

@ -93,9 +93,50 @@ Ny jobbtype = ny handler-funksjon (bygg request, håndter respons, feilhåndteri
Verdiene er veiledende — SvelteKit setter prioritet ved opprettelse basert på kontekst. En manuelt trigget transkripsjon kan få høyere prioritet enn en automatisk nattjobb. Verdiene er veiledende — SvelteKit setter prioritet ved opprettelse basert på kontekst. En manuelt trigget transkripsjon kan få høyere prioritet enn en automatisk nattjobb.
### 4.4 Ressursstyring ### 4.4 Ressursstyring (implementert, oppgave 15.5)
* **Concurrency:** `--max-concurrent` begrenser antall samtidige jobber. Default 3 — passer for 8 vCPU der noen slots er Whisper (CPU-tung) og resten er HTTP-kall (ventetid).
* **Resource Governor (Whisper):** Når et LiveKit-rom er aktivt, reduserer workeren Whisper-tråder (`--threads 2` i HTTP-kall til faster-whisper) for å beskytte lydkvaliteten. Sjekkes via LiveKit room-status før Whisper-kall. #### Concurrency-kontroll
* **Semaphore:** Maks 3 samtidige jobber (tokio semaphore). Forhindrer at workeren overbelaster serveren.
* **CPU-vektgrense:** Hver jobbtype har en `cpu_weight` (15). Totalt maks 8 (passer 8 vCPU). En Whisper-jobb (vekt 5) blokkerer andre tunge jobber men tillater lette HTTP-kall (vekt 1).
* **Per-type max_concurrent:** Begrenser hvor mange jobber av samme type som kan kjøre samtidig (f.eks. maks 1 Whisper, maks 2 render).
#### Prioritetsregler (`job_priority_rules`-tabell)
Konfigurerbare regler per jobbtype, lagret i PG og cachet i minnet:
| Felt | Beskrivelse |
|------|-------------|
| `base_priority` | Standard prioritet (010, høyere = viktigere) |
| `livekit_priority_adj` | Prioritetsjustering under aktive LiveKit-sesjoner (typisk negativ) |
| `cpu_weight` | Ressursvekt 15 (brukes mot MAX_TOTAL_WEIGHT=8) |
| `max_concurrent` | Maks samtidige jobber av denne typen (0=ubegrenset) |
| `timeout_seconds` | Timeout per jobb (0=default 600s) |
| `block_during_livekit` | Om jobben skal utsettes helt under LiveKit-sesjoner |
Regler kan endres via `POST /admin/resources/update_rule` uten restart.
#### Resource Governor (LiveKit-bevisst)
Workeren sjekker om det finnes aktive LiveKit-rom (kommunikasjonsnoder med `metadata.livekit_active = true`). Status caches i 10 sekunder.
Når LiveKit er aktivt:
* Jobbtyper med `block_during_livekit = true` utsettes 30 sekunder
* Andre jobber nedprioriteres med `livekit_priority_adj` (f.eks. Whisper: -3)
* Totalt CPU-vektbudsjett begrenser tunge jobber (Whisper vekt=5 + LiveKit → kun lette jobber i parallell)
#### Timeout
Hver jobb har individuell timeout basert på `timeout_seconds` i prioritetsreglene. Default 600s (10 min). Ved timeout: jobben feiler og kan retryes.
#### Disk-overvåking
Bakgrunnsloop som sjekker diskbruk hvert 60. sekund:
| Terskel | Alert-nivå | Handling |
|---------|-----------|----------|
| <85% | OK | Ingen normal drift |
| ≥85% | `warning` | Loggmelding, vurder pruning |
| ≥90% | `critical` | Logges som warning, aggressiv pruning anbefalt |
| ≥95% | `emergency` | Logges som error, umiddelbar handling påkrevd |
Status lagres i `disk_status_log`-tabellen (siste 1000 målinger beholdes). Admin-API: `GET /admin/resources/disk`.
* **Skalering senere:** To nivåer: * **Skalering senere:** To nivåer:
1. **Worker-splitting:** Workeren splittes til to binærer fra samme crate (`worker-heavy`, `worker-light`) via CLI-argument (`--types whisper_transcribe,openrouter_analyze`). Ingen kodeendring nødvendig — kun deploy-konfigurasjon. 1. **Worker-splitting:** Workeren splittes til to binærer fra samme crate (`worker-heavy`, `worker-light`) via CLI-argument (`--types whisper_transcribe,openrouter_analyze`). Ingen kodeendring nødvendig — kun deploy-konfigurasjon.
2. **Compute-separasjon:** Flytt Rust-worker + faster-whisper til en separat Hetzner-node (evt. ARM/Ampere for pris/ytelse). LiveKit er ekstremt sensitivt for CPU-stotring — ved samtidig WebRTC og Whisper på samme maskin risikerer vi audio glitches uansett cgroups. Worker-noden poller jobbkøen i PostgreSQL over internt nettverk — arkitekturen støtter dette uten kodeendring. 2. **Compute-separasjon:** Flytt Rust-worker + faster-whisper til en separat Hetzner-node (evt. ARM/Ampere for pris/ytelse). LiveKit er ekstremt sensitivt for CPU-stotring — ved samtidig WebRTC og Whisper på samme maskin risikerer vi audio glitches uansett cgroups. Worker-noden poller jobbkøen i PostgreSQL over internt nettverk — arkitekturen støtter dette uten kodeendring.
@ -144,6 +185,19 @@ tabell med alle felter, retry/avbryt-knapper. Poller hvert 5. sekund.
**Kode:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`), **Kode:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`),
`intentions.rs` (API-handlers), `frontend/src/routes/admin/jobs/+page.svelte` `intentions.rs` (API-handlers), `frontend/src/routes/admin/jobs/+page.svelte`
### Implementert (oppgave 15.5): Ressursstyring
**API-endepunkter:**
- `GET /admin/resources` — samlet ressursstatus (disk, LiveKit, vekt, regler, kjørende jobber)
- `GET /admin/resources/disk` — disk-status med siste 60 historiske målinger
- `POST /admin/resources/update_rule` — oppdater/opprett prioritetsregel
**Kode:** `resources.rs` (prioritetsregler, governor, disk-overvåking),
`jobs.rs` (semaphore, vektbasert concurrency, LiveKit-sjekk i worker-loop),
`intentions.rs` (admin-handlers for `/admin/resources/*`)
**Migrasjon:** `014_resource_governor.sql``job_priority_rules` + `disk_status_log`
- Valgfritt: SpacetimeDB-event ved statusendring slik at UI kan vise fremdrift i sanntid (f.eks. "Transkriberer... 2/3 forsøk") - Valgfritt: SpacetimeDB-event ved statusendring slik at UI kan vise fremdrift i sanntid (f.eks. "Transkriberer... 2/3 forsøk")
## 8. Instruks for Claude Code ## 8. Instruks for Claude Code

View file

@ -1083,6 +1083,7 @@ dependencies = [
"chrono", "chrono",
"hex", "hex",
"jsonwebtoken", "jsonwebtoken",
"libc",
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"serde", "serde",

View file

@ -21,3 +21,4 @@ hex = "0.4"
tokio-util = { version = "0.7", features = ["io"] } tokio-util = { version = "0.7", features = ["io"] }
tera = "1" tera = "1"
rand = "0.8" rand = "0.8"
libc = "0.2.183"

View file

@ -4105,6 +4105,121 @@ pub struct JobActionResponse {
pub success: bool, pub success: bool,
} }
// =============================================================================
// Ressursstyring: admin-endepunkter (oppgave 15.5)
// =============================================================================
/// GET /admin/resources — samlet ressursstatus.
pub async fn resource_status(
State(state): State<AppState>,
_user: AuthUser,
) -> Result<Json<crate::resources::ResourceStatus>, (StatusCode, Json<ErrorResponse>)> {
let cas_root = std::env::var("CAS_ROOT")
.unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
let disk = crate::resources::check_disk_usage(&cas_root)
.unwrap_or_else(|_| crate::resources::check_disk_usage("/").unwrap_or(
crate::resources::DiskStatus {
mount_point: "/".to_string(),
total_bytes: 0,
used_bytes: 0,
available_bytes: 0,
usage_percent: 0.0,
alert_level: None,
}
));
let livekit_active = crate::resources::has_active_livekit_rooms(&state.db).await;
let total_weight = crate::resources::total_running_weight(&state.db, &state.priority_rules).await;
let rules = state.priority_rules.all().await;
let running = crate::resources::running_jobs_by_type(&state.db)
.await
.map_err(|e| internal_error(&format!("DB-feil: {e}")))?;
Ok(Json(crate::resources::ResourceStatus {
disk,
livekit_active,
total_running_weight: total_weight,
max_weight: 8, // MAX_TOTAL_WEIGHT fra jobs.rs
priority_rules: rules,
running_jobs_by_type: running,
}))
}
/// GET /admin/resources/disk — disk-status med historikk.
pub async fn resource_disk(
State(state): State<AppState>,
_user: AuthUser,
) -> Result<Json<DiskOverview>, (StatusCode, Json<ErrorResponse>)> {
let current = crate::resources::latest_disk_status(&state.db)
.await
.map_err(|e| internal_error(&format!("DB-feil: {e}")))?;
let history = crate::resources::disk_status_history(&state.db, 60)
.await
.map_err(|e| internal_error(&format!("DB-feil: {e}")))?;
Ok(Json(DiskOverview { current, history }))
}
#[derive(Serialize)]
pub struct DiskOverview {
pub current: Option<crate::resources::DiskStatus>,
pub history: Vec<crate::resources::DiskStatusHistoryRow>,
}
/// POST /admin/resources/update_rule — oppdater en prioritetsregel.
pub async fn update_priority_rule(
State(state): State<AppState>,
_user: AuthUser,
Json(req): Json<UpdatePriorityRuleRequest>,
) -> Result<Json<JobActionResponse>, (StatusCode, Json<ErrorResponse>)> {
sqlx::query(
r#"INSERT INTO job_priority_rules (job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (job_type) DO UPDATE SET
base_priority = EXCLUDED.base_priority,
livekit_priority_adj = EXCLUDED.livekit_priority_adj,
cpu_weight = EXCLUDED.cpu_weight,
max_concurrent = EXCLUDED.max_concurrent,
timeout_seconds = EXCLUDED.timeout_seconds,
block_during_livekit = EXCLUDED.block_during_livekit"#,
)
.bind(&req.job_type)
.bind(req.base_priority)
.bind(req.livekit_priority_adj)
.bind(req.cpu_weight)
.bind(req.max_concurrent)
.bind(req.timeout_seconds)
.bind(req.block_during_livekit)
.execute(&state.db)
.await
.map_err(|e| internal_error(&format!("DB-feil: {e}")))?;
// Oppdater cache
if let Err(e) = state.priority_rules.refresh(&state.db).await {
tracing::warn!(error = %e, "Kunne ikke refreshe prioritetsregler-cache");
}
tracing::info!(
job_type = %req.job_type,
user = %_user.node_id,
"Admin: prioritetsregel oppdatert"
);
Ok(Json(JobActionResponse { success: true }))
}
#[derive(Deserialize)]
pub struct UpdatePriorityRuleRequest {
pub job_type: String,
pub base_priority: i16,
pub livekit_priority_adj: i16,
pub cpu_weight: i16,
pub max_concurrent: i16,
pub timeout_seconds: i32,
pub block_during_livekit: bool,
}
// ============================================================================= // =============================================================================
// Tester // Tester
// ============================================================================= // =============================================================================

View file

@ -2,12 +2,16 @@
// //
// Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED. // Enkel polling-loop med SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatching til handler-funksjoner basert på job_type. // Dispatching til handler-funksjoner basert på job_type.
// Ressursstyring: semaphore for concurrency, prioritetsregler fra PG,
// LiveKit-bevisst resource governor (oppgave 15.5).
// //
// Ref: docs/infra/jobbkø.md // Ref: docs/infra/jobbkø.md
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::Serialize; use serde::Serialize;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid; use uuid::Uuid;
use crate::agent; use crate::agent;
@ -16,11 +20,16 @@ use crate::audio;
use crate::cas::CasStore; use crate::cas::CasStore;
use crate::maintenance::MaintenanceState; use crate::maintenance::MaintenanceState;
use crate::publishing; use crate::publishing;
use crate::resources::{self, PriorityRules};
use crate::stdb::StdbClient; use crate::stdb::StdbClient;
use crate::summarize; use crate::summarize;
use crate::transcribe; use crate::transcribe;
use crate::tts; use crate::tts;
/// Maks total CPU-vekt som kan kjøre samtidig.
/// Standard: 8 (passer for 8 vCPU der Whisper=5 + et lett kall=1+1+1).
const MAX_TOTAL_WEIGHT: i16 = 8;
/// Rad fra job_queue-tabellen. /// Rad fra job_queue-tabellen.
#[derive(sqlx::FromRow, Debug)] #[derive(sqlx::FromRow, Debug)]
pub struct JobRow { pub struct JobRow {
@ -366,14 +375,35 @@ pub async fn cancel_job(db: &PgPool, job_id: Uuid) -> Result<bool, sqlx::Error>
/// Starter worker-loopen som poller job_queue. /// Starter worker-loopen som poller job_queue.
/// Kjører som en bakgrunnsoppgave i tokio. /// Kjører som en bakgrunnsoppgave i tokio.
/// ///
/// Ressursstyring (oppgave 15.5):
/// - Concurrency-kontroll via semaphore (maks 3 samtidige jobber)
/// - Prioritetsregler fra job_priority_rules-tabellen
/// - LiveKit-bevisst resource governor: nedprioriterer/blokkerer tunge
/// jobber når LiveKit-rom er aktive
/// - CPU-vektbasert kapasitetskontroll (MAX_TOTAL_WEIGHT)
///
/// Respekterer vedlikeholdsmodus: når `maintenance.is_active()` er true, /// Respekterer vedlikeholdsmodus: når `maintenance.is_active()` er true,
/// slutter workeren å dequeue nye jobber (kjørende jobber fullføres). /// slutter workeren å dequeue nye jobber (kjørende jobber fullføres).
pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore, maintenance: MaintenanceState) { pub fn start_worker(
db: PgPool,
stdb: StdbClient,
cas: CasStore,
maintenance: MaintenanceState,
priority_rules: PriorityRules,
) {
let whisper_url = std::env::var("WHISPER_URL") let whisper_url = std::env::var("WHISPER_URL")
.unwrap_or_else(|_| "http://faster-whisper:8000".to_string()); .unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
// Semaphore for maks 3 samtidige jobber (doc: --max-concurrent 3)
let semaphore = Arc::new(Semaphore::new(3));
// Cache LiveKit-status med 10-sekunders TTL for å unngå
// å spørre PG på hver poll-iterasjon
let livekit_cache: Arc<tokio::sync::RwLock<(bool, std::time::Instant)>> =
Arc::new(tokio::sync::RwLock::new((false, std::time::Instant::now())));
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("Jobbkø-worker startet (poll-intervall: 2s)"); tracing::info!("Jobbkø-worker startet (poll: 2s, max-concurrent: 3, max-weight: {MAX_TOTAL_WEIGHT})");
loop { loop {
// Sjekk vedlikeholdsmodus — ikke dequeue nye jobber // Sjekk vedlikeholdsmodus — ikke dequeue nye jobber
@ -383,36 +413,173 @@ pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore, maintenance: Ma
continue; continue;
} }
// Sjekk om vi har ledig kapasitet (semaphore)
let permit = match semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
// Alle 3 slots er opptatt — vent litt
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};
// Sjekk total CPU-vekt
let current_weight = resources::total_running_weight(&db, &priority_rules).await;
if current_weight >= MAX_TOTAL_WEIGHT {
drop(permit);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
// Oppdater LiveKit-cache (TTL: 10 sekunder)
let livekit_active = {
let cache = livekit_cache.read().await;
if cache.1.elapsed() < std::time::Duration::from_secs(10) {
cache.0
} else {
drop(cache);
let active = resources::has_active_livekit_rooms(&db).await;
let mut cache = livekit_cache.write().await;
*cache = (active, std::time::Instant::now());
active
}
};
match dequeue(&db).await { match dequeue(&db).await {
Ok(Some(job)) => { Ok(Some(job)) => {
// Sjekk prioritetsregler for denne jobbtypen
let rule = priority_rules.get(&job.job_type).await;
let decision = resources::evaluate_job(&rule, livekit_active);
// Sjekk om jobben skal utsettes (LiveKit-blokkering)
if decision.should_defer {
tracing::info!(
job_id = %job.id,
job_type = %job.job_type,
"Jobb utsatt — blokkert under aktiv LiveKit-sesjon"
);
// Sett tilbake til pending med kort delay
let _ = sqlx::query(
"UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '30 seconds' WHERE id = $1"
)
.bind(job.id)
.execute(&db)
.await;
drop(permit);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
// Sjekk max_concurrent per type
if rule.max_concurrent > 0 {
let running = resources::count_running_of_type(&db, &job.job_type)
.await
.unwrap_or(0);
// running inkluderer den vi nettopp satte til 'running',
// så sjekk mot max_concurrent
if running > rule.max_concurrent as i64 {
tracing::debug!(
job_type = %job.job_type,
running = running,
max = rule.max_concurrent,
"Max concurrent nådd for jobbtype — utsetter"
);
let _ = sqlx::query(
"UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
)
.bind(job.id)
.execute(&db)
.await;
drop(permit);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
}
// Sjekk at ny jobb ikke sprenger vektgrensen
if current_weight + rule.cpu_weight > MAX_TOTAL_WEIGHT {
tracing::debug!(
job_type = %job.job_type,
weight = rule.cpu_weight,
current = current_weight,
max = MAX_TOTAL_WEIGHT,
"CPU-vektgrense nådd — utsetter tung jobb"
);
let _ = sqlx::query(
"UPDATE job_queue SET status = 'pending', started_at = NULL, attempts = attempts - 1, scheduled_for = now() + interval '5 seconds' WHERE id = $1"
)
.bind(job.id)
.execute(&db)
.await;
drop(permit);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
tracing::info!( tracing::info!(
job_id = %job.id, job_id = %job.id,
job_type = %job.job_type, job_type = %job.job_type,
attempt = job.attempts, attempt = job.attempts,
weight = rule.cpu_weight,
livekit = livekit_active,
"Behandler jobb" "Behandler jobb"
); );
match dispatch(&job, &db, &stdb, &cas, &whisper_url).await { // Kjør jobben i en egen tokio-task (frigjør poll-loopen)
Ok(result) => { let db2 = db.clone();
if let Err(e) = complete_job(&db, job.id, result).await { let stdb2 = stdb.clone();
let cas2 = cas.clone();
let whisper_url2 = whisper_url.clone();
let timeout_secs = if rule.timeout_seconds > 0 {
rule.timeout_seconds as u64
} else {
600 // 10 min default
};
tokio::spawn(async move {
// Hold semaphore-permit til jobben er ferdig
let _permit = permit;
let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
dispatch(&job, &db2, &stdb2, &cas2, &whisper_url2),
)
.await;
match result {
Ok(Ok(res)) => {
if let Err(e) = complete_job(&db2, job.id, res).await {
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført"); tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som fullført");
} else { } else {
tracing::info!(job_id = %job.id, "Jobb fullført"); tracing::info!(job_id = %job.id, "Jobb fullført");
} }
} }
Err(err) => { Ok(Err(err)) => {
tracing::error!(job_id = %job.id, error = %err, "Jobb feilet"); tracing::error!(job_id = %job.id, error = %err, "Jobb feilet");
if let Err(e) = fail_job(&db, &job, &err).await { if let Err(e) = fail_job(&db2, &job, &err).await {
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet"); tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere jobb som feilet");
} }
} }
Err(_) => {
let msg = format!("Jobb tidsavbrutt etter {timeout_secs}s");
tracing::error!(job_id = %job.id, "{msg}");
if let Err(e) = fail_job(&db2, &job, &msg).await {
tracing::error!(job_id = %job.id, error = %e, "Kunne ikke markere tidsavbrutt jobb");
} }
} }
}
});
// Ikke vent — poll umiddelbart for neste jobb
continue;
}
Ok(None) => { Ok(None) => {
drop(permit);
// Ingen ventende jobber — vent før neste poll // Ingen ventende jobber — vent før neste poll
tokio::time::sleep(std::time::Duration::from_secs(2)).await; tokio::time::sleep(std::time::Duration::from_secs(2)).await;
} }
Err(e) => { Err(e) => {
drop(permit);
tracing::error!(error = %e, "Feil ved polling av jobbkø"); tracing::error!(error = %e, "Feil ved polling av jobbkø");
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }

View file

@ -12,6 +12,7 @@ pub mod maintenance;
pub mod pruning; pub mod pruning;
mod queries; mod queries;
pub mod publishing; pub mod publishing;
pub mod resources;
mod rss; mod rss;
mod serving; mod serving;
mod stdb; mod stdb;
@ -41,6 +42,7 @@ pub struct AppState {
pub index_cache: publishing::IndexCache, pub index_cache: publishing::IndexCache,
pub dynamic_page_cache: publishing::DynamicPageCache, pub dynamic_page_cache: publishing::DynamicPageCache,
pub maintenance: maintenance::MaintenanceState, pub maintenance: maintenance::MaintenanceState,
pub priority_rules: resources::PriorityRules,
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -141,12 +143,20 @@ async fn main() {
// Vedlikeholdstilstand (oppgave 15.2) // Vedlikeholdstilstand (oppgave 15.2)
let maintenance = maintenance::MaintenanceState::new(); let maintenance = maintenance::MaintenanceState::new();
// Start jobbkø-worker i bakgrunnen // Last prioritetsregler fra PG (oppgave 15.5)
jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone()); let priority_rules = resources::PriorityRules::load(&db)
.await
.expect("Kunne ikke laste prioritetsregler fra PG");
// Start jobbkø-worker i bakgrunnen (med ressursstyring, oppgave 15.5)
jobs::start_worker(db.clone(), stdb.clone(), cas.clone(), maintenance.clone(), priority_rules.clone());
// Start periodisk CAS-pruning i bakgrunnen // Start periodisk CAS-pruning i bakgrunnen
pruning::start_pruning_loop(db.clone(), cas.clone()); pruning::start_pruning_loop(db.clone(), cas.clone());
// Start disk-overvåking i bakgrunnen (oppgave 15.5)
resources::start_disk_monitor(db.clone());
// Start planlagt publisering-scheduler i bakgrunnen // Start planlagt publisering-scheduler i bakgrunnen
publishing::start_publish_scheduler(db.clone()); publishing::start_publish_scheduler(db.clone());
@ -155,7 +165,7 @@ async fn main() {
let index_cache = publishing::new_index_cache(); let index_cache = publishing::new_index_cache();
let dynamic_page_cache = publishing::new_dynamic_page_cache(); let dynamic_page_cache = publishing::new_dynamic_page_cache();
let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance }; let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules };
// Ruter: /health er offentlig, /me krever gyldig JWT // Ruter: /health er offentlig, /me krever gyldig JWT
let app = Router::new() let app = Router::new()
@ -204,6 +214,10 @@ async fn main() {
.route("/admin/jobs", get(intentions::list_jobs)) .route("/admin/jobs", get(intentions::list_jobs))
.route("/intentions/retry_job", post(intentions::retry_job)) .route("/intentions/retry_job", post(intentions::retry_job))
.route("/intentions/cancel_job", post(intentions::cancel_job)) .route("/intentions/cancel_job", post(intentions::cancel_job))
// Ressursstyring (oppgave 15.5)
.route("/admin/resources", get(intentions::resource_status))
.route("/admin/resources/disk", get(intentions::resource_disk))
.route("/admin/resources/update_rule", post(intentions::update_priority_rule))
// AI Gateway-konfigurasjon (oppgave 15.4) // AI Gateway-konfigurasjon (oppgave 15.4)
.route("/admin/ai", get(ai_admin::ai_overview)) .route("/admin/ai", get(ai_admin::ai_overview))
.route("/admin/ai/usage", get(ai_admin::ai_usage)) .route("/admin/ai/usage", get(ai_admin::ai_usage))

View file

@ -0,0 +1,443 @@
// Ressursstyring — prioritetsregler, ressurs-governor og disk-overvåking.
//
// Tre hoveddeler:
// 1. PriorityRules: leser job_priority_rules fra PG, brukes av jobbkø-workeren
// til å justere prioritet og concurrency basert på jobbtype og LiveKit-status.
// 2. ResourceGovernor: sjekker om LiveKit-rom er aktive og tilpasser
// worker-oppførsel (blokkering, nedprioritering, Whisper-trådbegrensning).
// 3. DiskMonitor: periodisk sjekk av diskbruk med varsling ved terskeloverskridelse.
//
// Ref: docs/infra/jobbkø.md § 4.4, oppgave 15.5
use serde::Serialize;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
// =============================================================================
// Prioritetsregler
// =============================================================================
/// En rad fra job_priority_rules-tabellen.
#[derive(sqlx::FromRow, Debug, Clone, Serialize)]
pub struct PriorityRule {
pub job_type: String,
pub base_priority: i16,
pub livekit_priority_adj: i16,
pub cpu_weight: i16,
pub max_concurrent: i16,
pub timeout_seconds: i32,
pub block_during_livekit: bool,
}
/// Cache av prioritetsregler — lastes fra PG ved oppstart og kan refreshes.
#[derive(Clone)]
pub struct PriorityRules {
rules: Arc<RwLock<HashMap<String, PriorityRule>>>,
}
impl PriorityRules {
pub async fn load(db: &PgPool) -> Result<Self, sqlx::Error> {
let rows = sqlx::query_as::<_, PriorityRule>(
"SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
)
.fetch_all(db)
.await?;
let mut map = HashMap::new();
for rule in rows {
map.insert(rule.job_type.clone(), rule);
}
tracing::info!(rules = map.len(), "Prioritetsregler lastet");
Ok(Self {
rules: Arc::new(RwLock::new(map)),
})
}
/// Hent regelen for en jobbtype. Returnerer default hvis ikke funnet.
pub async fn get(&self, job_type: &str) -> PriorityRule {
let rules = self.rules.read().await;
rules.get(job_type).cloned().unwrap_or(PriorityRule {
job_type: job_type.to_string(),
base_priority: 5,
livekit_priority_adj: 0,
cpu_weight: 1,
max_concurrent: 0,
timeout_seconds: 0,
block_during_livekit: false,
})
}
/// Hent alle regler (for admin-API).
pub async fn all(&self) -> Vec<PriorityRule> {
let rules = self.rules.read().await;
rules.values().cloned().collect()
}
/// Oppdater cache fra PG.
pub async fn refresh(&self, db: &PgPool) -> Result<(), sqlx::Error> {
let rows = sqlx::query_as::<_, PriorityRule>(
"SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
)
.fetch_all(db)
.await?;
let mut map = HashMap::new();
for rule in rows {
map.insert(rule.job_type.clone(), rule);
}
let mut rules = self.rules.write().await;
*rules = map;
Ok(())
}
}
// =============================================================================
// Ressurs-governor (LiveKit-bevisst)
// =============================================================================
/// Sjekker om det er aktive LiveKit-rom ved å spørre kommunikasjonsnoder
/// med pågående lyd-sesjoner. Vi sjekker edges tabellen for aktive
/// member_of-edges til kommunikasjonsnoder som har livekit_room i metadata.
///
/// Enkel tilnærming: sjekk om noen kommunikasjonsnoder har
/// metadata.livekit_active = true.
pub async fn has_active_livekit_rooms(db: &PgPool) -> bool {
// Sjekk om det finnes kommunikasjonsnoder med aktiv LiveKit-sesjon.
// Kommunikasjonsnoder som er "open" med livekit_active i metadata
// indikerer en pågående lydøkt.
let result = sqlx::query_scalar::<_, i64>(
r#"SELECT COUNT(*) FROM nodes
WHERE node_kind = 'communication'
AND metadata->>'livekit_active' = 'true'"#,
)
.fetch_one(db)
.await;
match result {
Ok(count) => {
if count > 0 {
tracing::debug!(active_rooms = count, "Aktive LiveKit-rom funnet");
}
count > 0
}
Err(e) => {
tracing::warn!(error = %e, "Kunne ikke sjekke LiveKit-status — antar ingen aktive rom");
false
}
}
}
/// Sjekk om en jobb av gitt type skal blokkeres/nedprioriteres.
pub struct GovernorDecision {
/// Justert prioritet (kan være lavere enn base under LiveKit-last)
pub effective_priority: i16,
/// Om jobben skal utsettes (blokkert pga LiveKit)
pub should_defer: bool,
}
/// Evaluer en jobbtype gitt gjeldende LiveKit-status.
pub fn evaluate_job(rule: &PriorityRule, livekit_active: bool) -> GovernorDecision {
if livekit_active && rule.block_during_livekit {
return GovernorDecision {
effective_priority: 0,
should_defer: true,
};
}
let adj = if livekit_active {
rule.livekit_priority_adj
} else {
0
};
GovernorDecision {
effective_priority: (rule.base_priority + adj).max(0),
should_defer: false,
}
}
/// Tell kjørende jobber av en gitt type.
pub async fn count_running_of_type(db: &PgPool, job_type: &str) -> Result<i64, sqlx::Error> {
sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM job_queue WHERE status = 'running' AND job_type = $1",
)
.bind(job_type)
.fetch_one(db)
.await
}
/// Total CPU-vekt av alle kjørende jobber.
pub async fn total_running_weight(db: &PgPool, rules: &PriorityRules) -> i16 {
let running_types = sqlx::query_scalar::<_, String>(
"SELECT job_type FROM job_queue WHERE status = 'running'",
)
.fetch_all(db)
.await
.unwrap_or_default();
let mut total: i16 = 0;
for jt in &running_types {
let rule = rules.get(jt).await;
total = total.saturating_add(rule.cpu_weight);
}
total
}
// =============================================================================
// Disk-overvåking
// =============================================================================
/// Disk-status for et monteringspunkt.
#[derive(Debug, Clone, Serialize)]
pub struct DiskStatus {
pub mount_point: String,
pub total_bytes: u64,
pub used_bytes: u64,
pub available_bytes: u64,
pub usage_percent: f32,
pub alert_level: Option<String>,
}
/// Hent diskbruk for et gitt monteringspunkt via statvfs.
pub fn check_disk_usage(path: &str) -> Result<DiskStatus, String> {
// Bruk nix::sys::statvfs for å hente diskinfo
use std::ffi::CString;
let c_path =
CString::new(path).map_err(|e| format!("Ugyldig path for statvfs: {e}"))?;
// Safe wrapper around libc::statvfs
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
if ret != 0 {
return Err(format!(
"statvfs feilet for {path}: {}",
std::io::Error::last_os_error()
));
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let used_bytes = total_bytes - (stat.f_bfree as u64 * block_size);
let usage_percent = if total_bytes > 0 {
(used_bytes as f64 / total_bytes as f64 * 100.0) as f32
} else {
0.0
};
let alert_level = if usage_percent >= 95.0 {
Some("emergency".to_string())
} else if usage_percent >= 90.0 {
Some("critical".to_string())
} else if usage_percent >= 85.0 {
Some("warning".to_string())
} else {
None
};
Ok(DiskStatus {
mount_point: path.to_string(),
total_bytes,
used_bytes,
available_bytes,
usage_percent,
alert_level,
})
}
/// Logg diskstatus til disk_status_log-tabellen.
async fn log_disk_status(db: &PgPool, status: &DiskStatus) -> Result<(), sqlx::Error> {
sqlx::query(
r#"INSERT INTO disk_status_log (mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level)
VALUES ($1, $2, $3, $4, $5, $6)"#,
)
.bind(&status.mount_point)
.bind(status.total_bytes as i64)
.bind(status.used_bytes as i64)
.bind(status.available_bytes as i64)
.bind(status.usage_percent)
.bind(&status.alert_level)
.execute(db)
.await?;
Ok(())
}
/// Rydd opp gamle disk_status_log-rader (behold siste 1000).
async fn cleanup_disk_log(db: &PgPool) -> Result<(), sqlx::Error> {
sqlx::query(
r#"DELETE FROM disk_status_log
WHERE id NOT IN (
SELECT id FROM disk_status_log ORDER BY checked_at DESC LIMIT 1000
)"#,
)
.execute(db)
.await?;
Ok(())
}
/// Starter disk-overvåkingsloop som sjekker diskbruk hvert 60. sekund.
/// Logger til PG og logger advarsel ved terskeloverskridelse.
pub fn start_disk_monitor(db: PgPool) {
// CAS-rot og data-partisjon
let cas_root = std::env::var("CAS_ROOT")
.unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
// Sjekk roten av partisjon der CAS ligger
let check_path = if std::path::Path::new(&cas_root).exists() {
cas_root.clone()
} else {
"/".to_string()
};
tokio::spawn(async move {
// Vent litt etter oppstart
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
tracing::info!(path = %check_path, "Disk-overvåking startet (intervall: 60s)");
let mut check_count: u64 = 0;
loop {
match check_disk_usage(&check_path) {
Ok(status) => {
// Logg til PG
if let Err(e) = log_disk_status(&db, &status).await {
tracing::warn!(error = %e, "Kunne ikke logge diskstatus");
}
// Varsle ved terskeloverskridelse
match status.alert_level.as_deref() {
Some("emergency") => {
tracing::error!(
usage = format!("{:.1}%", status.usage_percent),
available_mb = status.available_bytes / 1_048_576,
"DISK NØDVARSEL: >95% brukt — umiddelbar handling påkrevd!"
);
}
Some("critical") => {
tracing::warn!(
usage = format!("{:.1}%", status.usage_percent),
available_mb = status.available_bytes / 1_048_576,
"Disk kritisk: >90% brukt — aggressiv pruning anbefalt"
);
}
Some("warning") => {
tracing::warn!(
usage = format!("{:.1}%", status.usage_percent),
available_mb = status.available_bytes / 1_048_576,
"Disk advarsel: >85% brukt"
);
}
_ => {
// Normal — logg bare hvert 10. minutt (hvert 10. sjekk)
if check_count % 10 == 0 {
tracing::debug!(
usage = format!("{:.1}%", status.usage_percent),
available_gb = status.available_bytes / 1_073_741_824,
"Diskstatus OK"
);
}
}
}
}
Err(e) => {
tracing::error!(error = %e, "Feil ved disksjekk");
}
}
check_count += 1;
// Rydd opp gamle logg-rader annenhver time (120 sjekker à 60s)
if check_count % 120 == 0 {
if let Err(e) = cleanup_disk_log(&db).await {
tracing::warn!(error = %e, "Kunne ikke rydde disk_status_log");
}
}
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});
}
// =============================================================================
// Admin-API responser
// =============================================================================
/// Samlet ressursstatus for admin-panelet.
#[derive(Serialize)]
pub struct ResourceStatus {
pub disk: DiskStatus,
pub livekit_active: bool,
pub total_running_weight: i16,
pub max_weight: i16,
pub priority_rules: Vec<PriorityRule>,
pub running_jobs_by_type: Vec<RunningJobCount>,
}
#[derive(Serialize, sqlx::FromRow)]
pub struct RunningJobCount {
pub job_type: String,
pub count: i64,
}
/// Hent antall kjørende jobber per type.
pub async fn running_jobs_by_type(db: &PgPool) -> Result<Vec<RunningJobCount>, sqlx::Error> {
sqlx::query_as::<_, RunningJobCount>(
"SELECT job_type, COUNT(*) as count FROM job_queue WHERE status = 'running' GROUP BY job_type",
)
.fetch_all(db)
.await
}
/// Hent siste disk-status fra loggen.
pub async fn latest_disk_status(db: &PgPool) -> Result<Option<DiskStatus>, sqlx::Error> {
let row = sqlx::query_as::<_, DiskStatusRow>(
"SELECT mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT 1",
)
.fetch_optional(db)
.await?;
Ok(row.map(|r| DiskStatus {
mount_point: r.mount_point,
total_bytes: r.total_bytes as u64,
used_bytes: r.used_bytes as u64,
available_bytes: r.available_bytes as u64,
usage_percent: r.usage_percent,
alert_level: r.alert_level,
}))
}
#[derive(sqlx::FromRow)]
struct DiskStatusRow {
mount_point: String,
total_bytes: i64,
used_bytes: i64,
available_bytes: i64,
usage_percent: f32,
alert_level: Option<String>,
}
/// Hent disk-status-historikk (siste N målinger).
pub async fn disk_status_history(
db: &PgPool,
limit: i64,
) -> Result<Vec<DiskStatusHistoryRow>, sqlx::Error> {
sqlx::query_as::<_, DiskStatusHistoryRow>(
"SELECT checked_at, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT $1",
)
.bind(limit)
.fetch_all(db)
.await
}
#[derive(sqlx::FromRow, Serialize)]
pub struct DiskStatusHistoryRow {
pub checked_at: chrono::DateTime<chrono::Utc>,
pub usage_percent: f32,
pub alert_level: Option<String>,
}

View file

@ -0,0 +1,46 @@
-- Oppgave 15.5: Ressursstyring — prioritetsregler, ressursgrenser, disk-status
--
-- Oppretter tabell for jobbtype-prioritetsregler som brukes av workeren
-- til å styre concurrency, prioritet og LiveKit-tilpasning.
CREATE TABLE job_priority_rules (
job_type TEXT PRIMARY KEY,
base_priority SMALLINT NOT NULL DEFAULT 5,
-- Hvor mye prioriteten reduseres når LiveKit-rom er aktive
livekit_priority_adj SMALLINT NOT NULL DEFAULT 0,
-- Ressursvekt (1=lett HTTP-kall, 5=tung CPU som Whisper)
cpu_weight SMALLINT NOT NULL DEFAULT 1,
-- Maks samtidige jobber av denne typen (0 = ubegrenset)
max_concurrent SMALLINT NOT NULL DEFAULT 0,
-- Timeout i sekunder (0 = bruk default)
timeout_seconds INT NOT NULL DEFAULT 0,
-- Om jobben skal blokkeres helt under aktive LiveKit-sesjoner
block_during_livekit BOOLEAN NOT NULL DEFAULT false
);
-- Seed med kjente jobbtyper og fornuftige defaults
INSERT INTO job_priority_rules (job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit)
VALUES
('whisper_transcribe', 5, -3, 5, 1, 600, false),
('agent_respond', 10, 0, 1, 2, 120, false),
('suggest_edges', 3, 0, 1, 2, 120, false),
('summarize_communication', 5, 0, 1, 1, 120, false),
('tts_generate', 5, -1, 2, 1, 300, false),
('audio_process', 5, -2, 3, 1, 600, false),
('render_article', 7, 0, 1, 2, 60, false),
('render_index', 7, 0, 1, 2, 60, false);
-- Tabell for disk-status-snapshots (siste verdier brukes for varsling)
CREATE TABLE disk_status_log (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
checked_at TIMESTAMPTZ NOT NULL DEFAULT now(),
mount_point TEXT NOT NULL DEFAULT '/',
total_bytes BIGINT NOT NULL,
used_bytes BIGINT NOT NULL,
available_bytes BIGINT NOT NULL,
usage_percent REAL NOT NULL,
alert_level TEXT -- NULL=ok, 'warning'=85%+, 'critical'=90%+, 'emergency'=95%+
);
-- Hold bare siste 1000 målinger
CREATE INDEX idx_disk_status_latest ON disk_status_log (checked_at DESC);

View file

@ -167,8 +167,7 @@ Uavhengige faser kan fortsatt plukkes.
- [x] 15.2 Graceful shutdown: admin setter vedlikeholdstidspunkt → nedtelling i frontend → nye LiveKit-rom blokkeres → jobbkø stopper → vent på aktive jobber → restart. Vis aktive sesjoner før bekreftelse. - [x] 15.2 Graceful shutdown: admin setter vedlikeholdstidspunkt → nedtelling i frontend → nye LiveKit-rom blokkeres → jobbkø stopper → vent på aktive jobber → restart. Vis aktive sesjoner før bekreftelse.
- [x] 15.3 Jobbkø-oversikt: admin-UI for aktive, ventende og feilede jobber. Filtrer på type/samling/status. Manuell retry og avbryt. - [x] 15.3 Jobbkø-oversikt: admin-UI for aktive, ventende og feilede jobber. Filtrer på type/samling/status. Manuell retry og avbryt.
- [x] 15.4 AI Gateway-konfigurasjon: admin-UI for modelloversikt, API-nøkler (kryptert), ruting-regler per jobbtype, fallback-kjeder, forbruksoversikt per samling. Ref: `docs/infra/ai_gateway.md`. - [x] 15.4 AI Gateway-konfigurasjon: admin-UI for modelloversikt, API-nøkler (kryptert), ruting-regler per jobbtype, fallback-kjeder, forbruksoversikt per samling. Ref: `docs/infra/ai_gateway.md`.
- [~] 15.5 Ressursstyring: prioritetsregler mellom jobbtyper, ressursgrenser per worker, ressurs-governor for automatisk nedprioritering under aktive LiveKit-sesjoner, disk-status med varsling. - [x] 15.5 Ressursstyring: prioritetsregler mellom jobbtyper, ressursgrenser per worker, ressurs-governor for automatisk nedprioritering under aktive LiveKit-sesjoner, disk-status med varsling.
> Påbegynt: 2026-03-18T03:53
- [ ] 15.6 Serverhelse-dashboard: tjeneste-status (PG, STDB, Caddy, Authentik, LiteLLM, Whisper, LiveKit), metrikker (CPU, minne, disk), backup-status, logg-tilgang. - [ ] 15.6 Serverhelse-dashboard: tjeneste-status (PG, STDB, Caddy, Authentik, LiteLLM, Whisper, LiveKit), metrikker (CPU, minne, disk), backup-status, logg-tilgang.
- [ ] 15.7 Ressursforbruk-logging: `resource_usage_log`-tabell i PG. Maskinrommet logger AI-tokens (inn/ut, modellnivå), Whisper-tid (sek), TTS-tegn, CAS-lagring (bytes), LiveKit-tid (deltaker-min). Båndbredde via Caddy-logg-parsing. Ref: `docs/features/ressursforbruk.md`. - [ ] 15.7 Ressursforbruk-logging: `resource_usage_log`-tabell i PG. Maskinrommet logger AI-tokens (inn/ut, modellnivå), Whisper-tid (sek), TTS-tegn, CAS-lagring (bytes), LiveKit-tid (deltaker-min). Båndbredde via Caddy-logg-parsing. Ref: `docs/features/ressursforbruk.md`.
- [ ] 15.8 Forbruksoversikt i admin: aggregert visning per samling, per ressurstype, per tidsperiode. Drill-down til jobbtype og modellnivå. - [ ] 15.8 Forbruksoversikt i admin: aggregert visning per samling, per ressurstype, per tidsperiode. Drill-down til jobbtype og modellnivå.