synops/maskinrommet/src/resources.rs
vegard b64f217637 Completes task 15.5: Resource management for the job queue
Implements priority rules, resource limits and a LiveKit-aware
resource governor for the job-queue worker, plus disk monitoring with alerting.

Main components:

1. Priority rules (job_priority_rules table):
   - Configurable base_priority, cpu_weight and max_concurrent per job type
   - LiveKit adjustment: livekit_priority_adj and block_during_livekit
   - Timeout per job type
   - Admin API for changing rules without a restart

2. Resource governor in the worker loop:
   - Semaphore: at most 3 concurrent jobs
   - CPU weight limit: total weight at most 8 (Whisper=5, render=1, etc.)
   - Per-type concurrency limit
   - LiveKit status checked with a 10 s cache TTL
   - Jobs are deferred/deprioritized while LiveKit rooms are active
   - Individual timeout per job (default 600 s)
   - Jobs run in their own tokio tasks (parallel dispatch)

3. Disk monitoring:
   - Checks disk usage every 60 seconds via statvfs
   - Thresholds: 85% warning, 90% critical, 95% emergency
   - Logs to disk_status_log (the last 1000 measurements are kept)
   - Admin API: GET /admin/resources/disk with history

4. Admin API:
   - GET /admin/resources: combined resource status
   - GET /admin/resources/disk: disk status with history
   - POST /admin/resources/update_rule: update a priority rule

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 04:02:11 +00:00

// Resource management: priority rules, resource governor and disk monitoring.
//
// Three main parts:
// 1. PriorityRules: reads job_priority_rules from PG; used by the job-queue
//    worker to adjust priority and concurrency based on job type and LiveKit
//    status.
// 2. ResourceGovernor: checks whether LiveKit rooms are active and adapts
//    worker behavior (blocking, deprioritization, Whisper thread limiting).
// 3. DiskMonitor: periodic disk-usage check with alerts when thresholds are
//    exceeded.
//
// Ref: docs/infra/jobbkø.md § 4.4, task 15.5

use serde::Serialize;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// =============================================================================
// Priority rules
// =============================================================================
/// One row from the job_priority_rules table.
#[derive(sqlx::FromRow, Debug, Clone, Serialize)]
pub struct PriorityRule {
    pub job_type: String,
    /// Base scheduling priority for this job type.
    pub base_priority: i16,
    /// Added to base_priority while LiveKit rooms are active (typically negative).
    pub livekit_priority_adj: i16,
    /// Relative CPU cost; the worker caps the total weight of running jobs.
    pub cpu_weight: i16,
    /// Per-type concurrency limit (0 in the default rule, i.e. no per-type cap).
    pub max_concurrent: i16,
    /// Per-job timeout in seconds (0 in the default rule; per the commit
    /// message the worker falls back to a 600 s default).
    pub timeout_seconds: i32,
    /// If true, jobs of this type are deferred while LiveKit rooms are active.
    pub block_during_livekit: bool,
}

/// Cache of priority rules; loaded from PG at startup and refreshable at runtime.
#[derive(Clone)]
pub struct PriorityRules {
    rules: Arc<RwLock<HashMap<String, PriorityRule>>>,
}

impl PriorityRules {
    pub async fn load(db: &PgPool) -> Result<Self, sqlx::Error> {
        let map = Self::fetch_rules(db).await?;
        tracing::info!(rules = map.len(), "Priority rules loaded");
        Ok(Self {
            rules: Arc::new(RwLock::new(map)),
        })
    }

    /// Fetch all rules from PG, indexed by job type.
    async fn fetch_rules(db: &PgPool) -> Result<HashMap<String, PriorityRule>, sqlx::Error> {
        let rows = sqlx::query_as::<_, PriorityRule>(
            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
        )
        .fetch_all(db)
        .await?;
        Ok(rows
            .into_iter()
            .map(|rule| (rule.job_type.clone(), rule))
            .collect())
    }

    /// Get the rule for a job type. Returns a default if none is found.
    pub async fn get(&self, job_type: &str) -> PriorityRule {
        let rules = self.rules.read().await;
        rules.get(job_type).cloned().unwrap_or_else(|| PriorityRule {
            job_type: job_type.to_string(),
            base_priority: 5,
            livekit_priority_adj: 0,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: false,
        })
    }

    /// Get all rules (for the admin API).
    pub async fn all(&self) -> Vec<PriorityRule> {
        let rules = self.rules.read().await;
        rules.values().cloned().collect()
    }

    /// Refresh the cache from PG.
    pub async fn refresh(&self, db: &PgPool) -> Result<(), sqlx::Error> {
        let map = Self::fetch_rules(db).await?;
        *self.rules.write().await = map;
        Ok(())
    }
}
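
// The commit message mentions an admin API for updating rules without a
// restart (POST /admin/resources/update_rule). The handler itself lives
// elsewhere; the following is a minimal sketch of the update-then-refresh
// flow it presumably performs. The function name apply_rule_update and the
// single base_priority column are illustrative assumptions, not the actual
// handler.
pub async fn apply_rule_update(
    db: &PgPool,
    rules: &PriorityRules,
    job_type: &str,
    new_base_priority: i16,
) -> Result<(), sqlx::Error> {
    // Persist the change...
    sqlx::query("UPDATE job_priority_rules SET base_priority = $2 WHERE job_type = $1")
        .bind(job_type)
        .bind(new_base_priority)
        .execute(db)
        .await?;
    // ...then reload the in-memory cache so workers see it without a restart.
    rules.refresh(db).await
}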

// =============================================================================
// Resource governor (LiveKit-aware)
// =============================================================================
/// Checks whether any LiveKit rooms are active by looking for communication
/// nodes with an ongoing audio session.
///
/// Simple approach: check whether any communication node in the nodes table
/// has metadata.livekit_active = true.
pub async fn has_active_livekit_rooms(db: &PgPool) -> bool {
    // A communication node with livekit_active = 'true' in its metadata
    // indicates an ongoing audio session.
    let result = sqlx::query_scalar::<_, i64>(
        r#"SELECT COUNT(*) FROM nodes
           WHERE node_kind = 'communication'
             AND metadata->>'livekit_active' = 'true'"#,
    )
    .fetch_one(db)
    .await;
    match result {
        Ok(count) => {
            if count > 0 {
                tracing::debug!(active_rooms = count, "Active LiveKit rooms found");
            }
            count > 0
        }
        Err(e) => {
            tracing::warn!(error = %e, "Could not check LiveKit status; assuming no active rooms");
            false
        }
    }
}
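
// The commit message notes that LiveKit status is checked with a 10 s cache
// TTL. That cache lives in the worker loop, not in this file; the following
// is a minimal sketch of such a wrapper around has_active_livekit_rooms.
// The type LiveKitStatusCache and its layout are illustrative assumptions.
pub struct LiveKitStatusCache {
    last_checked: Option<std::time::Instant>,
    cached: bool,
}

impl LiveKitStatusCache {
    pub fn new() -> Self {
        // No timestamp yet, so the first call always queries the database.
        Self {
            last_checked: None,
            cached: false,
        }
    }

    /// Return the cached status if it is younger than 10 s; otherwise re-query.
    pub async fn is_active(&mut self, db: &PgPool) -> bool {
        let stale = self
            .last_checked
            .map_or(true, |t| t.elapsed() > std::time::Duration::from_secs(10));
        if stale {
            self.cached = has_active_livekit_rooms(db).await;
            self.last_checked = Some(std::time::Instant::now());
        }
        self.cached
    }
}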

/// The governor's decision for a job of a given type: run, deprioritize or defer.
pub struct GovernorDecision {
    /// Adjusted priority (may be lower than the base priority under LiveKit load)
    pub effective_priority: i16,
    /// Whether the job should be deferred (blocked because of LiveKit)
    pub should_defer: bool,
}

/// Evaluate a job type given the current LiveKit status.
pub fn evaluate_job(rule: &PriorityRule, livekit_active: bool) -> GovernorDecision {
    // Hard block: this job type must not run while rooms are active.
    if livekit_active && rule.block_during_livekit {
        return GovernorDecision {
            effective_priority: 0,
            should_defer: true,
        };
    }
    // Soft adjustment: apply the LiveKit offset only while rooms are active,
    // and never let the effective priority go below zero.
    let adj = if livekit_active {
        rule.livekit_priority_adj
    } else {
        0
    };
    GovernorDecision {
        effective_priority: (rule.base_priority + adj).max(0),
        should_defer: false,
    }
}

/// Count running jobs of a given type.
pub async fn count_running_of_type(db: &PgPool, job_type: &str) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM job_queue WHERE status = 'running' AND job_type = $1",
    )
    .bind(job_type)
    .fetch_one(db)
    .await
}

/// Total CPU weight of all currently running jobs.
pub async fn total_running_weight(db: &PgPool, rules: &PriorityRules) -> i16 {
    let running_types = sqlx::query_scalar::<_, String>(
        "SELECT job_type FROM job_queue WHERE status = 'running'",
    )
    .fetch_all(db)
    .await
    .unwrap_or_default();
    // Sum each running job's weight according to its rule; saturate rather
    // than overflow if the queue is unexpectedly large.
    let mut total: i16 = 0;
    for jt in &running_types {
        let rule = rules.get(jt).await;
        total = total.saturating_add(rule.cpu_weight);
    }
    total
}
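
// The commit message describes how the worker loop combines these checks:
// a semaphore capping concurrency at 3 jobs, a total CPU weight cap of 8,
// a per-type concurrency limit, and LiveKit-aware deferral. That loop lives
// in the worker, not in this file; the following is a minimal sketch of the
// admission decision under those assumptions. The name can_admit and the
// MAX_TOTAL_WEIGHT constant are illustrative, not the actual worker code.
pub async fn can_admit(
    db: &PgPool,
    rules: &PriorityRules,
    job_type: &str,
    livekit_active: bool,
) -> Result<bool, sqlx::Error> {
    // Total weight cap across all running jobs, per the commit message.
    const MAX_TOTAL_WEIGHT: i16 = 8;

    let rule = rules.get(job_type).await;

    // 1. LiveKit gate: block_during_livekit defers the job outright.
    if evaluate_job(&rule, livekit_active).should_defer {
        return Ok(false);
    }
    // 2. Per-type concurrency limit (0 = no per-type cap).
    if rule.max_concurrent > 0
        && count_running_of_type(db, job_type).await? >= rule.max_concurrent as i64
    {
        return Ok(false);
    }
    // 3. Total CPU weight cap (Whisper=5, render=1, etc.).
    if total_running_weight(db, rules).await + rule.cpu_weight > MAX_TOTAL_WEIGHT {
        return Ok(false);
    }
    // The global semaphore (max 3 concurrent jobs) is acquired separately.
    Ok(true)
}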

// =============================================================================
// Disk monitoring
// =============================================================================

/// Disk status for a mount point.
#[derive(Debug, Clone, Serialize)]
pub struct DiskStatus {
    pub mount_point: String,
    pub total_bytes: u64,
    pub used_bytes: u64,
    pub available_bytes: u64,
    pub usage_percent: f32,
    /// "warning", "critical" or "emergency"; None when usage is below 85%.
    pub alert_level: Option<String>,
}

/// Get disk usage for a given mount point via statvfs(3).
pub fn check_disk_usage(path: &str) -> Result<DiskStatus, String> {
    use std::ffi::CString;

    let c_path =
        CString::new(path).map_err(|e| format!("Invalid path for statvfs: {e}"))?;
    // Direct libc::statvfs call: zero the struct and let the kernel fill it in.
    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
    if ret != 0 {
        return Err(format!(
            "statvfs failed for {path}: {}",
            std::io::Error::last_os_error()
        ));
    }
    // f_frsize is the fragment size the block counts are expressed in.
    let block_size = stat.f_frsize as u64;
    let total_bytes = stat.f_blocks as u64 * block_size;
    // f_bavail counts blocks available to unprivileged users, while f_bfree
    // also includes the root reserve; "used" is therefore based on f_bfree.
    let available_bytes = stat.f_bavail as u64 * block_size;
    let used_bytes = total_bytes - (stat.f_bfree as u64 * block_size);
    let usage_percent = if total_bytes > 0 {
        (used_bytes as f64 / total_bytes as f64 * 100.0) as f32
    } else {
        0.0
    };
    // Thresholds from task 15.5: 85% warning, 90% critical, 95% emergency.
    let alert_level = if usage_percent >= 95.0 {
        Some("emergency".to_string())
    } else if usage_percent >= 90.0 {
        Some("critical".to_string())
    } else if usage_percent >= 85.0 {
        Some("warning".to_string())
    } else {
        None
    };
    Ok(DiskStatus {
        mount_point: path.to_string(),
        total_bytes,
        used_bytes,
        available_bytes,
        usage_percent,
        alert_level,
    })
}

/// Write a disk status measurement to the disk_status_log table.
async fn log_disk_status(db: &PgPool, status: &DiskStatus) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"INSERT INTO disk_status_log (mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level)
           VALUES ($1, $2, $3, $4, $5, $6)"#,
    )
    .bind(&status.mount_point)
    .bind(status.total_bytes as i64)
    .bind(status.used_bytes as i64)
    .bind(status.available_bytes as i64)
    .bind(status.usage_percent)
    .bind(&status.alert_level)
    .execute(db)
    .await?;
    Ok(())
}

/// Prune old disk_status_log rows, keeping the most recent 1000.
async fn cleanup_disk_log(db: &PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"DELETE FROM disk_status_log
           WHERE id NOT IN (
               SELECT id FROM disk_status_log ORDER BY checked_at DESC LIMIT 1000
           )"#,
    )
    .execute(db)
    .await?;
    Ok(())
}

/// Start the disk monitoring loop, which checks disk usage every 60 seconds,
/// logs to PG and emits log alerts when a threshold is exceeded.
pub fn start_disk_monitor(db: PgPool) {
    // CAS root and data partition.
    let cas_root = std::env::var("CAS_ROOT")
        .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
    // Check the CAS directory if it exists; otherwise fall back to the
    // filesystem root.
    let check_path = if std::path::Path::new(&cas_root).exists() {
        cas_root.clone()
    } else {
        "/".to_string()
    };
    tokio::spawn(async move {
        // Give the rest of the system a moment to come up first.
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        tracing::info!(path = %check_path, "Disk monitoring started (interval: 60 s)");
        let mut check_count: u64 = 0;
        loop {
            match check_disk_usage(&check_path) {
                Ok(status) => {
                    // Log to PG.
                    if let Err(e) = log_disk_status(&db, &status).await {
                        tracing::warn!(error = %e, "Could not log disk status");
                    }
                    // Alert when a threshold is exceeded.
                    match status.alert_level.as_deref() {
                        Some("emergency") => {
                            tracing::error!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "DISK EMERGENCY: >95% used; immediate action required!"
                            );
                        }
                        Some("critical") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk critical: >90% used; aggressive pruning recommended"
                            );
                        }
                        Some("warning") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk warning: >85% used"
                            );
                        }
                        _ => {
                            // Normal: only log every 10 minutes (every 10th check).
                            if check_count % 10 == 0 {
                                tracing::debug!(
                                    usage = format!("{:.1}%", status.usage_percent),
                                    available_gb = status.available_bytes / 1_073_741_824,
                                    "Disk status OK"
                                );
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(error = %e, "Disk check failed");
                }
            }
            check_count += 1;
            // Prune old log rows every two hours (120 checks at 60 s each).
            if check_count % 120 == 0 {
                if let Err(e) = cleanup_disk_log(&db).await {
                    tracing::warn!(error = %e, "Could not prune disk_status_log");
                }
            }
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });
}
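
// Illustrative wiring sketch, not called from this module: how the rule cache
// and the disk monitor might be brought up at application startup. The
// function name startup_resources is an assumption; only PriorityRules::load
// and start_disk_monitor are real items from this file.
pub async fn startup_resources(db: PgPool) -> Result<PriorityRules, sqlx::Error> {
    // Load the priority-rule cache once; the admin API can refresh it later.
    let rules = PriorityRules::load(&db).await?;
    // Spawn the background disk-monitoring loop (30 s warm-up, then every 60 s).
    start_disk_monitor(db);
    Ok(rules)
}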

// =============================================================================
// Admin API responses
// =============================================================================

/// Combined resource status for the admin panel (GET /admin/resources).
#[derive(Serialize)]
pub struct ResourceStatus {
    pub disk: DiskStatus,
    pub livekit_active: bool,
    pub total_running_weight: i16,
    pub max_weight: i16,
    pub priority_rules: Vec<PriorityRule>,
    pub running_jobs_by_type: Vec<RunningJobCount>,
}

#[derive(Serialize, sqlx::FromRow)]
pub struct RunningJobCount {
    pub job_type: String,
    pub count: i64,
}

/// Get the number of running jobs per job type.
pub async fn running_jobs_by_type(db: &PgPool) -> Result<Vec<RunningJobCount>, sqlx::Error> {
    sqlx::query_as::<_, RunningJobCount>(
        "SELECT job_type, COUNT(*) as count FROM job_queue WHERE status = 'running' GROUP BY job_type",
    )
    .fetch_all(db)
    .await
}

/// Get the most recent disk status from the log.
pub async fn latest_disk_status(db: &PgPool) -> Result<Option<DiskStatus>, sqlx::Error> {
    let row = sqlx::query_as::<_, DiskStatusRow>(
        "SELECT mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT 1",
    )
    .fetch_optional(db)
    .await?;
    // PG stores the byte counts as BIGINT; convert back to u64 for the API.
    Ok(row.map(|r| DiskStatus {
        mount_point: r.mount_point,
        total_bytes: r.total_bytes as u64,
        used_bytes: r.used_bytes as u64,
        available_bytes: r.available_bytes as u64,
        usage_percent: r.usage_percent,
        alert_level: r.alert_level,
    }))
}

/// Intermediate row type: disk_status_log stores byte counts as BIGINT (i64).
#[derive(sqlx::FromRow)]
struct DiskStatusRow {
    mount_point: String,
    total_bytes: i64,
    used_bytes: i64,
    available_bytes: i64,
    usage_percent: f32,
    alert_level: Option<String>,
}

/// Get disk status history (the most recent `limit` measurements).
pub async fn disk_status_history(
    db: &PgPool,
    limit: i64,
) -> Result<Vec<DiskStatusHistoryRow>, sqlx::Error> {
    sqlx::query_as::<_, DiskStatusHistoryRow>(
        "SELECT checked_at, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT $1",
    )
    .bind(limit)
    .fetch_all(db)
    .await
}

#[derive(sqlx::FromRow, Serialize)]
pub struct DiskStatusHistoryRow {
    pub checked_at: chrono::DateTime<chrono::Utc>,
    pub usage_percent: f32,
    pub alert_level: Option<String>,
}
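
// A minimal test sketch for the pure governor logic above. The cases mirror
// the behavior described in the commit message; the rule() helper is local
// to the tests and not part of the module's API.
#[cfg(test)]
mod tests {
    use super::*;

    fn rule(base: i16, adj: i16, block: bool) -> PriorityRule {
        PriorityRule {
            job_type: "test".to_string(),
            base_priority: base,
            livekit_priority_adj: adj,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: block,
        }
    }

    #[test]
    fn blocking_rule_defers_while_livekit_is_active() {
        assert!(evaluate_job(&rule(5, 0, true), true).should_defer);
        assert!(!evaluate_job(&rule(5, 0, true), false).should_defer);
    }

    #[test]
    fn priority_adjustment_applies_only_under_livekit_and_clamps_at_zero() {
        assert_eq!(evaluate_job(&rule(5, -3, false), true).effective_priority, 2);
        assert_eq!(evaluate_job(&rule(2, -5, false), true).effective_priority, 0);
        assert_eq!(evaluate_job(&rule(5, -3, false), false).effective_priority, 5);
    }
}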