diff --git a/maskinrommet/src/cas.rs b/maskinrommet/src/cas.rs index a2461ce..2ab23fe 100644 --- a/maskinrommet/src/cas.rs +++ b/maskinrommet/src/cas.rs @@ -99,6 +99,65 @@ impl CasStore { hex::encode(hasher.finalize()) }

    /// Deletes a file from the CAS.
    ///
    /// Returns the number of bytes freed, or `0` if the file did not exist
    /// (including when it disappears between the size lookup and the removal).
    ///
    /// # Errors
    /// Propagates any I/O error other than `NotFound` from the metadata lookup
    /// or from the removal itself.
    pub async fn delete(&self, hash: &str) -> std::io::Result<u64> {
        let path = self.path_for(hash);

        let size = match tokio::fs::metadata(&path).await {
            Ok(meta) => meta.len(),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
            Err(e) => return Err(e),
        };

        match tokio::fs::remove_file(&path).await {
            Ok(()) => {
                tracing::info!(hash = %hash, size = size, "Slettet fil fra CAS");
                Ok(size)
            }
            // The file vanished between the stat and the unlink: this call freed nothing.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0),
            Err(e) => Err(e),
        }
    }

    /// Computes the total disk usage of the CAS directory in bytes.
    ///
    /// Shells out to `du -sb` and reads the first whitespace-separated field
    /// of its output.
    ///
    /// # Errors
    /// Fails when `du` cannot be spawned, exits unsuccessfully, or prints
    /// output that does not start with a byte count.
    pub async fn disk_usage_bytes(&self) -> std::io::Result<u64> {
        let output = tokio::process::Command::new("du")
            .arg("-sb")
            // `--` ends option parsing; the root is passed as an OS string so a
            // non-UTF-8 path is measured as-is instead of falling back to ".".
            .arg("--")
            .arg(self.root())
            .output()
            .await?;
        if !output.status.success() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "du-kommando feilet",
            ));
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        stdout
            .split_whitespace()
            .next()
            .and_then(|field| field.parse::<u64>().ok())
            .ok_or_else(|| {
                std::io::Error::new(std::io::ErrorKind::InvalidData, "uventet utdata fra du")
            })
    }

    /// Returns the usage percentage of the partition the CAS lives on.
    ///
    /// Shells out to `df --output=pcent` and parses the second output line.
    ///
    /// # Errors
    /// Fails when `df` cannot be spawned, exits unsuccessfully, or prints
    /// output without a parseable percentage. A parse failure is an error
    /// rather than `0.0`, so callers cannot mistake it for an empty disk.
    pub async fn disk_usage_percent(&self) -> std::io::Result<f64> {
        let output = tokio::process::Command::new("df")
            .arg("--output=pcent")
            .arg("--")
            .arg(self.root())
            .output()
            .await?;
        if !output.status.success() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "df-kommando feilet",
            ));
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        // Output: "Use%\n 42%\n" — line 0 is the header, line 1 the value.
        stdout
            .lines()
            .nth(1)
            .and_then(|line| line.trim().trim_end_matches('%').parse::<f64>().ok())
            .ok_or_else(|| {
                std::io::Error::new(std::io::ErrorKind::InvalidData, "uventet utdata fra df")
            })
    }

    /// Root directory of the CAS.
