// Resource management: priority rules, resource governor and disk monitoring.
//
// Three main parts:
// 1. PriorityRules: reads job_priority_rules from PG; used by the job-queue worker
//    to adjust priority and concurrency based on job type and LiveKit status.
// 2. ResourceGovernor: checks whether LiveKit rooms are active and adapts
//    worker behavior (blocking, deprioritization, Whisper thread limiting).
// 3. DiskMonitor: periodic disk-usage check that alerts when thresholds are exceeded.
//
// Ref: docs/infra/jobbkø.md § 4.4, task 15.5

use serde::Serialize;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// =============================================================================
// Priority rules
// =============================================================================

/// A row from the job_priority_rules table.
#[derive(sqlx::FromRow, Debug, Clone, Serialize)]
pub struct PriorityRule {
    pub job_type: String,
    pub base_priority: i16,
    pub livekit_priority_adj: i16,
    pub cpu_weight: i16,
    pub max_concurrent: i16,
    pub timeout_seconds: i32,
    pub block_during_livekit: bool,
}

/// Cache of priority rules; loaded from PG at startup and refreshable.
#[derive(Clone)]
pub struct PriorityRules {
    rules: Arc<RwLock<HashMap<String, PriorityRule>>>,
}

impl PriorityRules {
    pub async fn load(db: &PgPool) -> Result<Self, sqlx::Error> {
        let rows = sqlx::query_as::<_, PriorityRule>(
            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight,
                    max_concurrent, timeout_seconds, block_during_livekit
             FROM job_priority_rules",
        )
        .fetch_all(db)
        .await?;

        let mut map = HashMap::new();
        for rule in rows {
            map.insert(rule.job_type.clone(), rule);
        }
        tracing::info!(rules = map.len(), "Priority rules loaded");

        Ok(Self {
            rules: Arc::new(RwLock::new(map)),
        })
    }

    /// Get the rule for a job type. Returns a default if none is found.
    pub async fn get(&self, job_type: &str) -> PriorityRule {
        let rules = self.rules.read().await;
        rules.get(job_type).cloned().unwrap_or(PriorityRule {
            job_type: job_type.to_string(),
            base_priority: 5,
            livekit_priority_adj: 0,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: false,
        })
    }

    /// Get all rules (for the admin API).
    pub async fn all(&self) -> Vec<PriorityRule> {
        let rules = self.rules.read().await;
        rules.values().cloned().collect()
    }

    /// Refresh the cache from PG.
    pub async fn refresh(&self, db: &PgPool) -> Result<(), sqlx::Error> {
        let rows = sqlx::query_as::<_, PriorityRule>(
            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight,
                    max_concurrent, timeout_seconds, block_during_livekit
             FROM job_priority_rules",
        )
        .fetch_all(db)
        .await?;

        let mut map = HashMap::new();
        for rule in rows {
            map.insert(rule.job_type.clone(), rule);
        }

        let mut rules = self.rules.write().await;
        *rules = map;
        Ok(())
    }
}
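// Illustrative sketch (not called anywhere in this module) of the startup
// wiring: load the rules once and refresh the cache on a timer so admin edits
// to job_priority_rules are picked up without a restart. The 5-minute interval
// is an assumption for this sketch, not a value mandated elsewhere.
#[allow(dead_code)]
async fn spawn_rule_refresh_example(db: PgPool) -> Result<PriorityRules, sqlx::Error> {
    let rules = PriorityRules::load(&db).await?;

    // The cache is an Arc internally, so a clone shares state with the worker.
    let rules_for_task = rules.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
        loop {
            interval.tick().await;
            if let Err(e) = rules_for_task.refresh(&db).await {
                tracing::warn!(error = %e, "Could not refresh priority rules");
            }
        }
    });

    Ok(rules)
}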
// =============================================================================
// Resource governor (LiveKit-aware)
// =============================================================================

/// Checks whether any LiveKit rooms are active by querying communication nodes
/// with an ongoing audio session. A fuller check could follow active member_of
/// edges to communication nodes that carry livekit_room in their metadata.
///
/// Simple approach used here: check whether any communication node has
/// metadata.livekit_active = true.
pub async fn has_active_livekit_rooms(db: &PgPool) -> bool {
    // Look for communication nodes with an active LiveKit session.
    // An "open" communication node with livekit_active in its metadata
    // indicates an ongoing audio session.
    let result = sqlx::query_scalar::<_, i64>(
        r#"SELECT COUNT(*) FROM nodes
           WHERE node_kind = 'communication'
             AND metadata->>'livekit_active' = 'true'"#,
    )
    .fetch_one(db)
    .await;

    match result {
        Ok(count) => {
            if count > 0 {
                tracing::debug!(active_rooms = count, "Active LiveKit rooms found");
            }
            count > 0
        }
        Err(e) => {
            tracing::warn!(error = %e, "Could not check LiveKit status; assuming no active rooms");
            false
        }
    }
}

/// Decision on whether a job of a given type should be blocked/deprioritized.
pub struct GovernorDecision {
    /// Adjusted priority (may be lower than base while LiveKit is under load)
    pub effective_priority: i16,
    /// Whether the job should be deferred (blocked because of LiveKit)
    pub should_defer: bool,
}

/// Evaluate a job type given the current LiveKit status.
pub fn evaluate_job(rule: &PriorityRule, livekit_active: bool) -> GovernorDecision {
    if livekit_active && rule.block_during_livekit {
        return GovernorDecision {
            effective_priority: 0,
            should_defer: true,
        };
    }

    let adj = if livekit_active {
        rule.livekit_priority_adj
    } else {
        0
    };

    GovernorDecision {
        effective_priority: (rule.base_priority + adj).max(0),
        should_defer: false,
    }
}

/// Count running jobs of a given type.
pub async fn count_running_of_type(db: &PgPool, job_type: &str) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM job_queue WHERE status = 'running' AND job_type = $1",
    )
    .bind(job_type)
    .fetch_one(db)
    .await
}

/// Total CPU weight of all running jobs.
pub async fn total_running_weight(db: &PgPool, rules: &PriorityRules) -> i16 {
    let running_types = sqlx::query_scalar::<_, String>(
        "SELECT job_type FROM job_queue WHERE status = 'running'",
    )
    .fetch_all(db)
    .await
    .unwrap_or_default();

    let mut total: i16 = 0;
    for jt in &running_types {
        let rule = rules.get(jt).await;
        total = total.saturating_add(rule.cpu_weight);
    }
    total
}
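// Illustrative sketch (not part of the worker itself): how the job-queue worker
// might combine the rules and the governor above into a single admission check
// before claiming a job. `MAX_TOTAL_WEIGHT` is an assumed per-host budget used
// only for this sketch; it is not defined elsewhere in the codebase.
#[allow(dead_code)]
async fn admission_check_example(
    db: &PgPool,
    rules: &PriorityRules,
    job_type: &str,
) -> Result<Option<i16>, sqlx::Error> {
    const MAX_TOTAL_WEIGHT: i16 = 8; // assumption for this sketch

    let rule = rules.get(job_type).await;
    let livekit_active = has_active_livekit_rooms(db).await;
    let decision = evaluate_job(&rule, livekit_active);

    // Hard block while a LiveKit session is running.
    if decision.should_defer {
        return Ok(None);
    }

    // Per-type concurrency limit (0 is treated as "no limit").
    if rule.max_concurrent > 0
        && count_running_of_type(db, job_type).await? >= rule.max_concurrent as i64
    {
        return Ok(None);
    }

    // Total CPU-weight budget across all running jobs.
    if total_running_weight(db, rules)
        .await
        .saturating_add(rule.cpu_weight)
        > MAX_TOTAL_WEIGHT
    {
        return Ok(None);
    }

    // Claim the job at its effective (possibly LiveKit-adjusted) priority.
    Ok(Some(decision.effective_priority))
}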
// =============================================================================
// Disk monitoring
// =============================================================================

/// Disk status for a mount point.
#[derive(Debug, Clone, Serialize)]
pub struct DiskStatus {
    pub mount_point: String,
    pub total_bytes: u64,
    pub used_bytes: u64,
    pub available_bytes: u64,
    pub usage_percent: f32,
    pub alert_level: Option<String>,
}

/// Get disk usage for a given mount point via statvfs.
pub fn check_disk_usage(path: &str) -> Result<DiskStatus, String> {
    // Use libc::statvfs directly to fetch filesystem information.
    use std::ffi::CString;

    let c_path = CString::new(path).map_err(|e| format!("Invalid path for statvfs: {e}"))?;

    // statvfs is an FFI call; the zeroed struct is filled in by the OS on success.
    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
    if ret != 0 {
        return Err(format!(
            "statvfs failed for {path}: {}",
            std::io::Error::last_os_error()
        ));
    }

    let block_size = stat.f_frsize as u64;
    let total_bytes = stat.f_blocks as u64 * block_size;
    let available_bytes = stat.f_bavail as u64 * block_size;
    let used_bytes = total_bytes - (stat.f_bfree as u64 * block_size);
    let usage_percent = if total_bytes > 0 {
        (used_bytes as f64 / total_bytes as f64 * 100.0) as f32
    } else {
        0.0
    };

    let alert_level = if usage_percent >= 95.0 {
        Some("emergency".to_string())
    } else if usage_percent >= 90.0 {
        Some("critical".to_string())
    } else if usage_percent >= 85.0 {
        Some("warning".to_string())
    } else {
        None
    };

    Ok(DiskStatus {
        mount_point: path.to_string(),
        total_bytes,
        used_bytes,
        available_bytes,
        usage_percent,
        alert_level,
    })
}

/// Log disk status to the disk_status_log table.
async fn log_disk_status(db: &PgPool, status: &DiskStatus) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"INSERT INTO disk_status_log
           (mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level)
           VALUES ($1, $2, $3, $4, $5, $6)"#,
    )
    .bind(&status.mount_point)
    .bind(status.total_bytes as i64)
    .bind(status.used_bytes as i64)
    .bind(status.available_bytes as i64)
    .bind(status.usage_percent)
    .bind(&status.alert_level)
    .execute(db)
    .await?;
    Ok(())
}

/// Clean up old disk_status_log rows (keep the most recent 1000).
async fn cleanup_disk_log(db: &PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"DELETE FROM disk_status_log
           WHERE id NOT IN (
               SELECT id FROM disk_status_log ORDER BY checked_at DESC LIMIT 1000
           )"#,
    )
    .execute(db)
    .await?;
    Ok(())
}

/// Starts the disk-monitoring loop, which checks disk usage every 60 seconds.
/// Logs to PG and emits a warning when a threshold is exceeded.
pub fn start_disk_monitor(db: PgPool) {
    // CAS root and data partition
    let cas_root = std::env::var("CAS_ROOT")
        .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());

    // Check the root of the partition where the CAS lives
    let check_path = if std::path::Path::new(&cas_root).exists() {
        cas_root.clone()
    } else {
        "/".to_string()
    };

    tokio::spawn(async move {
        // Wait a little after startup
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        tracing::info!(path = %check_path, "Disk monitoring started (interval: 60s)");

        let mut check_count: u64 = 0;
        loop {
            match check_disk_usage(&check_path) {
                Ok(status) => {
                    // Log to PG
                    if let Err(e) = log_disk_status(&db, &status).await {
                        tracing::warn!(error = %e, "Could not log disk status");
                    }

                    // Alert when a threshold is exceeded
                    match status.alert_level.as_deref() {
                        Some("emergency") => {
                            tracing::error!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "DISK EMERGENCY: >95% used, immediate action required!"
                            );
                        }
                        Some("critical") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk critical: >90% used, aggressive pruning recommended"
                            );
                        }
                        Some("warning") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk warning: >85% used"
                            );
                        }
                        _ => {
                            // Normal: only log every 10 minutes (every 10th check)
                            if check_count % 10 == 0 {
                                tracing::debug!(
                                    usage = format!("{:.1}%", status.usage_percent),
                                    available_gb = status.available_bytes / 1_073_741_824,
                                    "Disk status OK"
                                );
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(error = %e, "Disk check failed");
                }
            }

            check_count += 1;

            // Clean up old log rows every two hours (120 checks at 60s)
            if check_count % 120 == 0 {
                if let Err(e) = cleanup_disk_log(&db).await {
                    tracing::warn!(error = %e, "Could not clean up disk_status_log");
                }
            }

            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });
}

// =============================================================================
// Admin API responses
// =============================================================================

/// Combined resource status for the admin panel.
#[derive(Serialize)]
pub struct ResourceStatus {
    pub disk: DiskStatus,
    pub livekit_active: bool,
    pub total_running_weight: i16,
    pub max_weight: i16,
    pub priority_rules: Vec<PriorityRule>,
    pub running_jobs_by_type: Vec<RunningJobCount>,
}

#[derive(Serialize, sqlx::FromRow)]
pub struct RunningJobCount {
    pub job_type: String,
    pub count: i64,
}
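// Illustrative sketch of how an admin-API handler might assemble a
// ResourceStatus response from the building blocks in this module. The
// `max_weight` parameter stands in for however the deployment configures its
// CPU-weight budget, and the live statvfs check of "/" is only an assumed
// fallback for when no logged measurement exists yet.
#[allow(dead_code)]
async fn resource_status_example(
    db: &PgPool,
    rules: &PriorityRules,
    max_weight: i16,
) -> Result<ResourceStatus, sqlx::Error> {
    // Prefer the most recent logged measurement; fall back to a live check.
    let disk = match latest_disk_status(db).await? {
        Some(status) => status,
        None => check_disk_usage("/").unwrap_or(DiskStatus {
            mount_point: "/".to_string(),
            total_bytes: 0,
            used_bytes: 0,
            available_bytes: 0,
            usage_percent: 0.0,
            alert_level: None,
        }),
    };

    Ok(ResourceStatus {
        disk,
        livekit_active: has_active_livekit_rooms(db).await,
        total_running_weight: total_running_weight(db, rules).await,
        max_weight,
        priority_rules: rules.all().await,
        running_jobs_by_type: running_jobs_by_type(db).await?,
    })
}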
/// Get the number of running jobs per type.
pub async fn running_jobs_by_type(db: &PgPool) -> Result<Vec<RunningJobCount>, sqlx::Error> {
    sqlx::query_as::<_, RunningJobCount>(
        "SELECT job_type, COUNT(*) as count FROM job_queue
         WHERE status = 'running' GROUP BY job_type",
    )
    .fetch_all(db)
    .await
}

/// Get the most recent disk status from the log.
pub async fn latest_disk_status(db: &PgPool) -> Result<Option<DiskStatus>, sqlx::Error> {
    let row = sqlx::query_as::<_, DiskStatusRow>(
        "SELECT mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level
         FROM disk_status_log ORDER BY checked_at DESC LIMIT 1",
    )
    .fetch_optional(db)
    .await?;

    Ok(row.map(|r| DiskStatus {
        mount_point: r.mount_point,
        total_bytes: r.total_bytes as u64,
        used_bytes: r.used_bytes as u64,
        available_bytes: r.available_bytes as u64,
        usage_percent: r.usage_percent,
        alert_level: r.alert_level,
    }))
}

#[derive(sqlx::FromRow)]
struct DiskStatusRow {
    mount_point: String,
    total_bytes: i64,
    used_bytes: i64,
    available_bytes: i64,
    usage_percent: f32,
    alert_level: Option<String>,
}

/// Get disk-status history (the most recent N measurements).
pub async fn disk_status_history(
    db: &PgPool,
    limit: i64,
) -> Result<Vec<DiskStatusHistoryRow>, sqlx::Error> {
    sqlx::query_as::<_, DiskStatusHistoryRow>(
        "SELECT checked_at, usage_percent, alert_level FROM disk_status_log
         ORDER BY checked_at DESC LIMIT $1",
    )
    .bind(limit)
    .fetch_all(db)
    .await
}

#[derive(sqlx::FromRow, Serialize)]
pub struct DiskStatusHistoryRow {
    pub checked_at: chrono::DateTime<chrono::Utc>,
    pub usage_percent: f32,
    pub alert_level: Option<String>,
}
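// Unit tests for the pure governor logic. These only exercise evaluate_job,
// which needs no database; the rule values below are made up for the tests.
#[cfg(test)]
mod tests {
    use super::*;

    fn rule(base: i16, adj: i16, block: bool) -> PriorityRule {
        PriorityRule {
            job_type: "test_job".to_string(),
            base_priority: base,
            livekit_priority_adj: adj,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: block,
        }
    }

    #[test]
    fn defers_blocked_job_while_livekit_is_active() {
        let decision = evaluate_job(&rule(5, 0, true), true);
        assert!(decision.should_defer);
        assert_eq!(decision.effective_priority, 0);
    }

    #[test]
    fn applies_priority_adjustment_only_during_livekit() {
        let during = evaluate_job(&rule(5, -3, false), true);
        assert!(!during.should_defer);
        assert_eq!(during.effective_priority, 2);

        let idle = evaluate_job(&rule(5, -3, false), false);
        assert_eq!(idle.effective_priority, 5);
    }

    #[test]
    fn effective_priority_never_goes_negative() {
        let decision = evaluate_job(&rule(1, -4, false), true);
        assert_eq!(decision.effective_priority, 0);
    }
}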