synops/maskinrommet/src/bandwidth.rs
vegard eb81055ef4 Completes task 15.7: resource-usage logging
Centralized logging of all resource-intensive operations into the
resource_usage_log table (created in migration 009).

New code:
- resource_usage.rs: helper module with log() and find_collection_for_node()
  (a sketch of the assumed shape follows below)
- bandwidth.rs: Caddy JSON log parser with a nightly batch job (03:00 UTC)
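
The resource_usage.rs helper itself is not shown on this page. A minimal
sketch of what log() presumably looks like, inferred from its call sites in
bandwidth.rs below (parameter roles and column names are assumptions):

    pub async fn log(
        db: &sqlx::PgPool,
        target_node_id: uuid::Uuid,
        user_id: Option<uuid::Uuid>,
        collection_id: Option<uuid::Uuid>,
        kind: &str,
        details: serde_json::Value,
    ) -> Result<(), sqlx::Error> {
        // Assumed table layout from migration 009; column names are guesses.
        sqlx::query(
            "INSERT INTO resource_usage_log \
             (target_node_id, user_id, collection_id, kind, details) \
             VALUES ($1, $2, $3, $4, $5)",
        )
        .bind(target_node_id)
        .bind(user_id)
        .bind(collection_id)
        .bind(kind)
        .bind(details)
        .execute(db)
        .await?;
        Ok(())
    }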

Logging added in handlers:
- AI: summarize, ai_edges (token counting via the LiteLLM usage field),
  agent (placeholder: the claude CLI does not expose token info)
- Whisper: duration_seconds, model, language, mode (see the sketch
  after this list)
- TTS: refactored onto the centralized module, collection_id added
- CAS: logs new files on upload (dedup hits are not logged)
- LiveKit: logs join events (actual participant minutes will require
  webhook integration in the future)
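
As an illustration of the handler-side pattern, a Whisper call would look
roughly like this (a hypothetical wrapper; the JSON field names come from
the list above, everything else is illustrative):

    // Hypothetical: how a Whisper handler records a transcription job.
    // The "whisper" kind value and all parameter names are guesses.
    async fn log_whisper_usage(
        db: &sqlx::PgPool,
        node_id: uuid::Uuid,
        user_id: uuid::Uuid,
        collection_id: Option<uuid::Uuid>,
        duration_seconds: f64,
        model: &str,
        language: &str,
        mode: &str,
    ) -> Result<(), sqlx::Error> {
        crate::resource_usage::log(
            db,
            node_id,
            Some(user_id),
            collection_id,
            "whisper",
            serde_json::json!({
                "duration_seconds": duration_seconds,
                "model": model,
                "language": language,
                "mode": mode
            }),
        )
        .await
    }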

Caddy config: JSON access logging enabled for sidelinja.org and
synops.no in /srv/synops/config/caddy/Caddyfile (outside the repo);
a sketch of the relevant directives follows below.
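
Since the Caddyfile lives outside the repo, here is a sketch of what the
logging directives presumably look like (not the actual config; the output
paths match the ones bandwidth.rs reads):

    sidelinja.org {
        log {
            output file /var/log/caddy/access-sidelinja.log
            format json
        }
        # ...rest of the site config...
    }

    synops.no {
        log {
            output file /var/log/caddy/access-synops.log
            format json
        }
        # ...rest of the site config...
    }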

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 04:24:54 +00:00

274 lines
8.5 KiB
Rust

// Bandwidth logging via Caddy log parsing.
//
// A nightly batch job reads Caddy's JSON access logs and aggregates
// bytes served into resource_usage_log.
//
// Caddy JSON format (relevant fields):
// { "request": { "uri": "/media/...", "headers": { "User-Agent": [...] } },
//   "status": 200, "size": 84000000, "ts": 1710000000.0 }
//
// Ref: docs/features/ressursforbruk.md

use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;

/// Caddy JSON log format (only the fields we need).
#[derive(Deserialize)]
struct CaddyLogEntry {
    request: CaddyRequest,
    #[serde(default)]
    status: u16,
    #[serde(default)]
    size: u64,
    #[serde(default)]
    #[allow(dead_code)]
    ts: f64,
}

#[derive(Deserialize)]
struct CaddyRequest {
    uri: String,
    #[serde(default)]
    headers: std::collections::HashMap<String, Vec<String>>,
}

/// Aggregated bandwidth per path.
struct BandwidthEntry {
    path: String,
    size_bytes: u64,
    client: String,
}

/// Parses a Caddy JSON log file and returns bandwidth entries
/// for media/CAS/pub requests with status 200-299.
fn parse_caddy_log(content: &str) -> Vec<BandwidthEntry> {
    let mut entries = Vec::new();
    for line in content.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let entry: CaddyLogEntry = match serde_json::from_str(line) {
            Ok(e) => e,
            Err(_) => continue,
        };
        // Only successful responses that actually carried content
        if entry.status < 200 || entry.status >= 300 || entry.size == 0 {
            continue;
        }
        // Only media, CAS and publishing traffic
        let uri = &entry.request.uri;
        if !uri.starts_with("/media/")
            && !uri.starts_with("/cas/")
            && !uri.starts_with("/pub/")
        {
            continue;
        }
        let client = entry
            .request
            .headers
            .get("User-Agent")
            .or_else(|| entry.request.headers.get("user-agent"))
            .and_then(|vals| vals.first())
            .map(|ua| parse_client_name(ua))
            .unwrap_or_else(|| "unknown".to_string());
        entries.push(BandwidthEntry {
            path: uri.clone(),
            size_bytes: entry.size,
            client,
        });
    }
    entries
}

/// Extracts a client name from a User-Agent string.
fn parse_client_name(ua: &str) -> String {
    let ua_lower = ua.to_lowercase();
    if ua_lower.contains("apple podcasts") || ua_lower.contains("applepodcasts") {
        "Apple Podcasts".to_string()
    } else if ua_lower.contains("spotify") {
        "Spotify".to_string()
    } else if ua_lower.contains("overcast") {
        "Overcast".to_string()
    } else if ua_lower.contains("pocket casts") || ua_lower.contains("pocketcasts") {
        "Pocket Casts".to_string()
    } else if ua_lower.contains("firefox") {
        "Firefox".to_string()
    } else if ua_lower.contains("chrome") {
        "Chrome".to_string()
    } else if ua_lower.contains("safari") {
        "Safari".to_string()
    } else if ua_lower.contains("bot") || ua_lower.contains("crawler") {
        "bot".to_string()
    } else {
        // Fall back to the first token (typically the client name),
        // capped at 50 characters
        ua.split_whitespace()
            .next()
            .unwrap_or("unknown")
            .chars()
            .take(50)
            .collect()
    }
}

/// Runs bandwidth log parsing for a given log file: reads the file,
/// parses its entries, and writes them to resource_usage_log.
///
/// Entries are attributed to the oldest collection node as
/// target_node_id (the system itself, not a specific user node),
/// falling back to the nil UUID if no collection exists yet.
pub async fn parse_and_log_bandwidth(db: &PgPool, log_path: &str) -> Result<u64, String> {
    let content = tokio::fs::read_to_string(log_path)
        .await
        .map_err(|e| format!("Could not read log file {log_path}: {e}"))?;
    let entries = parse_caddy_log(&content);
    if entries.is_empty() {
        tracing::info!(log_path = %log_path, "No bandwidth entries in the log file");
        return Ok(0);
    }
    // Find a system node to log against: the oldest collection node
    // (the Sidelinja collection) serves as the target for bandwidth logging.
    let system_node_id: Uuid = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM nodes WHERE node_kind = 'collection' ORDER BY created_at ASC LIMIT 1",
    )
    .fetch_optional(db)
    .await
    .map_err(|e| format!("PG error: {e}"))?
    .unwrap_or_else(Uuid::nil);
    let mut logged = 0u64;
    for entry in &entries {
        if let Err(e) = crate::resource_usage::log(
            db,
            system_node_id,
            None, // system job, no user
            None,
            "bandwidth",
            serde_json::json!({
                "size_bytes": entry.size_bytes,
                "path": entry.path,
                "client": entry.client
            }),
        )
        .await
        {
            tracing::warn!(error = %e, path = %entry.path, "Could not log bandwidth entry");
        } else {
            logged += 1;
        }
    }
    tracing::info!(
        log_path = %log_path,
        entries = entries.len(),
        logged = logged,
        "Bandwidth logging complete"
    );
    Ok(logged)
}

/// Starts periodic bandwidth parsing (nightly batch job at 03:00 UTC).
pub fn start_bandwidth_parser(db: PgPool) {
    tokio::spawn(async move {
        tracing::info!("Bandwidth parser started (nightly job at 03:00 UTC)");
        loop {
            // Compute the time until the next run (03:00 UTC)
            let now = chrono::Utc::now();
            let today_03 = now
                .date_naive()
                .and_hms_opt(3, 0, 0)
                .unwrap();
            let next_run = if now.naive_utc() > today_03 {
                // Already past 03:00 today, so run tomorrow
                today_03 + chrono::Duration::days(1)
            } else {
                today_03
            };
            let sleep_duration = (next_run - now.naive_utc())
                .to_std()
                .unwrap_or(std::time::Duration::from_secs(3600));
            tracing::debug!(
                next_run = %next_run,
                sleep_secs = sleep_duration.as_secs(),
                "Bandwidth parser sleeping until the next run"
            );
            tokio::time::sleep(sleep_duration).await;
            // Run parsing for every configured log file
            let log_files = vec![
                "/var/log/caddy/access-sidelinja.log",
                "/var/log/caddy/access-synops.log",
            ];
            for path in &log_files {
                match parse_and_log_bandwidth(&db, path).await {
                    Ok(count) => {
                        if count > 0 {
                            tracing::info!(path = %path, entries = count, "Bandwidth logging complete");
                        }
                    }
                    Err(e) => {
                        tracing::warn!(path = %path, error = %e, "Bandwidth parsing failed");
                    }
                }
            }
        }
    });
}

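// Hypothetical wiring at application startup (the actual entry point is
// not part of this file):
//
//     let db = sqlx::PgPool::connect(&database_url).await?;
//     start_bandwidth_parser(db.clone());
//
// tokio::spawn detaches the task: the JoinHandle is dropped, so the job
// runs fire-and-forget for the lifetime of the process with its own
// clone of the pool.
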
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_caddy_log_basic() {
        let log = r#"{"request":{"uri":"/media/podcast/ep47.mp3","headers":{"User-Agent":["Apple Podcasts/1.0"]}},"status":200,"size":84000000,"ts":1710000000.0}
{"request":{"uri":"/api/health","headers":{}},"status":200,"size":42,"ts":1710000001.0}
{"request":{"uri":"/pub/sidelinja/article-1","headers":{"User-Agent":["Mozilla/5.0 Chrome/120"]}},"status":200,"size":15000,"ts":1710000002.0}
{"request":{"uri":"/media/podcast/ep47.mp3","headers":{}},"status":304,"size":0,"ts":1710000003.0}
"#;
        let entries = parse_caddy_log(log);
        assert_eq!(entries.len(), 2); // /api/health and the 304 are filtered out
        assert_eq!(entries[0].path, "/media/podcast/ep47.mp3");
        assert_eq!(entries[0].size_bytes, 84000000);
        assert_eq!(entries[0].client, "Apple Podcasts");
        assert_eq!(entries[1].path, "/pub/sidelinja/article-1");
        assert_eq!(entries[1].client, "Chrome");
    }

    #[test]
    fn test_parse_client_name() {
        assert_eq!(parse_client_name("ApplePodcasts/3.0"), "Apple Podcasts");
        assert_eq!(parse_client_name("Spotify/8.6.0"), "Spotify");
        assert_eq!(
            parse_client_name("Mozilla/5.0 (compatible; Googlebot/2.1)"),
            "bot"
        );
        assert_eq!(parse_client_name("Mozilla/5.0 Chrome/120.0"), "Chrome");
    }

    #[test]
    fn test_parse_caddy_log_empty() {
        assert!(parse_caddy_log("").is_empty());
        assert!(parse_caddy_log("\n\n").is_empty());
    }
}
}