pub fn root(&self) -> &Path { &self.root diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 88de6d4..57a8149 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -5,6 +5,7 @@ pub mod cas; mod intentions; pub mod jobs; pub mod livekit; +pub mod pruning; mod queries; mod serving; mod stdb; @@ -130,6 +131,9 @@ async fn main() { // Start jobbkø-worker i bakgrunnen jobs::start_worker(db.clone(), stdb.clone(), cas.clone()); + // Start periodisk CAS-pruning i bakgrunnen + pruning::start_pruning_loop(db.clone(), cas.clone()); + let state = AppState { db, jwks, stdb, cas }; // Ruter: /health er offentlig, /me krever gyldig JWT diff --git a/maskinrommet/src/pruning.rs b/maskinrommet/src/pruning.rs new file mode 100644 index 0000000..71e6705 --- /dev/null +++ b/maskinrommet/src/pruning.rs @@ -0,0 +1,465 @@ +//! Pruning-logikk for CAS-filer. +//! +//! TTL per modalitet, signaler som forlenger levetid, og disk-nødventil. +//! Ref: docs/retninger/maskinrommet.md § CAS og intelligent pruning +//! +//! ## TTL per modalitet +//! | Modalitet | Standard TTL | Begrunnelse | +//! |---------------|-------------|--------------------------------------| +//! | Tekst | Aldri | Billig, essensen av innhold | +//! | Transkripsjon | Aldri | Tekstrepresentasjon bevarer mening | +//! | Lyd | 30 dager | Transkripsjon bevarer innholdet | +//! | Bilde | 30 dager | Beskrivelse/metadata bevarer kontekst| +//! | Video | 14 dager | Dyrest, transkripsjon + thumbnail | +//! +//! ## Signaler som forlenger levetid +//! - Publishing-edge → behold for alltid +//! - Tilgang (last_accessed_at) innenfor TTL → forleng med ny TTL-periode +//! - Uredigert transkripsjon → utranskribert lyd beholdes +//! +//! ## Disk-nødventil +//! | Terskel | Handling | +//! |---------|----------------------------------------------------------| +//! | >85% | Slett generert innhold (TTS, thumbnails — regenererbart) | +//! | >90% | Aggressiv pruning for alle samlinger | +//! 
| >95% | Kritisk: alt uten publishing-edge slettes. Tekst beholdes| + +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::cas::CasStore; + +/// Konfigurasjon for pruning. +#[derive(Debug, Clone)] +pub struct PruningConfig { + /// Standard TTL for lyd i dager. + pub audio_ttl_days: i64, + /// Standard TTL for bilder i dager. + pub image_ttl_days: i64, + /// Standard TTL for video i dager. + pub video_ttl_days: i64, + /// Disk-terskel for å slette generert innhold (prosent). + pub disk_warning_pct: f64, + /// Disk-terskel for aggressiv pruning (prosent). + pub disk_aggressive_pct: f64, + /// Disk-terskel for kritisk alarm (prosent). + pub disk_critical_pct: f64, +} + +impl Default for PruningConfig { + fn default() -> Self { + Self { + audio_ttl_days: 30, + image_ttl_days: 30, + video_ttl_days: 14, + disk_warning_pct: 85.0, + disk_aggressive_pct: 90.0, + disk_critical_pct: 95.0, + } + } +} + +/// En CAS-fil-kandidat for pruning. +#[derive(Debug, sqlx::FromRow)] +#[allow(dead_code)] +struct PruneCandidate { + id: Uuid, + cas_hash: String, + mime_category: String, + size_bytes: i64, + created_at: DateTime, + last_accessed_at: Option>, + has_publishing_edge: bool, + is_generated: bool, + has_transcription: bool, +} + +/// Resultat fra en pruning-kjøring. +#[derive(Debug, Default, serde::Serialize)] +pub struct PruneResult { + pub candidates_checked: usize, + pub files_deleted: usize, + pub bytes_freed: u64, + pub disk_pct_before: f64, + pub disk_pct_after: f64, + pub emergency_level: &'static str, +} + +/// Kjør pruning-logikk. Returnerer statistikk. 
+pub async fn run_pruning( + db: &PgPool, + cas: &CasStore, + config: &PruningConfig, +) -> Result { + let disk_pct = cas.disk_usage_percent().await.map_err(|e| format!("Disk-sjekk feilet: {e}"))?; + + let emergency_level = if disk_pct >= config.disk_critical_pct { + "critical" + } else if disk_pct >= config.disk_aggressive_pct { + "aggressive" + } else if disk_pct >= config.disk_warning_pct { + "warning" + } else { + "normal" + }; + + tracing::info!( + disk_pct = disk_pct, + level = emergency_level, + "Pruning startet" + ); + + let mut result = PruneResult { + disk_pct_before: disk_pct, + emergency_level, + ..Default::default() + }; + + // Fase 1: Slett generert innhold ved disk ≥ warning (85%) + if disk_pct >= config.disk_warning_pct { + let freed = prune_generated(db, cas).await?; + result.files_deleted += freed.0; + result.bytes_freed += freed.1; + tracing::info!( + files = freed.0, + bytes = freed.1, + "Fase 1: Slettet generert innhold" + ); + } + + // Fase 2: Standard TTL-basert pruning (kjører alltid) + let ttl_freed = prune_by_ttl(db, cas, config, emergency_level).await?; + result.candidates_checked += ttl_freed.0; + result.files_deleted += ttl_freed.1; + result.bytes_freed += ttl_freed.2; + + // Fase 3: Kritisk — slett alt uten publishing-edge (unntatt tekst) + if disk_pct >= config.disk_critical_pct { + let critical_freed = prune_critical(db, cas).await?; + result.files_deleted += critical_freed.0; + result.bytes_freed += critical_freed.1; + tracing::warn!( + files = critical_freed.0, + bytes = critical_freed.1, + "Fase 3: KRITISK pruning — alt uten publishing-edge slettet" + ); + } + + // Sjekk disk etter pruning + result.disk_pct_after = cas + .disk_usage_percent() + .await + .unwrap_or(disk_pct); + + tracing::info!( + files_deleted = result.files_deleted, + bytes_freed = result.bytes_freed, + disk_before = format!("{:.1}%", result.disk_pct_before), + disk_after = format!("{:.1}%", result.disk_pct_after), + "Pruning fullført" + ); + + Ok(result) +} + 
+/// Slett generert innhold (TTS, thumbnails osv.) — kan regenereres. +/// Identifiseres ved metadata.tts eller metadata.generated = true. +async fn prune_generated(db: &PgPool, cas: &CasStore) -> Result<(usize, u64), String> { + let rows: Vec<(String, Uuid)> = sqlx::query_as( + r#" + SELECT metadata->>'cas_hash' AS cas_hash, id + FROM nodes + WHERE node_kind = 'media' + AND metadata->>'cas_hash' IS NOT NULL + AND ( + metadata ? 'tts' + OR metadata->>'generated' = 'true' + ) + "#, + ) + .fetch_all(db) + .await + .map_err(|e| format!("Spørring for generert innhold feilet: {e}"))?; + + let mut deleted = 0usize; + let mut freed = 0u64; + + for (hash, node_id) in &rows { + match cas.delete(hash).await { + Ok(bytes) if bytes > 0 => { + deleted += 1; + freed += bytes; + log_prune(db, *node_id, hash, bytes, "generated_cleanup").await; + } + Ok(_) => {} // Allerede borte + Err(e) => { + tracing::warn!(hash = %hash, error = %e, "Kunne ikke slette generert fil"); + } + } + } + + Ok((deleted, freed)) +} + +/// TTL-basert pruning: slett CAS-filer som har utløpt basert på modalitet. +/// Ved aggressive/critical senkes TTL drastisk. 
+async fn prune_by_ttl( + db: &PgPool, + cas: &CasStore, + config: &PruningConfig, + emergency_level: &str, +) -> Result<(usize, usize, u64), String> { + // Hent kandidater: media-noder med CAS-hash, MIME, alder, edges, tilgangstid + let candidates: Vec = sqlx::query_as( + r#" + SELECT + n.id, + n.metadata->>'cas_hash' AS cas_hash, + CASE + WHEN n.metadata->>'mime' LIKE 'audio/%' THEN 'audio' + WHEN n.metadata->>'mime' LIKE 'image/%' THEN 'image' + WHEN n.metadata->>'mime' LIKE 'video/%' THEN 'video' + ELSE 'other' + END AS mime_category, + COALESCE((n.metadata->>'size_bytes')::bigint, 0) AS size_bytes, + n.created_at, + n.last_accessed_at, + EXISTS( + SELECT 1 FROM edges e + WHERE (e.source_id = n.id OR e.target_id = n.id) + AND e.edge_type IN ('belongs_to', 'has_media') + AND EXISTS( + SELECT 1 FROM edges pub + WHERE pub.edge_type = 'publishing' + AND (pub.source_id = e.target_id OR pub.source_id = e.source_id) + ) + ) AS has_publishing_edge, + COALESCE(n.metadata ? 'tts' OR n.metadata->>'generated' = 'true', false) AS is_generated, + EXISTS( + SELECT 1 FROM transcription_segments ts + WHERE ts.node_id = n.id + ) AS has_transcription + FROM nodes n + WHERE n.node_kind = 'media' + AND n.metadata->>'cas_hash' IS NOT NULL + ORDER BY n.created_at ASC + "#, + ) + .fetch_all(db) + .await + .map_err(|e| format!("TTL-kandidatspørring feilet: {e}"))?; + + let now = Utc::now(); + let mut checked = 0usize; + let mut deleted = 0usize; + let mut freed = 0u64; + + for c in &candidates { + checked += 1; + + // Tekst/transkripsjon slettes aldri + if c.mime_category == "other" { + continue; + } + + // Publishing-edge = behold for alltid (unntatt ved kritisk) + if c.has_publishing_edge && emergency_level != "critical" { + continue; + } + + // Bestem TTL basert på modalitet og nødsituasjon + let ttl_days = match emergency_level { + "critical" => 0, // Slett alt umiddelbart + "aggressive" => match c.mime_category.as_str() { + "audio" => config.audio_ttl_days / 3, // 10 dager + 
"image" => config.image_ttl_days / 3, + "video" => 1, // Video slettes nesten umiddelbart + _ => i64::MAX, + }, + _ => match c.mime_category.as_str() { + "audio" => config.audio_ttl_days, + "image" => config.image_ttl_days, + "video" => config.video_ttl_days, + _ => i64::MAX, + }, + }; + + if ttl_days == i64::MAX { + continue; + } + + // Beregn effektiv alder — siste tilgang forlenger levetiden + let reference_time = c.last_accessed_at.unwrap_or(c.created_at); + let age_days = (now - reference_time).num_days(); + + if age_days < ttl_days { + continue; // Ikke utløpt ennå + } + + // Lyd uten transkripsjon: behold lengre (trenger transkribering først) + if c.mime_category == "audio" && !c.has_transcription && emergency_level == "normal" { + tracing::debug!( + node_id = %c.id, + "Beholder utranskribert lyd — trenger transkribering" + ); + continue; + } + + // Allerede slettet generert innhold i fase 1 — hopp over + if c.is_generated { + continue; + } + + // Slett filen + match cas.delete(&c.cas_hash).await { + Ok(bytes) if bytes > 0 => { + deleted += 1; + freed += bytes; + log_prune(db, c.id, &c.cas_hash, bytes, "ttl_expired").await; + } + Ok(_) => {} // Allerede borte + Err(e) => { + tracing::warn!( + hash = %c.cas_hash, + node_id = %c.id, + error = %e, + "Kunne ikke slette CAS-fil" + ); + } + } + } + + Ok((checked, deleted, freed)) +} + +/// Kritisk pruning: slett ALT uten publishing-edge (unntatt tekst/transkripsjon). 
+async fn prune_critical(db: &PgPool, cas: &CasStore) -> Result<(usize, u64), String> { + let rows: Vec<(String, Uuid, i64)> = sqlx::query_as( + r#" + SELECT + n.metadata->>'cas_hash' AS cas_hash, + n.id, + COALESCE((n.metadata->>'size_bytes')::bigint, 0) AS size_bytes + FROM nodes n + WHERE n.node_kind = 'media' + AND n.metadata->>'cas_hash' IS NOT NULL + AND NOT EXISTS( + SELECT 1 FROM edges e + WHERE (e.source_id = n.id OR e.target_id = n.id) + AND e.edge_type IN ('belongs_to', 'has_media') + AND EXISTS( + SELECT 1 FROM edges pub + WHERE pub.edge_type = 'publishing' + AND (pub.source_id = e.target_id OR pub.source_id = e.source_id) + ) + ) + "#, + ) + .fetch_all(db) + .await + .map_err(|e| format!("Kritisk pruning-spørring feilet: {e}"))?; + + let mut deleted = 0usize; + let mut freed = 0u64; + + for (hash, node_id, _size) in &rows { + match cas.delete(hash).await { + Ok(bytes) if bytes > 0 => { + deleted += 1; + freed += bytes; + log_prune(db, *node_id, hash, bytes, "critical_emergency").await; + } + Ok(_) => {} + Err(e) => { + tracing::warn!(hash = %hash, error = %e, "Kritisk: kunne ikke slette CAS-fil"); + } + } + } + + Ok((deleted, freed)) +} + +/// Logg en pruning-hendelse til resource_usage_log. +async fn log_prune(db: &PgPool, node_id: Uuid, hash: &str, bytes: u64, reason: &str) { + let detail = serde_json::json!({ + "hash": hash, + "size_bytes": bytes, + "operation": "delete", + "reason": reason, + }); + + let _ = sqlx::query( + r#" + INSERT INTO resource_usage_log (target_node_id, resource_type, detail) + VALUES ($1, 'cas', $2) + "#, + ) + .bind(node_id) + .bind(&detail) + .execute(db) + .await + .map_err(|e| { + tracing::warn!(error = %e, "Kunne ikke logge pruning-hendelse"); + }); +} + +/// Oppdater last_accessed_at for en node basert på CAS-hash. +/// Kalles fra serving.rs når en CAS-fil hentes. 
+pub async fn touch_access(db: &PgPool, cas_hash: &str) { + let _ = sqlx::query( + "UPDATE nodes SET last_accessed_at = now() WHERE metadata->>'cas_hash' = $1", + ) + .bind(cas_hash) + .execute(db) + .await + .map_err(|e| { + tracing::debug!(error = %e, "Kunne ikke oppdatere last_accessed_at"); + }); +} + +/// Start periodisk pruning-loop som bakgrunnsoppgave. +/// Kjører hvert 6. time, eller oftere ved høy diskbruk. +pub fn start_pruning_loop(db: PgPool, cas: CasStore) { + let config = PruningConfig::default(); + + tokio::spawn(async move { + // Vent 60 sekunder etter oppstart før første kjøring + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tracing::info!("Pruning-loop startet (intervall: 6t, nødsjekk: 10min)"); + + loop { + match run_pruning(&db, &cas, &config).await { + Ok(result) => { + if result.files_deleted > 0 { + tracing::info!( + deleted = result.files_deleted, + freed_mb = result.bytes_freed / 1_048_576, + "Pruning: {} filer slettet, {} MB frigitt", + result.files_deleted, + result.bytes_freed / 1_048_576, + ); + } + + // Ved høy diskbruk: sjekk igjen om 10 minutter + let sleep_secs = if result.disk_pct_after >= config.disk_warning_pct { + tracing::warn!( + disk_pct = result.disk_pct_after, + "Disk fortsatt høy — neste pruning om 10 min" + ); + 600 // 10 minutter + } else { + 6 * 3600 // 6 timer + }; + + tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await; + } + Err(e) => { + tracing::error!(error = %e, "Pruning feilet"); + // Vent 30 minutter ved feil + tokio::time::sleep(std::time::Duration::from_secs(1800)).await; + } + } + } + }); +} diff --git a/maskinrommet/src/serving.rs b/maskinrommet/src/serving.rs index 350103b..7d48a6c 100644 --- a/maskinrommet/src/serving.rs +++ b/maskinrommet/src/serving.rs @@ -61,6 +61,13 @@ pub async fn get_cas_file( StatusCode::INTERNAL_SERVER_ERROR })?; + // Oppdater last_accessed_at asynkront (fire-and-forget for ytelse) + let db_clone = state.db.clone(); + let hash_clone = 
hash.clone(); + tokio::spawn(async move { + crate::pruning::touch_access(&db_clone, &hash_clone).await; + }); + let stream = ReaderStream::new(file); let body = Body::from_stream(stream); diff --git a/migrations/010_pruning.sql b/migrations/010_pruning.sql new file mode 100644 index 0000000..3417dc5 --- /dev/null +++ b/migrations/010_pruning.sql @@ -0,0 +1,16 @@ +-- 010_pruning.sql +-- Støtte for pruning-logikk: TTL per modalitet, signaler som forlenger levetid. +-- Ref: docs/retninger/maskinrommet.md § CAS og intelligent pruning + +BEGIN; + +-- last_accessed_at: sporer siste gang en CAS-fil ble servert. +-- Brukes som signal for å forlenge TTL — "noen bruker denne filen aktivt". +ALTER TABLE nodes ADD COLUMN IF NOT EXISTS last_accessed_at TIMESTAMPTZ; + +-- Indeks for pruning-spørringer: finn media-noder sortert etter alder/tilgang. +CREATE INDEX IF NOT EXISTS idx_nodes_media_access + ON nodes (node_kind, last_accessed_at, created_at) + WHERE node_kind = 'media'; + +COMMIT; diff --git a/tasks.md b/tasks.md index 5f39ce9..bdf9353 100644 --- a/tasks.md +++ b/tasks.md @@ -125,8 +125,7 @@ Uavhengige faser kan fortsatt plukkes. - [x] 11.1 LiveKit oppsett: Docker-container for WebRTC. Ref: `docs/setup/produksjon.md`. - [x] 11.2 Sanntidslyd: kommunikasjonsnode med live-status → LiveKit-rom for deltakere. -- [~] 11.3 Pruning-logikk: TTL per modalitet, signaler som forlenger levetid, disk-nødventil. - > Påbegynt: 2026-03-17T23:55 +- [x] 11.3 Pruning-logikk: TTL per modalitet, signaler som forlenger levetid, disk-nødventil. - [ ] 11.4 Podcast-RSS: samlings-node med publiserings-edges → generert RSS-feed. ## Fase 13: Trait-system