// synops-import-podcast — Importer eksisterende podcast fra RSS-feed. // // Input: --feed-url --collection-id [--created-by ] // [--dry-run] [--payload-json ] // // Parser RSS-feed, laster ned lydfiler og artwork til CAS, oppretter // content-noder med has_media- og belongs_to-edges. Duplikatdeteksjon // via — idempotent: kjør flere ganger, bare nye episoder importeres. // // --dry-run viser hva som ville blitt importert uten å skrive til DB/CAS. // // Miljøvariabler: // DATABASE_URL — PostgreSQL-tilkobling (påkrevd) // CAS_ROOT — CAS-rotkatalog (default: /srv/synops/media/cas) // RUST_LOG — Loggnivå (default: synops_import_podcast=info) // // Ref: docs/features/podcast_hosting.md use chrono::{DateTime, Utc}; use clap::Parser; use feed_rs::parser; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::process; use tracing::{debug, info, warn}; use uuid::Uuid; // ============================================================================= // CLI // ============================================================================= /// Importer eksisterende podcast fra RSS-feed. #[derive(Parser)] #[command(name = "synops-import-podcast", about = "Importer podcast fra RSS-feed")] struct Cli { /// RSS-feed URL #[arg(long)] feed_url: Option, /// Samlings-ID (collection) å importere til #[arg(long)] collection_id: Option, /// Bruker-ID som oppretter nodene #[arg(long)] created_by: Option, /// Forhåndsvisning uten å skrive til DB/CAS #[arg(long)] dry_run: bool, /// Jobb-payload som JSON (for maskinrommet/jobbkø) #[arg(long)] payload_json: Option, } /// Payload fra jobbkø. 
#[derive(Deserialize)] struct JobPayload { feed_url: String, collection_id: Uuid, created_by: Uuid, #[serde(default)] dry_run: bool, } // ============================================================================= // Output-typer // ============================================================================= #[derive(Serialize)] struct ImportResult { status: String, feed_url: String, feed_title: Option, episodes_found: usize, episodes_imported: usize, episodes_skipped: usize, dry_run: bool, episodes: Vec, errors: Vec, } #[derive(Serialize)] struct EpisodeResult { guid: String, title: String, published_at: Option, duration: Option, episode_number: Option, season_number: Option, action: String, // "imported", "skipped", "would_import", "error" #[serde(skip_serializing_if = "Option::is_none")] node_id: Option, #[serde(skip_serializing_if = "Option::is_none")] audio_cas_hash: Option, #[serde(skip_serializing_if = "Option::is_none")] artwork_cas_hash: Option, #[serde(skip_serializing_if = "Option::is_none")] error: Option, } // ============================================================================= // iTunes-metadata fra rå XML (feed-rs mapper ikke episode/season) // ============================================================================= /// Ekstra metadata per episode som feed-rs ikke parser. struct ItunesItemMeta { episode: Option, season: Option, image_url: Option, } /// Parse iTunes-extensions fra rå XML for hvert /. /// Returnerer en map fra → ItunesItemMeta. 
fn parse_itunes_from_xml(xml: &[u8]) -> HashMap<String, ItunesItemMeta> {
    use quick_xml::events::Event;
    use quick_xml::reader::Reader;

    let mut result = HashMap::new();
    let mut reader = Reader::from_reader(xml);
    reader.config_mut().trim_text(true);

    // Streaming state: we are inside an <item>/<entry>, accumulating fields
    // until the closing tag, then flush into `result` keyed by guid.
    let mut in_item = false;
    let mut current_guid = String::new();
    let mut current_episode: Option<i32> = None;
    let mut current_season: Option<i32> = None;
    let mut current_image: Option<String> = None;
    let mut current_tag = String::new();
    let mut buf = Vec::new();

    loop {
        match reader.read_event_into(&mut buf) {
            Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
                let local = String::from_utf8_lossy(e.local_name().as_ref()).to_string();
                match local.as_str() {
                    "item" | "entry" => {
                        in_item = true;
                        current_guid.clear();
                        current_episode = None;
                        current_season = None;
                        current_image = None;
                    }
                    "image" if in_item => {
                        // <itunes:image href="…"> — the artwork URL is an attribute
                        for attr in e.attributes().flatten() {
                            if attr.key.as_ref() == b"href" {
                                current_image = String::from_utf8(attr.value.to_vec()).ok();
                            }
                        }
                    }
                    _ => {}
                }
                current_tag = local;
            }
            Ok(Event::Text(e)) if in_item => {
                let text = e.unescape().unwrap_or_default().to_string();
                handle_text(
                    &current_tag,
                    &text,
                    &mut current_guid,
                    &mut current_episode,
                    &mut current_season,
                );
            }
            Ok(Event::CData(e)) if in_item => {
                // CDATA sections — common for <guid>
                let text = String::from_utf8_lossy(&e).to_string();
                handle_text(
                    &current_tag,
                    &text,
                    &mut current_guid,
                    &mut current_episode,
                    &mut current_season,
                );
            }
            Ok(Event::End(e)) => {
                let local = String::from_utf8_lossy(e.local_name().as_ref()).to_string();
                if local == "item" || local == "entry" {
                    // Only items with a guid are useful — everything is keyed on it.
                    if !current_guid.is_empty() {
                        result.insert(
                            current_guid.clone(),
                            ItunesItemMeta {
                                episode: current_episode,
                                season: current_season,
                                image_url: current_image.clone(),
                            },
                        );
                    }
                    in_item = false;
                }
            }
            Ok(Event::Eof) => break,
            // Malformed XML: keep whatever was parsed so far rather than failing.
            Err(_) => break,
            _ => {}
        }
        buf.clear();
    }
    result
}

