Implements priority rules, resource limits, and a LiveKit-aware resource governor for the job-queue worker, plus disk monitoring with alerting.

Main components:

1. Priority rules (job_priority_rules table):
   - Configurable base_priority, cpu_weight, and max_concurrent per job type
   - LiveKit adjustment: livekit_priority_adj and block_during_livekit
   - Per-job-type timeout
   - Admin API for changing rules without a restart

2. Resource governor in the worker loop:
   - Semaphore: at most 3 concurrent jobs
   - CPU-weight cap: total weight at most 8 (Whisper=5, render=1, etc.)
   - Per-type concurrency limit
   - LiveKit status checked with a 10 s cache TTL
   - Jobs are deferred/deprioritized while LiveKit rooms are active
   - Individual timeout per job (default 600 s)
   - Jobs run in their own tokio tasks (parallel dispatch)

3. Disk monitoring:
   - Checks disk usage every 60 seconds via statvfs
   - Thresholds: 85% warning, 90% critical, 95% emergency
   - Logs to disk_status_log (the most recent 1000 measurements are kept)
   - Admin API: GET /admin/resources/disk with history

4. Admin API:
   - GET /admin/resources — aggregate resource status
   - GET /admin/resources/disk — disk status with history
   - POST /admin/resources/update_rule — update a priority rule

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

// Resource management — priority rules, resource governor, and disk monitoring.
//
// Three main parts:
// 1. PriorityRules: reads job_priority_rules from PG; used by the job-queue worker
//    to adjust priority and concurrency based on job type and LiveKit status.
// 2. ResourceGovernor: checks whether LiveKit rooms are active and adapts
//    worker behavior (blocking, deprioritization, Whisper thread limiting).
// 3. DiskMonitor: periodically checks disk usage and alerts when thresholds are exceeded.
//
// Ref: docs/infra/jobbkø.md § 4.4, task 15.5

use serde::Serialize;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// =============================================================================
// Priority rules
// =============================================================================

/// One row from the job_priority_rules table.
#[derive(sqlx::FromRow, Debug, Clone, Serialize)]
pub struct PriorityRule {
    pub job_type: String,
    pub base_priority: i16,
    pub livekit_priority_adj: i16,
    pub cpu_weight: i16,
    pub max_concurrent: i16,
    pub timeout_seconds: i32,
    pub block_during_livekit: bool,
}

/// Cache of priority rules — loaded from PG at startup and refreshable at runtime.
#[derive(Clone)]
pub struct PriorityRules {
    rules: Arc<RwLock<HashMap<String, PriorityRule>>>,
}

impl PriorityRules {
    /// Fetch all rules from PG, keyed by job_type (shared by load and refresh).
    async fn fetch_rules(db: &PgPool) -> Result<HashMap<String, PriorityRule>, sqlx::Error> {
        let rows = sqlx::query_as::<_, PriorityRule>(
            "SELECT job_type, base_priority, livekit_priority_adj, cpu_weight, max_concurrent, timeout_seconds, block_during_livekit FROM job_priority_rules"
        )
        .fetch_all(db)
        .await?;

        Ok(rows.into_iter().map(|r| (r.job_type.clone(), r)).collect())
    }

    pub async fn load(db: &PgPool) -> Result<Self, sqlx::Error> {
        let map = Self::fetch_rules(db).await?;
        tracing::info!(rules = map.len(), "Priority rules loaded");
        Ok(Self {
            rules: Arc::new(RwLock::new(map)),
        })
    }

    /// Get the rule for a job type. Returns a default if none is found; in the
    /// default, max_concurrent = 0 and timeout_seconds = 0 mean "no explicit
    /// per-type limit", so the worker falls back to its own defaults
    /// (600 s timeout).
    pub async fn get(&self, job_type: &str) -> PriorityRule {
        let rules = self.rules.read().await;
        rules.get(job_type).cloned().unwrap_or(PriorityRule {
            job_type: job_type.to_string(),
            base_priority: 5,
            livekit_priority_adj: 0,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: false,
        })
    }

    /// Get all rules (for the admin API).
    pub async fn all(&self) -> Vec<PriorityRule> {
        let rules = self.rules.read().await;
        rules.values().cloned().collect()
    }

    /// Refresh the cache from PG.
    pub async fn refresh(&self, db: &PgPool) -> Result<(), sqlx::Error> {
        let map = Self::fetch_rules(db).await?;
        let mut rules = self.rules.write().await;
        *rules = map;
        Ok(())
    }
}

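// Sketch (editor's illustration): the POST /admin/resources/update_rule flow
// implied by the commit message — persist the change, then refresh the cache
// so the worker picks it up without a restart. The function name and UPDATE
// statement are hypothetical; the column list matches the SELECTs above.
pub async fn update_priority_rule(
    db: &PgPool,
    rules: &PriorityRules,
    rule: &PriorityRule,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"UPDATE job_priority_rules
           SET base_priority = $2, livekit_priority_adj = $3, cpu_weight = $4,
               max_concurrent = $5, timeout_seconds = $6, block_during_livekit = $7
           WHERE job_type = $1"#,
    )
    .bind(&rule.job_type)
    .bind(rule.base_priority)
    .bind(rule.livekit_priority_adj)
    .bind(rule.cpu_weight)
    .bind(rule.max_concurrent)
    .bind(rule.timeout_seconds)
    .bind(rule.block_during_livekit)
    .execute(db)
    .await?;
    rules.refresh(db).await // hot-reload: no worker restart needed
}
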
// =============================================================================
// Resource governor (LiveKit-aware)
// =============================================================================

/// Checks whether any LiveKit rooms are active by querying for communication
/// nodes with an ongoing audio session.
///
/// Simple approach: a communication node with metadata.livekit_active = 'true'
/// indicates a live audio session.
pub async fn has_active_livekit_rooms(db: &PgPool) -> bool {
    let result = sqlx::query_scalar::<_, i64>(
        r#"SELECT COUNT(*) FROM nodes
           WHERE node_kind = 'communication'
             AND metadata->>'livekit_active' = 'true'"#,
    )
    .fetch_one(db)
    .await;

    match result {
        Ok(count) => {
            if count > 0 {
                tracing::debug!(active_rooms = count, "Active LiveKit rooms found");
            }
            count > 0
        }
        Err(e) => {
            tracing::warn!(error = %e, "Could not check LiveKit status — assuming no active rooms");
            false
        }
    }
}

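// Sketch (editor's illustration): the commit message mentions a 10 s cache TTL
// for the LiveKit check, which is not part of this file. A minimal wrapper the
// worker could own; the type name is hypothetical.
pub struct LiveKitStatusCache {
    state: Arc<RwLock<Option<(std::time::Instant, bool)>>>,
}

impl LiveKitStatusCache {
    pub fn new() -> Self {
        Self { state: Arc::new(RwLock::new(None)) }
    }

    /// Returns the cached answer if it is younger than 10 s, otherwise
    /// queries PG and refreshes the cache.
    pub async fn is_active(&self, db: &PgPool) -> bool {
        const TTL: std::time::Duration = std::time::Duration::from_secs(10);
        if let Some((checked_at, active)) = *self.state.read().await {
            if checked_at.elapsed() < TTL {
                return active;
            }
        }
        let active = has_active_livekit_rooms(db).await;
        *self.state.write().await = Some((std::time::Instant::now(), active));
        active
    }
}
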
/// The governor's verdict on whether a job of a given type should be
/// blocked or deprioritized.
pub struct GovernorDecision {
    /// Adjusted priority (may be lower than base under LiveKit load)
    pub effective_priority: i16,
    /// Whether the job should be deferred (blocked because of LiveKit)
    pub should_defer: bool,
}

/// Evaluate a job type given the current LiveKit status.
pub fn evaluate_job(rule: &PriorityRule, livekit_active: bool) -> GovernorDecision {
    if livekit_active && rule.block_during_livekit {
        return GovernorDecision {
            effective_priority: 0,
            should_defer: true,
        };
    }

    let adj = if livekit_active {
        rule.livekit_priority_adj
    } else {
        0
    };

    GovernorDecision {
        // saturating_add avoids a debug-mode overflow panic on extreme
        // values; priorities are clamped at zero.
        effective_priority: rule.base_priority.saturating_add(adj).max(0),
        should_defer: false,
    }
}

/// Count running jobs of a given type.
pub async fn count_running_of_type(db: &PgPool, job_type: &str) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM job_queue WHERE status = 'running' AND job_type = $1",
    )
    .bind(job_type)
    .fetch_one(db)
    .await
}

/// Total CPU weight of all running jobs.
pub async fn total_running_weight(db: &PgPool, rules: &PriorityRules) -> i16 {
    let running_types = sqlx::query_scalar::<_, String>(
        "SELECT job_type FROM job_queue WHERE status = 'running'",
    )
    .fetch_all(db)
    .await
    .unwrap_or_default();

    let mut total: i16 = 0;
    for jt in &running_types {
        let rule = rules.get(jt).await;
        total = total.saturating_add(rule.cpu_weight);
    }
    total
}

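// Sketch (editor's illustration, hypothetical gate function): how the worker
// loop described in the commit message might combine the helpers above before
// dispatching a job. MAX_TOTAL_WEIGHT = 8 comes from the commit message; the
// separate 3-job semaphore is assumed to be held by the caller.
pub async fn may_dispatch(
    db: &PgPool,
    rules: &PriorityRules,
    job_type: &str,
    livekit_active: bool,
) -> bool {
    const MAX_TOTAL_WEIGHT: i16 = 8;

    let rule = rules.get(job_type).await;
    if evaluate_job(&rule, livekit_active).should_defer {
        return false; // blocked outright while LiveKit rooms are active
    }

    // Per-type concurrency limit (0 is treated as "no limit" in this sketch).
    if rule.max_concurrent > 0 {
        match count_running_of_type(db, job_type).await {
            Ok(n) if n >= rule.max_concurrent as i64 => return false,
            Err(_) => return false, // fail closed on DB errors
            Ok(_) => {}
        }
    }

    // Global CPU-weight budget: the new job must fit under the cap.
    total_running_weight(db, rules).await.saturating_add(rule.cpu_weight) <= MAX_TOTAL_WEIGHT
}
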
// =============================================================================
// Disk monitoring
// =============================================================================

/// Disk status for a mount point.
#[derive(Debug, Clone, Serialize)]
pub struct DiskStatus {
    pub mount_point: String,
    pub total_bytes: u64,
    pub used_bytes: u64,
    pub available_bytes: u64,
    pub usage_percent: f32,
    pub alert_level: Option<String>,
}

/// Read disk usage for a given mount point via statvfs.
pub fn check_disk_usage(path: &str) -> Result<DiskStatus, String> {
    use std::ffi::CString;

    let c_path =
        CString::new(path).map_err(|e| format!("Invalid path for statvfs: {e}"))?;

    // Direct (unsafe) FFI call to libc::statvfs; the zeroed struct is
    // filled in by the kernel on success.
    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };

    if ret != 0 {
        return Err(format!(
            "statvfs failed for {path}: {}",
            std::io::Error::last_os_error()
        ));
    }

    // f_frsize is the fragment size the block counts are expressed in.
    let block_size = stat.f_frsize as u64;
    let total_bytes = stat.f_blocks as u64 * block_size;
    let available_bytes = stat.f_bavail as u64 * block_size;
    // "Used" is total minus free-to-root (f_bfree), matching df's convention.
    let used_bytes = total_bytes - (stat.f_bfree as u64 * block_size);

    let usage_percent = if total_bytes > 0 {
        (used_bytes as f64 / total_bytes as f64 * 100.0) as f32
    } else {
        0.0
    };

    let alert_level = if usage_percent >= 95.0 {
        Some("emergency".to_string())
    } else if usage_percent >= 90.0 {
        Some("critical".to_string())
    } else if usage_percent >= 85.0 {
        Some("warning".to_string())
    } else {
        None
    };

    Ok(DiskStatus {
        mount_point: path.to_string(),
        total_bytes,
        used_bytes,
        available_bytes,
        usage_percent,
        alert_level,
    })
}

/// Log disk status to the disk_status_log table.
async fn log_disk_status(db: &PgPool, status: &DiskStatus) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"INSERT INTO disk_status_log (mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level)
           VALUES ($1, $2, $3, $4, $5, $6)"#,
    )
    .bind(&status.mount_point)
    .bind(status.total_bytes as i64)
    .bind(status.used_bytes as i64)
    .bind(status.available_bytes as i64)
    .bind(status.usage_percent)
    .bind(&status.alert_level)
    .execute(db)
    .await?;
    Ok(())
}

/// Prune old disk_status_log rows (keep the most recent 1000).
async fn cleanup_disk_log(db: &PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"DELETE FROM disk_status_log
           WHERE id NOT IN (
               SELECT id FROM disk_status_log ORDER BY checked_at DESC LIMIT 1000
           )"#,
    )
    .execute(db)
    .await?;
    Ok(())
}

/// Starts the disk-monitoring loop, which checks disk usage every 60 seconds,
/// logs to PG, and emits warnings when thresholds are exceeded.
pub fn start_disk_monitor(db: PgPool) {
    // CAS root and data partition
    let cas_root = std::env::var("CAS_ROOT")
        .unwrap_or_else(|_| "/srv/synops/media/cas".to_string());
    // Check the partition the CAS lives on; fall back to the root filesystem.
    let check_path = if std::path::Path::new(&cas_root).exists() {
        cas_root.clone()
    } else {
        "/".to_string()
    };

    tokio::spawn(async move {
        // Give the rest of the service a moment to come up first.
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        tracing::info!(path = %check_path, "Disk monitoring started (interval: 60s)");

        let mut check_count: u64 = 0;

        loop {
            match check_disk_usage(&check_path) {
                Ok(status) => {
                    // Log to PG
                    if let Err(e) = log_disk_status(&db, &status).await {
                        tracing::warn!(error = %e, "Could not log disk status");
                    }

                    // Alert when a threshold is exceeded
                    match status.alert_level.as_deref() {
                        Some("emergency") => {
                            tracing::error!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "DISK EMERGENCY: >95% used — immediate action required!"
                            );
                        }
                        Some("critical") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk critical: >90% used — aggressive pruning recommended"
                            );
                        }
                        Some("warning") => {
                            tracing::warn!(
                                usage = format!("{:.1}%", status.usage_percent),
                                available_mb = status.available_bytes / 1_048_576,
                                "Disk warning: >85% used"
                            );
                        }
                        _ => {
                            // Normal — only log every 10th check (every 10 minutes)
                            if check_count % 10 == 0 {
                                tracing::debug!(
                                    usage = format!("{:.1}%", status.usage_percent),
                                    available_gb = status.available_bytes / 1_073_741_824,
                                    "Disk status OK"
                                );
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(error = %e, "Disk check failed");
                }
            }

            check_count += 1;

            // Prune old log rows every two hours (120 checks at 60 s each)
            if check_count % 120 == 0 {
                if let Err(e) = cleanup_disk_log(&db).await {
                    tracing::warn!(error = %e, "Could not prune disk_status_log");
                }
            }

            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });
}

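/// Editor's sketch (hypothetical helper; the real wiring lives in the
/// service's startup code): load the rule cache once and launch the disk
/// monitor with its own pool handle. The returned rules value is then
/// handed to the job-queue worker loop.
pub async fn init_resource_management(db: &PgPool) -> Result<PriorityRules, sqlx::Error> {
    let rules = PriorityRules::load(db).await?;
    start_disk_monitor(db.clone());
    Ok(rules)
}
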
// =============================================================================
// Admin API responses
// =============================================================================

/// Aggregate resource status for the admin panel.
#[derive(Serialize)]
pub struct ResourceStatus {
    pub disk: DiskStatus,
    pub livekit_active: bool,
    pub total_running_weight: i16,
    pub max_weight: i16,
    pub priority_rules: Vec<PriorityRule>,
    pub running_jobs_by_type: Vec<RunningJobCount>,
}

#[derive(Serialize, sqlx::FromRow)]
pub struct RunningJobCount {
    pub job_type: String,
    pub count: i64,
}

/// Count running jobs per type.
pub async fn running_jobs_by_type(db: &PgPool) -> Result<Vec<RunningJobCount>, sqlx::Error> {
    sqlx::query_as::<_, RunningJobCount>(
        "SELECT job_type, COUNT(*) as count FROM job_queue WHERE status = 'running' GROUP BY job_type",
    )
    .fetch_all(db)
    .await
}

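// Sketch (editor's illustration): assembling the GET /admin/resources payload
// from the pieces above. The function name and the max_weight parameter are
// hypothetical; the commit message caps total CPU weight at 8. Returns None
// when the disk log has no measurements yet.
pub async fn collect_resource_status(
    db: &PgPool,
    rules: &PriorityRules,
    max_weight: i16,
) -> Result<Option<ResourceStatus>, sqlx::Error> {
    let Some(disk) = latest_disk_status(db).await? else {
        return Ok(None);
    };
    Ok(Some(ResourceStatus {
        disk,
        livekit_active: has_active_livekit_rooms(db).await,
        total_running_weight: total_running_weight(db, rules).await,
        max_weight,
        priority_rules: rules.all().await,
        running_jobs_by_type: running_jobs_by_type(db).await?,
    }))
}
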
/// Fetch the most recent disk status from the log.
pub async fn latest_disk_status(db: &PgPool) -> Result<Option<DiskStatus>, sqlx::Error> {
    let row = sqlx::query_as::<_, DiskStatusRow>(
        "SELECT mount_point, total_bytes, used_bytes, available_bytes, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT 1",
    )
    .fetch_optional(db)
    .await?;

    Ok(row.map(|r| DiskStatus {
        mount_point: r.mount_point,
        total_bytes: r.total_bytes as u64,
        used_bytes: r.used_bytes as u64,
        available_bytes: r.available_bytes as u64,
        usage_percent: r.usage_percent,
        alert_level: r.alert_level,
    }))
}

/// Private mirror of DiskStatus with PG-compatible (signed) integer types.
#[derive(sqlx::FromRow)]
struct DiskStatusRow {
    mount_point: String,
    total_bytes: i64,
    used_bytes: i64,
    available_bytes: i64,
    usage_percent: f32,
    alert_level: Option<String>,
}

/// Fetch disk-status history (the most recent N measurements).
pub async fn disk_status_history(
    db: &PgPool,
    limit: i64,
) -> Result<Vec<DiskStatusHistoryRow>, sqlx::Error> {
    sqlx::query_as::<_, DiskStatusHistoryRow>(
        "SELECT checked_at, usage_percent, alert_level FROM disk_status_log ORDER BY checked_at DESC LIMIT $1",
    )
    .bind(limit)
    .fetch_all(db)
    .await
}

#[derive(sqlx::FromRow, Serialize)]
pub struct DiskStatusHistoryRow {
    pub checked_at: chrono::DateTime<chrono::Utc>,
    pub usage_percent: f32,
    pub alert_level: Option<String>,
}

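// Editor's addition: unit tests for the pure governor logic and a smoke test
// of the statvfs wrapper; the helper and test names are new, but the expected
// values follow directly from evaluate_job and check_disk_usage above.
#[cfg(test)]
mod tests {
    use super::*;

    fn rule(base: i16, adj: i16, block: bool) -> PriorityRule {
        PriorityRule {
            job_type: "test".to_string(),
            base_priority: base,
            livekit_priority_adj: adj,
            cpu_weight: 1,
            max_concurrent: 0,
            timeout_seconds: 0,
            block_during_livekit: block,
        }
    }

    #[test]
    fn blocked_jobs_defer_while_livekit_is_active() {
        let d = evaluate_job(&rule(5, 0, true), true);
        assert!(d.should_defer);
        assert!(!evaluate_job(&rule(5, 0, true), false).should_defer);
    }

    #[test]
    fn priority_adjustment_applies_only_during_livekit() {
        assert_eq!(evaluate_job(&rule(5, -3, false), true).effective_priority, 2);
        assert_eq!(evaluate_job(&rule(5, -3, false), false).effective_priority, 5);
        // Clamped at zero rather than going negative.
        assert_eq!(evaluate_job(&rule(1, -3, false), true).effective_priority, 0);
    }

    #[test]
    #[cfg(unix)]
    fn statvfs_on_root_reports_sane_usage() {
        let s = check_disk_usage("/").expect("statvfs on / should succeed");
        assert!((0.0..=100.0).contains(&s.usage_percent));
    }
}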