// Calendar-poller — periodisk polling av CalDAV/ICS-kalendere. // // Finner samlinger med metadata.calendar_subscriptions og enqueuer // calendar_poll-jobber for abonnementer som er klare for ny polling // (basert på intervall og siste poll). // // Samlingens metadata-format: // ```json // { // "calendar_subscriptions": [ // { // "url": "https://calendar.google.com/calendar/ical/.../basic.ics", // "interval_minutes": 60, // "last_polled_at": null, // "enabled": true // } // ] // } // ``` // // Ref: docs/features/kalender.md, tools/synops-calendar/ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; /// En enkelt kalender-subscription på en samling. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CalendarSubscription { pub url: String, #[serde(default = "default_interval")] pub interval_minutes: u32, pub last_polled_at: Option>, #[serde(default)] pub enabled: Option, } fn default_interval() -> u32 { 60 } /// Rad fra spørring: samling med calendar_subscriptions. #[derive(sqlx::FromRow)] struct CollectionWithCalendars { id: Uuid, created_by: Uuid, calendar_subscriptions: serde_json::Value, } /// Start periodisk kalender-poller i bakgrunnen. /// Sjekker hvert 60. sekund for abonnementer som trenger ny polling. pub fn start_calendar_poller(db: PgPool) { tokio::spawn(async move { // Vent 90 sekunder etter oppstart (forskjøvet fra feed-poller) tokio::time::sleep(std::time::Duration::from_secs(90)).await; tracing::info!("Calendar-poller startet (intervall: 60s)"); loop { match poll_due_calendars(&db).await { Ok(count) => { if count > 0 { tracing::info!(calendars = count, "Calendar-poller: {} kalendere lagt i kø", count); } } Err(e) => { tracing::error!(error = %e, "Calendar-poller feilet"); } } tokio::time::sleep(std::time::Duration::from_secs(60)).await; } }); } /// Finn samlinger med calendar_subscriptions og enqueue jobber for forfalne abonnementer. async fn poll_due_calendars(db: &PgPool) -> Result { let collections: Vec = sqlx::query_as( r#" SELECT id, created_by, metadata->'calendar_subscriptions' as calendar_subscriptions FROM nodes WHERE node_kind = 'collection' AND metadata ? 'calendar_subscriptions' AND jsonb_array_length(metadata->'calendar_subscriptions') > 0 "#, ) .fetch_all(db) .await .map_err(|e| format!("Kunne ikke hente samlinger med calendar_subscriptions: {e}"))?; let mut enqueued = 0usize; let now = Utc::now(); for collection in &collections { let subs: Vec = serde_json::from_value(collection.calendar_subscriptions.clone()).unwrap_or_default(); for (idx, sub) in subs.iter().enumerate() { if sub.enabled == Some(false) { continue; } let due = match sub.last_polled_at { Some(last) => { let elapsed = now.signed_duration_since(last); elapsed.num_minutes() >= sub.interval_minutes as i64 } None => true, }; if !due { continue; } // Sjekk at det ikke allerede finnes en kjørende/ventende jobb let existing: Option = sqlx::query_scalar( r#" SELECT COUNT(*) FROM job_queue WHERE job_type = 'calendar_poll' AND payload->>'url' = $1 AND payload->>'collection_id' = $2 AND status IN ('pending', 'running', 'retry') "#, ) .bind(&sub.url) .bind(collection.id.to_string()) .fetch_one(db) .await .map_err(|e| format!("Kunne ikke sjekke eksisterende calendar_poll-jobb: {e}"))?; if existing.unwrap_or(0) > 0 { tracing::debug!( url = %sub.url, collection_id = %collection.id, "Calendar-poll allerede i kø, hopper over" ); continue; } let payload = serde_json::json!({ "url": sub.url, "collection_id": collection.id.to_string(), "created_by": collection.created_by.to_string(), "subscription_index": idx, }); crate::jobs::enqueue(db, "calendar_poll", payload, Some(collection.id), 3) .await .map_err(|e| format!("Kunne ikke enqueue calendar_poll: {e}"))?; tracing::info!( url = %sub.url, collection_id = %collection.id, "Calendar-poll enqueued" ); enqueued += 1; } } Ok(enqueued) } /// Håndterer calendar_poll-jobb — spawner synops-calendar CLI med --url. /// /// Payload: { url, collection_id, created_by, subscription_index } pub async fn handle_calendar_poll( job: &crate::jobs::JobRow, db: &PgPool, ) -> Result { let url = job.payload.get("url") .and_then(|v| v.as_str()) .ok_or("Mangler url i payload")?; let collection_id = job.payload.get("collection_id") .and_then(|v| v.as_str()) .ok_or("Mangler collection_id i payload")?; let subscription_index = job.payload.get("subscription_index") .and_then(|v| v.as_u64()) .unwrap_or(0) as usize; let collection_uuid: Uuid = collection_id.parse() .map_err(|e| format!("Ugyldig collection_id: {e}"))?; let bin = std::env::var("SYNOPS_CALENDAR_BIN") .unwrap_or_else(|_| "synops-calendar".to_string()); let mut cmd = tokio::process::Command::new(&bin); cmd.arg("--url").arg(url) .arg("--collection-id").arg(collection_id); crate::cli_dispatch::set_database_url(&mut cmd)?; tracing::info!( url = %url, collection_id = %collection_id, "Starter synops-calendar (URL-modus)" ); let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; // Oppdater last_polled_at if let Err(e) = update_last_polled(db, collection_uuid, subscription_index).await { tracing::warn!(error = %e, "Kunne ikke oppdatere last_polled_at for kalender"); } let created = result["created"].as_u64().unwrap_or(0); let updated = result["updated"].as_u64().unwrap_or(0); tracing::info!( url = %url, created = created, updated = updated, "synops-calendar fullført" ); Ok(result) } /// Oppdater last_polled_at for et spesifikt kalender-abonnement. async fn update_last_polled( db: &PgPool, collection_id: Uuid, subscription_index: usize, ) -> Result<(), String> { let now = Utc::now().to_rfc3339(); sqlx::query( r#" UPDATE nodes SET metadata = jsonb_set( metadata, $2::text[], to_jsonb($3::text) ) WHERE id = $1 "#, ) .bind(collection_id) .bind(&["calendar_subscriptions".to_string(), subscription_index.to_string(), "last_polled_at".to_string()]) .bind(&now) .execute(db) .await .map_err(|e| format!("Kunne ikke oppdatere last_polled_at: {e}"))?; Ok(()) }