/// Route character data into the right per-item field based on the current tag.
fn handle_text(
    tag: &str,
    text: &str,
    guid: &mut String,
    episode: &mut Option<i32>,
    season: &mut Option<i32>,
) {
    match tag {
        "guid" => *guid = text.trim().to_string(),
        "episode" => *episode = text.trim().parse().ok(),
        "season" => *season = text.trim().parse().ok(),
        _ => {}
    }
}

// =============================================================================
// Main
// =============================================================================

#[tokio::main]
async fn main() {
    synops_common::logging::init("synops_import_podcast");
    let cli = Cli::parse();

    // Parameters come either from --payload-json (job queue) or individual flags.
    let (feed_url, collection_id, created_by, dry_run) = if let Some(ref json) = cli.payload_json {
        match serde_json::from_str::<JobPayload>(json) {
            Ok(p) => (p.feed_url, p.collection_id, p.created_by, p.dry_run || cli.dry_run),
            Err(e) => {
                eprintln!("Ugyldig --payload-json: {e}");
                process::exit(1);
            }
        }
    } else {
        let feed_url = cli.feed_url.unwrap_or_else(|| {
            eprintln!("--feed-url er påkrevd");
            process::exit(1);
        });
        let collection_id = cli.collection_id.unwrap_or_else(|| {
            eprintln!("--collection-id er påkrevd");
            process::exit(1);
        });
        let created_by = cli.created_by.unwrap_or_else(|| {
            eprintln!("--created-by er påkrevd");
            process::exit(1);
        });
        (feed_url, collection_id, created_by, cli.dry_run)
    };

    match run_import(&feed_url, collection_id, created_by, dry_run).await {
        Ok(result) => {
            let json = serde_json::to_string_pretty(&result).unwrap();
            println!("{json}");
            if result.status == "error" {
                process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Feil: {e}");
            process::exit(1);
        }
    }
}

// =============================================================================
// Import logic
// =============================================================================

/// Fetch the feed, detect duplicates, and import each new episode.
async fn run_import(
    feed_url: &str,
    collection_id: Uuid,
    created_by: Uuid,
    dry_run: bool,
) -> Result<ImportResult, String> {
    // 1.
Hent RSS-feed info!(url = feed_url, dry_run, "Henter podcast-feed"); let client = reqwest::Client::builder() .user_agent("synops-import-podcast/0.1") .timeout(std::time::Duration::from_secs(60)) .build() .map_err(|e| format!("HTTP-klient feilet: {e}"))?; let response = client .get(feed_url) .send() .await .map_err(|e| format!("Kunne ikke hente feed: {e}"))?; if !response.status().is_success() { return Err(format!("HTTP {}: {}", response.status(), feed_url)); } let body = response .bytes() .await .map_err(|e| format!("Kunne ikke lese respons: {e}"))?; // 2. Parse feed med feed-rs let feed = parser::parse(&body[..]).map_err(|e| format!("Kunne ikke parse feed: {e}"))?; let feed_title = feed.title.map(|t| t.content); // 3. Parse iTunes-extensions fra rå XML (episode/season/image) let itunes_meta = parse_itunes_from_xml(&body); info!( feed_title = feed_title.as_deref().unwrap_or("(uten tittel)"), entries = feed.entries.len(), "Feed parset" ); // 4. Koble til database (også for dry-run — trengs for duplikatsjekk) let db = synops_common::db::connect().await?; // 5. Hent eksisterende GUIDs for duplikatdeteksjon let existing_guids: Vec = sqlx::query_scalar( r#" SELECT n.metadata->>'guid' FROM nodes n INNER JOIN edges e ON e.source_id = n.id AND e.target_id = $1 AND e.edge_type = 'belongs_to' WHERE n.node_kind = 'content' AND n.metadata->>'guid' IS NOT NULL "#, ) .bind(collection_id) .fetch_all(&db) .await .map_err(|e| format!("PG-feil ved duplikatsjekk: {e}"))?; debug!(existing = existing_guids.len(), "Eksisterende episoder i samling"); let cas_root = synops_common::cas::root(); // 6. 
Prosesser episoder let mut episodes_imported = 0usize; let mut episodes_skipped = 0usize; let mut episode_results = Vec::new(); let mut errors = Vec::new(); for entry in &feed.entries { let guid = entry.id.clone(); if guid.is_empty() { debug!("Entry uten guid, hopper over"); episodes_skipped += 1; continue; } let title = entry .title .as_ref() .map(|t| t.content.clone()) .unwrap_or_else(|| format!("Episode ({})", &guid)); // Hent iTunes-metadata fra rå XML-parsing let itunes = itunes_meta.get(&guid); // Varighet fra MediaObject (feed-rs mapper itunes:duration hit) let duration = entry .media .first() .and_then(|mo| mo.duration) .map(|d| format_duration_secs(d.as_secs())); let episode_number = itunes.and_then(|i| i.episode); let season_number = itunes.and_then(|i| i.season); let published_at: Option> = entry.published.or(entry.updated); // Duplikatsjekk if existing_guids.iter().any(|g| g == &guid) { debug!(guid = %guid, title = %title, "Allerede importert, hopper over"); episodes_skipped += 1; episode_results.push(EpisodeResult { guid, title, published_at: published_at.map(|d| d.to_rfc3339()), duration, episode_number, season_number, action: "skipped".to_string(), node_id: None, audio_cas_hash: None, artwork_cas_hash: None, error: None, }); continue; } if dry_run { let enclosure_url = find_audio_enclosure(entry).map(|(u, _, _)| u); info!( guid = %guid, title = %title, enclosure = enclosure_url.as_deref().unwrap_or("(ingen)"), "DRY-RUN: ville importert" ); episodes_imported += 1; episode_results.push(EpisodeResult { guid, title, published_at: published_at.map(|d| d.to_rfc3339()), duration, episode_number, season_number, action: "would_import".to_string(), node_id: None, audio_cas_hash: None, artwork_cas_hash: None, error: None, }); continue; } // Ekte import let artwork_url = itunes .and_then(|i| i.image_url.clone()) .or_else(|| find_thumbnail_url(entry)); match import_episode( &db, &client, &cas_root, entry, &guid, &title, published_at, duration.as_deref(), 
episode_number, season_number, artwork_url.as_deref(), collection_id, created_by, ) .await { Ok(ep_result) => { info!( node_id = %ep_result.node_id.unwrap_or_default(), guid = %guid, title = %title, "Episode importert" ); episodes_imported += 1; episode_results.push(ep_result); } Err(e) => { warn!(guid = %guid, title = %title, error = %e, "Feil ved import av episode"); errors.push(format!("{title} ({guid}): {e}")); episode_results.push(EpisodeResult { guid, title, published_at: published_at.map(|d| d.to_rfc3339()), duration, episode_number, season_number, action: "error".to_string(), node_id: None, audio_cas_hash: None, artwork_cas_hash: None, error: Some(e), }); } } } let status = if errors.is_empty() { "completed" } else if episodes_imported > 0 { "partial" } else { "error" }; info!( episodes_imported, episodes_skipped, errors = errors.len(), dry_run, "Import ferdig" ); Ok(ImportResult { status: status.to_string(), feed_url: feed_url.to_string(), feed_title, episodes_found: feed.entries.len(), episodes_imported, episodes_skipped, dry_run, episodes: episode_results, errors, }) } // ============================================================================= // Episode-import // ============================================================================= #[allow(clippy::too_many_arguments)] async fn import_episode( db: &sqlx::PgPool, client: &reqwest::Client, cas_root: &str, entry: &feed_rs::model::Entry, guid: &str, title: &str, published_at: Option>, duration: Option<&str>, episode_number: Option, season_number: Option, artwork_url: Option<&str>, collection_id: Uuid, created_by: Uuid, ) -> Result { let node_id = Uuid::now_v7(); // -- Innhold (description/summary) -- let content_text = entry .summary .as_ref() .map(|s| s.content.clone()) .or_else(|| { entry .content .as_ref() .and_then(|c| c.body.as_ref().map(|b| strip_html_tags(b))) }) .unwrap_or_default(); // -- Enclosure (lydfil) -- let enclosure = find_audio_enclosure(entry); let mut audio_cas_hash: Option = 
None; let mut audio_size: Option = None; let mut audio_mime: Option = None; if let Some((enc_url, enc_mime, enc_size)) = &enclosure { info!(url = %enc_url, mime = %enc_mime, "Laster ned lydfil"); match download_to_cas(client, cas_root, enc_url).await { Ok((hash, size)) => { audio_cas_hash = Some(hash); audio_size = Some(enc_size.unwrap_or(size as i64)); audio_mime = Some(enc_mime.clone()); } Err(e) => { warn!(url = %enc_url, error = %e, "Kunne ikke laste ned lydfil"); return Err(format!("Nedlasting av lydfil feilet: {e}")); } } } // -- Episode-artwork -- let mut artwork_cas_hash: Option = None; if let Some(art_url) = artwork_url { debug!(url = %art_url, "Laster ned episode-artwork"); match download_to_cas(client, cas_root, art_url).await { Ok((hash, _)) => { artwork_cas_hash = Some(hash); } Err(e) => { warn!(url = %art_url, error = %e, "Kunne ikke laste ned artwork (fortsetter uten)"); } } } // -- Metadata -- let metadata = serde_json::json!({ "guid": guid, "published_at": published_at.map(|d| d.to_rfc3339()), "duration": duration, "episode_number": episode_number, "season_number": season_number, "original_enclosure_url": enclosure.as_ref().map(|(u, _, _)| u.as_str()), }); // -- Opprett content-node -- sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'content', $2, $3, 'hidden'::visibility, $4, $5) "#, ) .bind(node_id) .bind(title) .bind(&content_text) .bind(&metadata) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT content-node feilet: {e}"))?; // -- belongs_to-edge (episode → samling) med publish_at -- let publish_at_meta = if let Some(dt) = published_at { serde_json::json!({"publish_at": dt.to_rfc3339()}) } else { serde_json::json!({}) }; sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'belongs_to', $4, false, $5) ON CONFLICT (source_id, target_id, edge_type) DO NOTHING "#, ) .bind(Uuid::now_v7()) 
.bind(node_id) .bind(collection_id) .bind(&publish_at_meta) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT belongs_to-edge feilet: {e}"))?; // -- Media-node + has_media-edge (lydfil) -- if let Some(ref cas_hash) = audio_cas_hash { let media_node_id = Uuid::now_v7(); let media_meta = serde_json::json!({ "cas_hash": cas_hash, "mime": audio_mime.as_deref().unwrap_or("audio/mpeg"), "size_bytes": audio_size, "duration": duration, }); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) VALUES ($1, 'media', $2, 'hidden'::visibility, $3, $4) "#, ) .bind(media_node_id) .bind(title) .bind(&media_meta) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT media-node feilet: {e}"))?; sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'has_media', '{}', true, $4) ON CONFLICT (source_id, target_id, edge_type) DO NOTHING "#, ) .bind(Uuid::now_v7()) .bind(node_id) .bind(media_node_id) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT has_media-edge feilet: {e}"))?; } // -- Artwork media-node + og_image-edge -- if let Some(ref cas_hash) = artwork_cas_hash { let art_node_id = Uuid::now_v7(); let art_meta = serde_json::json!({ "cas_hash": cas_hash, "mime": "image/jpeg", }); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) VALUES ($1, 'media', $2, 'hidden'::visibility, $3, $4) "#, ) .bind(art_node_id) .bind(format!("{title} (artwork)")) .bind(&art_meta) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT artwork-node feilet: {e}"))?; sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'og_image', '{}', true, $4) ON CONFLICT (source_id, target_id, edge_type) DO NOTHING "#, ) .bind(Uuid::now_v7()) .bind(node_id) .bind(art_node_id) .bind(created_by) .execute(db) .await .map_err(|e| format!("INSERT 
og_image-edge feilet: {e}"))?; } Ok(EpisodeResult { guid: guid.to_string(), title: title.to_string(), published_at: published_at.map(|d| d.to_rfc3339()), duration: duration.map(|d| d.to_string()), episode_number, season_number, action: "imported".to_string(), node_id: Some(node_id), audio_cas_hash, artwork_cas_hash, error: None, }) } // ============================================================================= // Hjelpefunksjoner // ============================================================================= /// Finn audio-enclosure fra en feed-entry. /// Returnerer (url, mime, size). fn find_audio_enclosure(entry: &feed_rs::model::Entry) -> Option<(String, String, Option)> { // 1. MediaObject.content (feed-rs mapper og itunes hit) for mo in &entry.media { for mc in &mo.content { if let Some(ref url) = mc.url { let mime = mc .content_type .as_ref() .map(|m| m.to_string()) .unwrap_or_default(); if mime.starts_with("audio/") || url.as_str().ends_with(".mp3") || url.as_str().ends_with(".m4a") || url.as_str().ends_with(".ogg") { let size = mc.size.map(|s| s as i64); return Some(( url.to_string(), if mime.is_empty() { "audio/mpeg".to_string() } else { mime }, size, )); } } } } // 2. Fallback: sjekk links med audio media-type for link in &entry.links { if let Some(ref mt) = link.media_type { if mt.starts_with("audio/") { let size = link.length.map(|l| l as i64); return Some((link.href.clone(), mt.clone(), size)); } } } None } /// Finn thumbnail/artwork URL fra MediaObject. fn find_thumbnail_url(entry: &feed_rs::model::Entry) -> Option { for mo in &entry.media { for thumb in &mo.thumbnails { return Some(thumb.image.uri.clone()); } } None } /// Last ned en URL til CAS. Returnerer (hash, size). 
async fn download_to_cas( client: &reqwest::Client, cas_root: &str, url: &str, ) -> Result<(String, usize), String> { let response = client .get(url) .send() .await .map_err(|e| format!("HTTP-feil: {e}"))?; if !response.status().is_success() { return Err(format!("HTTP {}: {url}", response.status())); } let data = response .bytes() .await .map_err(|e| format!("Kunne ikke lese respons: {e}"))?; let size = data.len(); let hash = synops_common::cas::store(cas_root, &data).await?; debug!(hash = %hash, size, url, "Lagret i CAS"); Ok((hash, size)) } /// Formater sekunder til HH:MM:SS. fn format_duration_secs(secs: u64) -> String { let h = secs / 3600; let m = (secs % 3600) / 60; let s = secs % 60; if h > 0 { format!("{h:02}:{m:02}:{s:02}") } else { format!("{m:02}:{s:02}") } } /// Enkel HTML-tag-stripping. fn strip_html_tags(html: &str) -> String { let mut result = String::with_capacity(html.len()); let mut in_tag = false; for ch in html.chars() { match ch { '<' => in_tag = true, '>' => in_tag = false, _ if !in_tag => result.push(ch), _ => {} } } result .split_whitespace() .collect::>() .join(" ") .trim() .to_string() }