// synops-stats: Parse Caddy access logs for /media/* requests.
//
// Aggregates podcast downloads per episode per day using IAB rules:
// - Filters known bots by user-agent
// - Unique IP per 24h (one download per IP per episode per day)
//
// Output: JSON with episode_id, date, downloads, unique_listeners.
// --write stores results in PG (podcast_download_stats table).
//
// Caddy JSON format:
//   { "request": { "uri": "/media/cas/xx/yy/hash", "remote_ip": "..." },
//     "status": 200, "size": 84000000, "ts": 1710000000.0 }

use chrono::{DateTime, NaiveDate, Utc};
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::process;

#[derive(Parser)]
#[command(
    name = "synops-stats",
    about = "Parse Caddy logs for podcast download statistics"
)]
struct Cli {
    /// Path to a Caddy JSON access log (may be given multiple times)
    #[arg(long, default_values_t = vec![
        "/var/log/caddy/access-synops.log".to_string(),
        "/var/log/caddy/access-sidelinja.log".to_string(),
    ])]
    log_file: Vec<String>,

    /// Restrict to this date only (YYYY-MM-DD). Without it: all dates.
    #[arg(long)]
    date: Option<String>,

    /// Write results to PostgreSQL
    #[arg(long)]
    write: bool,
}
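// Example invocations (illustrative; the log paths are the defaults above):
//
//   synops-stats --date 2024-03-09
//   synops-stats --log-file /var/log/caddy/access-synops.log --write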
// --- Caddy log parsing ---

#[derive(Deserialize)]
struct CaddyLogEntry {
    request: CaddyRequest,
    #[serde(default)]
    status: u16,
    #[serde(default)]
    size: u64,
    #[serde(default)]
    ts: f64,
}

#[derive(Deserialize)]
struct CaddyRequest {
    uri: String,
    #[serde(default)]
    remote_ip: String,
    #[serde(default)]
    headers: HashMap<String, Vec<String>>,
}

// --- IAB bot detection ---

/// Known bot user-agents (IAB 2.2 compliance subset).
/// Ref: IAB/ABC International Spiders & Bots List.
const BOT_PATTERNS: &[&str] = &[
    "bot", "crawler", "spider", "crawl", "slurp", "mediapartners",
    "facebookexternalhit", "twitterbot", "linkedinbot", "whatsapp",
    "telegrambot", "discordbot", "bingpreview", "googlebot", "yandex",
    "baidu", "duckduckbot", "sogou", "exabot", "ia_archiver", "archive.org_bot",
    "wget", "httrack", "python-requests", "python-urllib", "libwww-perl",
    "java/", "okhttp", "go-http-client", "curl/", // Automated fetchers, not real listeners
    "headlesschrome", "phantomjs", "scrapy",
    "ahrefsbot", "semrushbot", "mj12bot", "dotbot", "bytespider", "petalbot",
    "applebot", // Apple's crawler, NOT Apple Podcasts
    "gptbot", "claudebot", "ccbot", "seznambot",
];

fn is_bot(ua: &str) -> bool {
    let ua_lower = ua.to_lowercase();
    BOT_PATTERNS.iter().any(|pat| ua_lower.contains(pat))
}

fn get_user_agent(headers: &HashMap<String, Vec<String>>) -> Option<&str> {
    headers
        .get("User-Agent")
        .or_else(|| headers.get("user-agent"))
        .and_then(|vals| vals.first())
        .map(|s| s.as_str())
}

/// Extract a client name from the User-Agent for aggregation.
fn parse_client_name(ua: &str) -> String {
    let ua_lower = ua.to_lowercase();
    if ua_lower.contains("apple podcasts")
        || ua_lower.contains("applepodcasts")
        || ua_lower.contains("itunes")
    {
        "Apple Podcasts".to_string()
    } else if ua_lower.contains("spotify") {
        "Spotify".to_string()
    } else if ua_lower.contains("overcast") {
        "Overcast".to_string()
    } else if ua_lower.contains("pocket casts") || ua_lower.contains("pocketcasts") {
        "Pocket Casts".to_string()
    } else if ua_lower.contains("castbox") {
        "Castbox".to_string()
    } else if ua_lower.contains("podcastaddict") || ua_lower.contains("podcast addict") {
        "Podcast Addict".to_string()
    } else if ua_lower.contains("castro") {
        "Castro".to_string()
    } else if ua_lower.contains("stitcher") {
        "Stitcher".to_string()
    } else if ua_lower.contains("podbean") {
        "Podbean".to_string()
    } else if ua_lower.contains("deezer") {
        "Deezer".to_string()
    } else if ua_lower.contains("firefox") {
        "Firefox".to_string()
    } else if ua_lower.contains("chrome") && !ua_lower.contains("headlesschrome") {
        "Chrome".to_string()
    } else if ua_lower.contains("safari") && !ua_lower.contains("chrome") {
        "Safari".to_string()
    } else {
        // Fall back to the first whitespace-delimited token, capped at 50 chars
        ua.split_whitespace()
            .next()
            .unwrap_or("unknown")
            .chars()
            .take(50)
            .collect()
    }
}

// --- CAS hash extraction ---

/// Extract the CAS hash from a /media/cas/xx/yy/hash URI.
/// Returns the full hash (64 hex chars) when possible.
fn extract_cas_hash(uri: &str) -> Option<String> {
    // Format: /media/cas/{h1}/{h2}/{full_hash}
    let path = uri.strip_prefix("/media/cas/")?;
    let parts: Vec<&str> = path.splitn(4, '/').collect();
    if parts.len() >= 3 {
        let hash = parts[2].split('?').next().unwrap_or(parts[2]); // Strip query params
        if hash.len() >= 32 && hash.chars().all(|c| c.is_ascii_hexdigit()) {
            return Some(hash.to_string());
        }
    }
    None
}

// --- Output types ---

#[derive(Debug, Serialize)]
struct EpisodeStats {
    episode_id: Option<uuid::Uuid>,
    cas_hash: String,
    episode_title: Option<String>,
    date: String,
    downloads: u64,
    unique_listeners: u64,
    clients: HashMap<String, u64>,
}

#[derive(Debug, Serialize)]
struct StatsOutput {
    generated_at: String,
    log_files: Vec<String>,
    total_downloads: u64,
    total_unique_listeners: u64,
    total_bot_filtered: u64,
    episodes: Vec<EpisodeStats>,
}
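// Sample output document (illustrative values; the shape follows
// StatsOutput/EpisodeStats above, serialized with serde_json):
//
//   {
//     "generated_at": "2024-03-10T12:00:00+00:00",
//     "log_files": ["/var/log/caddy/access-synops.log"],
//     "total_downloads": 2,
//     "total_unique_listeners": 2,
//     "total_bot_filtered": 1,
//     "episodes": [{
//       "episode_id": null,
//       "cas_hash": "b94d...cde9",
//       "episode_title": null,
//       "date": "2024-03-09",
//       "downloads": 2,
//       "unique_listeners": 2,
//       "clients": { "Apple Podcasts": 1, "Spotify": 1 }
//     }]
//   }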
// --- Parsed download event ---

struct DownloadEvent {
    cas_hash: String,
    date: NaiveDate,
    remote_ip: String,
    client: String,
}

fn parse_log_file(content: &str, date_filter: Option<NaiveDate>) -> (Vec<DownloadEvent>, u64) {
    let mut events = Vec::new();
    let mut bot_count = 0u64;

    for line in content.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let entry: CaddyLogEntry = match serde_json::from_str(line) {
            Ok(e) => e,
            Err(_) => continue,
        };

        // Only /media/ requests
        if !entry.request.uri.starts_with("/media/") {
            continue;
        }

        // Only successful requests (200-299) with content
        if entry.status < 200 || entry.status >= 300 || entry.size == 0 {
            continue;
        }

        // IAB: filter bots
        let ua = get_user_agent(&entry.request.headers).unwrap_or("");
        if is_bot(ua) {
            bot_count += 1;
            continue;
        }

        // Extract the CAS hash
        let cas_hash = match extract_cas_hash(&entry.request.uri) {
            Some(h) => h,
            None => continue, // Not a CAS file, skip
        };

        // Convert the timestamp to a date
        let ts_secs = entry.ts as i64;
        let ts_nanos = ((entry.ts - ts_secs as f64) * 1_000_000_000.0) as u32;
        let dt = match DateTime::<Utc>::from_timestamp(ts_secs, ts_nanos) {
            Some(dt) => dt,
            None => continue,
        };
        let date = dt.date_naive();

        // Date filter
        if let Some(filter_date) = date_filter {
            if date != filter_date {
                continue;
            }
        }

        let client = parse_client_name(ua);
        let remote_ip = entry.request.remote_ip.clone();

        events.push(DownloadEvent {
            cas_hash,
            date,
            remote_ip,
            client,
        });
    }

    (events, bot_count)
}

/// Aggregate events into per-episode-per-day statistics with the IAB unique-IP filter.
fn aggregate(events: Vec<DownloadEvent>) -> Vec<EpisodeStats> {
    // Group by (cas_hash, date)
    struct Bucket {
        downloads: u64,
        ips: HashSet<String>,
        clients: HashMap<String, u64>,
    }
    let mut buckets: HashMap<(String, NaiveDate), Bucket> = HashMap::new();

    for event in &events {
        let key = (event.cas_hash.clone(), event.date);
        let bucket = buckets.entry(key).or_insert_with(|| Bucket {
            downloads: 0,
            ips: HashSet::new(),
            clients: HashMap::new(),
        });

        // IAB: count only one download per IP per episode per 24h (day)
        if bucket.ips.insert(event.remote_ip.clone()) {
            bucket.downloads += 1;
            *bucket.clients.entry(event.client.clone()).or_insert(0) += 1;
        }
    }

    let mut stats: Vec<EpisodeStats> = buckets
        .into_iter()
        .map(|((cas_hash, date), bucket)| EpisodeStats {
            episode_id: None,
            cas_hash,
            episode_title: None,
            date: date.format("%Y-%m-%d").to_string(),
            downloads: bucket.downloads,
            unique_listeners: bucket.ips.len() as u64,
            clients: bucket.clients,
        })
        .collect();

    stats.sort_by(|a, b| a.date.cmp(&b.date).then(b.downloads.cmp(&a.downloads)));
    stats
}
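// Illustration of the dedup rule in `aggregate` above (illustrative values):
// three OK requests for the same episode on the same day from IPs 1.2.3.4,
// 1.2.3.4 and 5.6.7.8 yield downloads = 2 and unique_listeners = 2; the same
// IP requesting again on a different day counts again.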
/// Enrich statistics with episode info from PG (cas_hash → episode node via a has_media edge).
async fn enrich_from_db(
    db: &sqlx::PgPool,
    stats: &mut [EpisodeStats],
) -> Result<(), sqlx::Error> {
    let hashes: Vec<String> = stats.iter().map(|s| s.cas_hash.clone()).collect();
    if hashes.is_empty() {
        return Ok(());
    }

    // Find episode nodes via: episode --has_media--> media node (metadata.cas_hash)
    let rows: Vec<(uuid::Uuid, String, Option<String>)> = sqlx::query_as(
        r#"
        SELECT DISTINCT ON (m.metadata->>'cas_hash')
            ep.id,
            m.metadata->>'cas_hash' AS cas_hash,
            ep.title
        FROM nodes m
        JOIN edges e ON e.target_id = m.id AND e.edge_type = 'has_media'
        JOIN nodes ep ON ep.id = e.source_id
        WHERE m.node_kind = 'media'
          AND m.metadata->>'cas_hash' = ANY($1)
        ORDER BY m.metadata->>'cas_hash', ep.created_at ASC
        "#,
    )
    .bind(&hashes)
    .fetch_all(db)
    .await?;

    let lookup: HashMap<String, (uuid::Uuid, Option<String>)> = rows
        .into_iter()
        .map(|(id, hash, title)| (hash, (id, title)))
        .collect();

    for stat in stats.iter_mut() {
        if let Some((id, title)) = lookup.get(&stat.cas_hash) {
            stat.episode_id = Some(*id);
            stat.episode_title = title.clone();
        }
    }

    Ok(())
}

/// Write statistics to PG.
async fn write_to_db(
    db: &sqlx::PgPool,
    stats: &[EpisodeStats],
) -> Result<u64, sqlx::Error> {
    // Create the table if it does not exist
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS podcast_download_stats (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            episode_id UUID REFERENCES nodes(id),
            cas_hash TEXT NOT NULL,
            date DATE NOT NULL,
            downloads BIGINT NOT NULL DEFAULT 0,
            unique_listeners BIGINT NOT NULL DEFAULT 0,
            clients JSONB NOT NULL DEFAULT '{}',
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            UNIQUE (cas_hash, date)
        )
        "#,
    )
    .execute(db)
    .await?;

    let mut written = 0u64;
    for stat in stats {
        let result = sqlx::query(
            r#"
            INSERT INTO podcast_download_stats
                (episode_id, cas_hash, date, downloads, unique_listeners, clients)
            VALUES ($1, $2, $3::date, $4, $5, $6)
            ON CONFLICT (cas_hash, date) DO UPDATE SET
                episode_id = COALESCE(EXCLUDED.episode_id, podcast_download_stats.episode_id),
                downloads = EXCLUDED.downloads,
                unique_listeners = EXCLUDED.unique_listeners,
                clients = EXCLUDED.clients
            "#,
        )
        .bind(stat.episode_id)
        .bind(&stat.cas_hash)
        .bind(&stat.date)
        .bind(stat.downloads as i64)
        .bind(stat.unique_listeners as i64)
        .bind(serde_json::to_value(&stat.clients).unwrap_or_default())
        .execute(db)
        .await;

        match result {
            Ok(_) => written += 1,
            Err(e) => {
                tracing::warn!(cas_hash = %stat.cas_hash, date = %stat.date, error = %e, "Could not write stat");
            }
        }
    }

    Ok(written)
}

#[tokio::main]
async fn main() {
    synops_common::logging::init("synops_stats");
    let cli = Cli::parse();
    if let Err(e) = run(cli).await {
        eprintln!("Error: {e}");
        process::exit(1);
    }
}
async fn run(cli: Cli) -> Result<(), String> {
    // Parse the date filter
    let date_filter = cli
        .date
        .as_ref()
        .map(|d| {
            NaiveDate::parse_from_str(d, "%Y-%m-%d")
                .map_err(|e| format!("Invalid date '{}': {e}", d))
        })
        .transpose()?;

    // Read and parse all log files
    let mut all_events = Vec::new();
    let mut total_bots = 0u64;
    let mut log_files_used = Vec::new();

    for path in &cli.log_file {
        let content = match tokio::fs::read_to_string(path).await {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(path = %path, error = %e, "Could not read log file, skipping");
                continue;
            }
        };
        log_files_used.push(path.clone());

        let (events, bots) = parse_log_file(&content, date_filter);
        tracing::info!(
            path = %path,
            media_events = events.len(),
            bots_filtered = bots,
            "Log file parsed"
        );
        total_bots += bots;
        all_events.extend(events);
    }

    // Aggregate
    let mut stats = aggregate(all_events);

    // Enrich with DB info and optionally write
    let db = if cli.write || !stats.is_empty() {
        match synops_common::db::connect().await {
            Ok(pool) => Some(pool),
            Err(e) => {
                tracing::warn!(error = %e, "Could not connect to the database");
                None
            }
        }
    } else {
        None
    };

    if let Some(ref pool) = db {
        if let Err(e) = enrich_from_db(pool, &mut stats).await {
            tracing::warn!(error = %e, "Could not enrich with episode info from the DB");
        }
    }

    // Build output
    let total_downloads: u64 = stats.iter().map(|s| s.downloads).sum();
    let total_unique: u64 = stats.iter().map(|s| s.unique_listeners).sum();

    let output = StatsOutput {
        generated_at: Utc::now().to_rfc3339(),
        log_files: log_files_used,
        total_downloads,
        total_unique_listeners: total_unique,
        total_bot_filtered: total_bots,
        episodes: stats,
    };

    // JSON to stdout
    let json = serde_json::to_string_pretty(&output)
        .map_err(|e| format!("JSON serialization failed: {e}"))?;
    println!("{json}");

    // Write to PG
    if cli.write {
        let pool = db.as_ref().ok_or("Database unavailable for --write")?;
        let written = write_to_db(pool, &output.episodes)
            .await
            .map_err(|e| format!("Database error: {e}"))?;
        tracing::info!(written = written, "Statistics written to the database");
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_cas_hash() {
        assert_eq!(
            extract_cas_hash("/media/cas/b9/4d/b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"),
            Some("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9".to_string())
        );
        assert_eq!(extract_cas_hash("/media/podcast/ep47.mp3"), None);
        assert_eq!(extract_cas_hash("/api/health"), None);
        // With query params
        assert_eq!(
            extract_cas_hash("/media/cas/ab/cd/abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890?range=bytes"),
            Some("abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890".to_string())
        );
    }

    #[test]
    fn test_is_bot() {
        assert!(is_bot("Mozilla/5.0 (compatible; Googlebot/2.1)"));
        assert!(is_bot("python-requests/2.28.0"));
        assert!(is_bot("curl/7.88.1"));
        assert!(is_bot("Wget/1.21"));
        assert!(is_bot("AhrefsBot/7.0"));
        assert!(!is_bot("Apple Podcasts/1.0"));
        assert!(!is_bot("Spotify/8.6.0"));
        assert!(!is_bot("Mozilla/5.0 (iPhone) AppleWebKit Safari"));
        assert!(!is_bot("Overcast/3.0"));
    }

    #[test]
    fn test_parse_client_name() {
        assert_eq!(parse_client_name("ApplePodcasts/3.0"), "Apple Podcasts");
        assert_eq!(parse_client_name("Spotify/8.6.0"), "Spotify");
        assert_eq!(parse_client_name("Overcast/3.0"), "Overcast");
        assert_eq!(parse_client_name("PocketCasts/1.0"), "Pocket Casts");
        assert_eq!(parse_client_name("Mozilla/5.0 Chrome/120.0"), "Chrome");
        assert_eq!(parse_client_name("iTunes/12.0"), "Apple Podcasts");
    }
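
    // Added test (a minimal sketch of the IAB dedup rule in `aggregate`,
    // with illustrative IPs and dates): a duplicate IP within one day is
    // counted once, but the same IP counts again on a different day.
    #[test]
    fn test_aggregate_dedups_per_day() {
        let hash = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9";
        let day1 = NaiveDate::from_ymd_opt(2024, 3, 9).unwrap();
        let day2 = NaiveDate::from_ymd_opt(2024, 3, 10).unwrap();
        let event = |date, ip: &str| DownloadEvent {
            cas_hash: hash.to_string(),
            date,
            remote_ip: ip.to_string(),
            client: "Spotify".to_string(),
        };

        // Two requests from the same IP on day 1, one more on day 2
        let stats = aggregate(vec![
            event(day1, "1.2.3.4"),
            event(day1, "1.2.3.4"),
            event(day2, "1.2.3.4"),
        ]);

        assert_eq!(stats.len(), 2);
        // Sorted by date ascending
        assert_eq!(stats[0].date, "2024-03-09");
        assert_eq!(stats[0].downloads, 1); // Duplicate IP within the day counted once
        assert_eq!(stats[1].date, "2024-03-10");
        assert_eq!(stats[1].downloads, 1); // Same IP on a new day counts again
    }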

    #[test]
    fn test_parse_and_aggregate() {
        let hash = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9";
        let log = format!(
            r#"{{"request":{{"uri":"/media/cas/b9/4d/{hash}","remote_ip":"1.2.3.4","headers":{{"User-Agent":["Apple Podcasts/1.0"]}}}},"status":200,"size":84000000,"ts":1710000000.0}}
{{"request":{{"uri":"/media/cas/b9/4d/{hash}","remote_ip":"1.2.3.4","headers":{{"User-Agent":["Apple Podcasts/1.0"]}}}},"status":200,"size":84000000,"ts":1710001000.0}}
{{"request":{{"uri":"/media/cas/b9/4d/{hash}","remote_ip":"5.6.7.8","headers":{{"User-Agent":["Spotify/8.6"]}}}},"status":200,"size":84000000,"ts":1710002000.0}}
{{"request":{{"uri":"/media/cas/b9/4d/{hash}","remote_ip":"9.10.11.12","headers":{{"User-Agent":["Googlebot/2.1"]}}}},"status":200,"size":84000000,"ts":1710003000.0}}
{{"request":{{"uri":"/api/health","remote_ip":"1.2.3.4","headers":{{}}}},"status":200,"size":42,"ts":1710004000.0}}
"#,
            hash = hash
        );

        let (events, bots) = parse_log_file(&log, None);
        assert_eq!(events.len(), 3); // 2 real + 1 dup IP; the bot and /api were filtered out
        assert_eq!(bots, 1); // Googlebot

        let stats = aggregate(events);
        assert_eq!(stats.len(), 1);
        assert_eq!(stats[0].cas_hash, hash);
        assert_eq!(stats[0].downloads, 2); // Unique IPs: 1.2.3.4 + 5.6.7.8
        assert_eq!(stats[0].unique_listeners, 2);
        assert_eq!(stats[0].clients.get("Apple Podcasts"), Some(&1));
        assert_eq!(stats[0].clients.get("Spotify"), Some(&1));
    }

    #[test]
    fn test_empty_log() {
        let (events, bots) = parse_log_file("", None);
        assert!(events.is_empty());
        assert_eq!(bots, 0);
    }

    #[test]
    fn test_date_filter() {
        let hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
        let log = format!(
            r#"{{"request":{{"uri":"/media/cas/ab/cd/{hash}","remote_ip":"1.2.3.4","headers":{{"User-Agent":["Spotify/8"]}}}},"status":200,"size":1000,"ts":1710000000.0}}
{{"request":{{"uri":"/media/cas/ab/cd/{hash}","remote_ip":"5.6.7.8","headers":{{"User-Agent":["Spotify/8"]}}}},"status":200,"size":1000,"ts":1710100000.0}}
"#,
            hash = hash
        );

        // ts 1710000000 = 2024-03-09 UTC, ts 1710100000 = 2024-03-10 UTC
        let filter = NaiveDate::from_ymd_opt(2024, 3, 9).unwrap();
        let (events, _) = parse_log_file(&log, Some(filter));
        assert_eq!(events.len(), 1);
    }
}