From 7fbdc3f5dcfcda9e35c6707ea8cb97e004855fc0 Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 21:32:00 +0000 Subject: [PATCH] Feed-orkestrering: periodisk RSS/Atom-polling per samling (oppgave 29.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ny standard-orkestrering "Overvåk RSS-feed" som bruker synops-feed CLI. Samlinger konfigurerer feed-abonnementer via metadata.feed_subscriptions[], med konfigurerbar URL, intervall og mål (inbox/channel). Komponenter: - Migration 030: synops-feed cli_tool-seed, orchestration-seed, indeks, prioritetsregel - feed_poller.rs: Bakgrunnstask som hvert 60s finner forfalne abonnementer og enqueuer feed_poll-jobber. Dedupliserer mot kjørende jobber. - feed_poll job handler: Spawner synops-feed CLI, oppdaterer last_polled_at - API: configure_feed_subscription + remove_feed_subscription endepunkter Verifisert: NRK toppsaker.rss → 100 noder opprettet, last_polled_at oppdatert. --- maskinrommet/src/feed_poller.rs | 262 ++++++++++++++++++++++++++ maskinrommet/src/intentions.rs | 206 ++++++++++++++++++++ maskinrommet/src/jobs.rs | 4 + maskinrommet/src/main.rs | 7 + migrations/030_feed_orchestration.sql | 82 ++++++++ tasks.md | 3 +- 6 files changed, 562 insertions(+), 2 deletions(-) create mode 100644 maskinrommet/src/feed_poller.rs create mode 100644 migrations/030_feed_orchestration.sql diff --git a/maskinrommet/src/feed_poller.rs b/maskinrommet/src/feed_poller.rs new file mode 100644 index 0000000..8b9d402 --- /dev/null +++ b/maskinrommet/src/feed_poller.rs @@ -0,0 +1,262 @@ +// Feed-poller — periodisk polling av RSS/Atom-feeds. +// +// Finner samlinger med metadata.feed_subscriptions og enqueuer feed_poll-jobber +// for abonnementer som er klare for ny polling (basert på intervall og siste poll). +// +// Samlingens metadata-format: +// ```json +// { +// "feed_subscriptions": [ +// { +// "url": "https://nrk.no/toppsaker.rss", +// "interval_minutes": 30, +// "target": "inbox", // "inbox" eller "channel" +// "last_polled_at": null // oppdateres av poller +// } +// ] +// } +// ``` +// +// Ref: docs/concepts/orkestrering.md, tools/synops-feed/ + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use uuid::Uuid; + +/// En enkelt feed-subscription på en samling. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FeedSubscription { + pub url: String, + #[serde(default = "default_interval")] + pub interval_minutes: u32, + #[serde(default = "default_target")] + pub target: String, + pub last_polled_at: Option>, + #[serde(default)] + pub enabled: Option, +} + +fn default_interval() -> u32 { + 30 +} +fn default_target() -> String { + "inbox".to_string() +} + +/// Rad fra spørring: samling med feed_subscriptions. +#[derive(sqlx::FromRow)] +struct CollectionWithFeeds { + id: Uuid, + created_by: Uuid, + feed_subscriptions: serde_json::Value, +} + +/// Start periodisk feed-poller i bakgrunnen. +/// Sjekker hvert 60. sekund for abonnementer som trenger ny polling. +pub fn start_feed_poller(db: PgPool) { + tokio::spawn(async move { + // Vent 60 sekunder etter oppstart + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tracing::info!("Feed-poller startet (intervall: 60s)"); + + loop { + match poll_due_feeds(&db).await { + Ok(count) => { + if count > 0 { + tracing::info!(feeds = count, "Feed-poller: {} feeds lagt i kø", count); + } + } + Err(e) => { + tracing::error!(error = %e, "Feed-poller feilet"); + } + } + + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + }); +} + +/// Finn samlinger med feed_subscriptions og enqueue jobber for forfalne abonnementer. +async fn poll_due_feeds(db: &PgPool) -> Result { + // Finn alle samlinger med feed_subscriptions + let collections: Vec = sqlx::query_as( + r#" + SELECT id, created_by, metadata->'feed_subscriptions' as feed_subscriptions + FROM nodes + WHERE node_kind = 'collection' + AND metadata ? 'feed_subscriptions' + AND jsonb_array_length(metadata->'feed_subscriptions') > 0 + "#, + ) + .fetch_all(db) + .await + .map_err(|e| format!("Kunne ikke hente samlinger med feed_subscriptions: {e}"))?; + + let mut enqueued = 0usize; + let now = Utc::now(); + + for collection in &collections { + let subs: Vec = + serde_json::from_value(collection.feed_subscriptions.clone()).unwrap_or_default(); + + for (idx, sub) in subs.iter().enumerate() { + // Hopp over deaktiverte abonnementer + if sub.enabled == Some(false) { + continue; + } + + // Sjekk om det er tid for ny polling + let due = match sub.last_polled_at { + Some(last) => { + let elapsed = now.signed_duration_since(last); + elapsed.num_minutes() >= sub.interval_minutes as i64 + } + None => true, // Aldri pollet før + }; + + if !due { + continue; + } + + // Sjekk at det ikke allerede finnes en kjørende/ventende jobb for denne feeden + let existing: Option = sqlx::query_scalar( + r#" + SELECT COUNT(*) FROM job_queue + WHERE job_type = 'feed_poll' + AND payload->>'url' = $1 + AND payload->>'collection_id' = $2 + AND status IN ('pending', 'running', 'retry') + "#, + ) + .bind(&sub.url) + .bind(collection.id.to_string()) + .fetch_one(db) + .await + .map_err(|e| format!("Kunne ikke sjekke eksisterende feed_poll-jobb: {e}"))?; + + if existing.unwrap_or(0) > 0 { + tracing::debug!( + url = %sub.url, + collection_id = %collection.id, + "Feed-poll allerede i kø, hopper over" + ); + continue; + } + + // Enqueue feed_poll-jobb + let payload = serde_json::json!({ + "url": sub.url, + "collection_id": collection.id.to_string(), + "created_by": collection.created_by.to_string(), + "subscription_index": idx, + "target": sub.target, + }); + + crate::jobs::enqueue(db, "feed_poll", payload, Some(collection.id), 3) + .await + .map_err(|e| format!("Kunne ikke enqueue feed_poll: {e}"))?; + + tracing::info!( + url = %sub.url, + collection_id = %collection.id, + "Feed-poll enqueued" + ); + + enqueued += 1; + } + } + + Ok(enqueued) +} + +/// Håndterer feed_poll-jobb — spawner synops-feed CLI. +/// +/// Payload: { url, collection_id, created_by, subscription_index, target } +pub async fn handle_feed_poll( + job: &crate::jobs::JobRow, + db: &PgPool, +) -> Result { + let url = job.payload.get("url") + .and_then(|v| v.as_str()) + .ok_or("Mangler url i payload")?; + + let collection_id = job.payload.get("collection_id") + .and_then(|v| v.as_str()) + .ok_or("Mangler collection_id i payload")?; + + let created_by = job.payload.get("created_by") + .and_then(|v| v.as_str()) + .ok_or("Mangler created_by i payload")?; + + let subscription_index = job.payload.get("subscription_index") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize; + + let collection_uuid: Uuid = collection_id.parse() + .map_err(|e| format!("Ugyldig collection_id: {e}"))?; + + // Bygg synops-feed-kommando + let bin = std::env::var("SYNOPS_FEED_BIN") + .unwrap_or_else(|_| "synops-feed".to_string()); + let mut cmd = tokio::process::Command::new(&bin); + + cmd.arg("--url").arg(url) + .arg("--collection-id").arg(collection_id) + .arg("--created-by").arg(created_by); + + // Sett miljøvariabler CLI-verktøyet trenger + crate::cli_dispatch::set_database_url(&mut cmd)?; + + tracing::info!( + url = %url, + collection_id = %collection_id, + "Starter synops-feed" + ); + + let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; + + // Oppdater last_polled_at + if let Err(e) = update_last_polled(db, collection_uuid, subscription_index).await { + tracing::warn!(error = %e, "Kunne ikke oppdatere last_polled_at"); + } + + let nodes_created = result["nodes_created"].as_u64().unwrap_or(0); + tracing::info!( + url = %url, + nodes_created = nodes_created, + feed_title = result["feed_title"].as_str().unwrap_or("n/a"), + "synops-feed fullført" + ); + + Ok(result) +} + +/// Oppdater last_polled_at for et spesifikt abonnement i en samlings metadata. +async fn update_last_polled( + db: &PgPool, + collection_id: Uuid, + subscription_index: usize, +) -> Result<(), String> { + let now = Utc::now().to_rfc3339(); + + sqlx::query( + r#" + UPDATE nodes + SET metadata = jsonb_set( + metadata, + $2::text[], + to_jsonb($3::text) + ) + WHERE id = $1 + "#, + ) + .bind(collection_id) + .bind(&["feed_subscriptions".to_string(), subscription_index.to_string(), "last_polled_at".to_string()]) + .bind(&now) + .execute(db) + .await + .map_err(|e| format!("Kunne ikke oppdatere last_polled_at: {e}"))?; + + Ok(()) +} diff --git a/maskinrommet/src/intentions.rs b/maskinrommet/src/intentions.rs index 3ea4c56..7862ab9 100644 --- a/maskinrommet/src/intentions.rs +++ b/maskinrommet/src/intentions.rs @@ -4800,6 +4800,212 @@ pub async fn clip_url( Ok(Json(ClipUrlResponse { job_id })) } +// ============================================================================= +// Feed-abonnement (oppgave 29.3) +// ============================================================================= + +#[derive(Deserialize)] +pub struct ConfigureFeedSubscriptionRequest { + /// Samlings-ID + pub collection_id: Uuid, + /// Feed-URL (RSS/Atom) + pub url: String, + /// Poll-intervall i minutter (default: 30) + #[serde(default = "default_feed_interval")] + pub interval_minutes: u32, + /// Mål: "inbox" eller "channel" (default: "inbox") + #[serde(default = "default_feed_target")] + pub target: String, + /// Aktivert (default: true) + #[serde(default = "default_true")] + pub enabled: bool, +} + +fn default_feed_interval() -> u32 { 30 } +fn default_feed_target() -> String { "inbox".to_string() } + +#[derive(Deserialize)] +pub struct RemoveFeedSubscriptionRequest { + /// Samlings-ID + pub collection_id: Uuid, + /// Feed-URL å fjerne + pub url: String, +} + +/// POST /intentions/configure_feed_subscription +/// +/// Legger til eller oppdaterer et feed-abonnement på en samling. +/// Lagres i samlingens `metadata.feed_subscriptions[]`. +pub async fn configure_feed_subscription( + State(state): State, + user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // Valider URL + if !req.url.starts_with("http://") && !req.url.starts_with("https://") { + return Err(bad_request("Feed-URL må starte med http:// eller https://")); + } + if req.interval_minutes < 5 { + return Err(bad_request("Intervall må være minst 5 minutter")); + } + if req.target != "inbox" && req.target != "channel" { + return Err(bad_request("Target må være 'inbox' eller 'channel'")); + } + + // Sjekk at samlingen eksisterer og brukeren har tilgang + let collection: Option<(String, serde_json::Value)> = sqlx::query_as( + "SELECT node_kind, COALESCE(metadata, '{}'::jsonb) FROM nodes WHERE id = $1", + ) + .bind(req.collection_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "PG-feil ved oppslag av samling"); + internal_error("Kunne ikke slå opp samling") + })?; + + let (kind, metadata) = collection.ok_or_else(|| bad_request("Samling ikke funnet"))?; + if kind != "collection" { + return Err(bad_request("Noden er ikke en samling")); + } + + // Les eksisterende abonnementer + let mut subs: Vec = metadata + .get("feed_subscriptions") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + // Oppdater eksisterende eller legg til ny + let new_sub = crate::feed_poller::FeedSubscription { + url: req.url.clone(), + interval_minutes: req.interval_minutes, + target: req.target.clone(), + last_polled_at: None, + enabled: Some(req.enabled), + }; + + if let Some(existing) = subs.iter_mut().find(|s| s.url == req.url) { + existing.interval_minutes = req.interval_minutes; + existing.target = req.target.clone(); + existing.enabled = Some(req.enabled); + tracing::info!(url = %req.url, "Feed-abonnement oppdatert"); + } else { + subs.push(new_sub); + tracing::info!(url = %req.url, "Feed-abonnement lagt til"); + } + + // Skriv tilbake til metadata + sqlx::query( + r#" + UPDATE nodes + SET metadata = jsonb_set( + COALESCE(metadata, '{}'::jsonb), + '{feed_subscriptions}', + $2 + ) + WHERE id = $1 + "#, + ) + .bind(req.collection_id) + .bind(serde_json::to_value(&subs).unwrap()) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Kunne ikke oppdatere feed_subscriptions"); + internal_error("Kunne ikke lagre feed-abonnement") + })?; + + tracing::info!( + collection_id = %req.collection_id, + url = %req.url, + user = %user.node_id, + interval = req.interval_minutes, + target = %req.target, + "Feed-abonnement konfigurert" + ); + + Ok(Json(serde_json::json!({ + "status": "ok", + "collection_id": req.collection_id, + "url": req.url, + "interval_minutes": req.interval_minutes, + "target": req.target, + "enabled": req.enabled, + "subscriptions_count": subs.len(), + }))) +} + +/// POST /intentions/remove_feed_subscription +/// +/// Fjerner et feed-abonnement fra en samling. +pub async fn remove_feed_subscription( + State(state): State, + user: AuthUser, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // Les eksisterende abonnementer + let metadata: Option = sqlx::query_scalar( + "SELECT COALESCE(metadata, '{}'::jsonb) FROM nodes WHERE id = $1 AND node_kind = 'collection'", + ) + .bind(req.collection_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "PG-feil"); + internal_error("Kunne ikke hente samling") + })?; + + let metadata = metadata.ok_or_else(|| bad_request("Samling ikke funnet"))?; + + let mut subs: Vec = metadata + .get("feed_subscriptions") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + let before = subs.len(); + subs.retain(|s| s.url != req.url); + + if subs.len() == before { + return Err(bad_request("Feed-abonnement ikke funnet")); + } + + // Skriv tilbake + sqlx::query( + r#" + UPDATE nodes + SET metadata = jsonb_set( + COALESCE(metadata, '{}'::jsonb), + '{feed_subscriptions}', + $2 + ) + WHERE id = $1 + "#, + ) + .bind(req.collection_id) + .bind(serde_json::to_value(&subs).unwrap()) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Kunne ikke oppdatere feed_subscriptions"); + internal_error("Kunne ikke fjerne feed-abonnement") + })?; + + tracing::info!( + collection_id = %req.collection_id, + url = %req.url, + user = %user.node_id, + "Feed-abonnement fjernet" + ); + + Ok(Json(serde_json::json!({ + "status": "ok", + "collection_id": req.collection_id, + "url": req.url, + "removed": true, + "subscriptions_count": subs.len(), + }))) +} + // ============================================================================= // Tester // ============================================================================= diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index dc5c0e5..4bf4991 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -226,6 +226,10 @@ async fn dispatch( "describe_image" => { crate::describe_image::handle_describe_image(job, db, cas).await } + // Feed-polling: periodisk RSS/Atom-abonnement (oppgave 29.3) + "feed_poll" => { + crate::feed_poller::handle_feed_poll(job, db).await + } // Orchestration: trigger-evaluering har lagt jobben i kø. // Kompilatoren parser scriptet og validerer det. // Utførelse av kompilert script kommer i oppgave 24.5. diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 3bd770a..93a0ff3 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -28,6 +28,7 @@ mod serving; pub mod summarize; pub mod ws; pub mod mixer; +pub mod feed_poller; pub mod orchestration_trigger; pub mod script_compiler; pub mod script_executor; @@ -163,6 +164,9 @@ async fn main() { // Start periodisk CAS tmp-opprydding (oppgave 17.6) cas::start_tmp_cleanup_loop(cas.clone()); + + // Start feed-poller for RSS/Atom-abonnementer (oppgave 29.3) + feed_poller::start_feed_poller(db.clone()); let dynamic_page_cache = publishing::new_dynamic_page_cache(); let metrics = metrics::MetricsCollector::new(); @@ -274,6 +278,9 @@ async fn main() { .route("/custom-domain/{article_id}", get(custom_domain::serve_custom_domain_article)) // Orkestrering UI (oppgave 24.6) + AI-assistert (oppgave 24.7) .route("/intentions/clip_url", post(intentions::clip_url)) + // Feed-abonnement (oppgave 29.3) + .route("/intentions/configure_feed_subscription", post(intentions::configure_feed_subscription)) + .route("/intentions/remove_feed_subscription", post(intentions::remove_feed_subscription)) .route("/intentions/compile_script", post(intentions::compile_script)) .route("/intentions/test_orchestration", post(intentions::test_orchestration)) .route("/intentions/ai_suggest_script", post(intentions::ai_suggest_script)) diff --git a/migrations/030_feed_orchestration.sql b/migrations/030_feed_orchestration.sql new file mode 100644 index 0000000..c5cd5d5 --- /dev/null +++ b/migrations/030_feed_orchestration.sql @@ -0,0 +1,82 @@ +-- 030_feed_orchestration.sql +-- Oppgave 29.3: Feed-orkestrering — standard-orkestrering "Overvåk RSS-feed" +-- som bruker synops-feed. Konfigurerbar per samling via metadata.feed_subscriptions. +-- +-- Inneholder: +-- 1. synops-feed cli_tool-node (aliases og args_hints for script_compiler) +-- 2. "Overvåk RSS-feed" seed-orkestrering +-- 3. Indeks for rask oppslag av feed_subscriptions +-- +-- Ref: docs/concepts/orkestrering.md, tools/synops-feed/ + +BEGIN; + +-- ============================================================================= +-- 1. synops-feed — RSS/Atom-feed abonnement +-- ============================================================================= +INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) +VALUES ( + 'f0000000-c100-4000-b000-000000000015', + 'cli_tool', + 'synops-feed', + 'discoverable', + '{ + "binary": "synops-feed", + "aliases": ["abonner på feed", "hent feed", "rss-abonnement", "feed-sjekk"], + "description": "Abonner på RSS/Atom-feed og opprett content-noder for nye entries", + "args_hints": { + "feed-url": "--url {arg}", + "for samlingen": "--collection-id {event.collection_id}", + "som bruker": "--created-by {event.created_by}", + "intervall": "--interval {arg}", + "som json": "--payload-json {arg}" + } + }'::jsonb, + 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' +) ON CONFLICT (id) DO NOTHING; + +-- ============================================================================= +-- 2. Seed-orkestrering: Overvåk RSS-feed +-- ============================================================================= +-- Trigger: scheduled.interval — periodisk polling basert på konfigurasjon. +-- Hvert collection med metadata.feed_subscriptions[] aktiverer dette. +-- Maskinrommet sin feed_poller finner disse samlingene og enqueuer feed_poll-jobber. +INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) +VALUES ( + 'e0000000-0ac0-4000-b000-000000000006', + 'orchestration', + 'Overvåk RSS-feed', + E'NÅR planlagt intervall\nHVIS samling har feed_subscriptions\n\n1. abonner på feed for samlingen\n\nved feil: opprett oppgave "Feed-polling feilet" (bug)', + 'discoverable', + '{ + "trigger": { + "event": "scheduled.interval", + "conditions": { + "has_metadata": "feed_subscriptions" + } + }, + "executor": "script", + "intelligence": 1, + "effort": 1, + "compiled": false + }'::jsonb, + 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' +) ON CONFLICT (id) DO NOTHING; + +-- ============================================================================= +-- 3. Indeks for rask oppslag av samlinger med feed_subscriptions +-- ============================================================================= +-- Brukes av feed_poller for å finne samlinger som skal polles. +CREATE INDEX IF NOT EXISTS idx_nodes_feed_subscriptions + ON nodes ((metadata->'feed_subscriptions')) + WHERE node_kind = 'collection' AND metadata ? 'feed_subscriptions'; + +-- ============================================================================= +-- 4. Prioritetsregel for feed_poll-jobber +-- ============================================================================= +-- Lav prioritet (3) — feed-polling er bakgrunnsarbeid som ikke haster. +INSERT INTO job_priority_rules (job_type, base_priority, cpu_weight, max_concurrent, timeout_seconds) +VALUES ('feed_poll', 3, 1, 2, 120) +ON CONFLICT (job_type) DO NOTHING; + +COMMIT; diff --git a/tasks.md b/tasks.md index ab93a9b..c39fc19 100644 --- a/tasks.md +++ b/tasks.md @@ -394,8 +394,7 @@ noden er det som lever videre. ### RSS/Feed-abonnement - [x] 29.2 `synops-feed` CLI: abonner på RSS/Atom-feed. Input: `--url --collection-id [--interval 30m]`. Poller feed, oppretter `content`-node for nye entries med `metadata.source_url` og `tagged`-edge "feed". AI-oppsummering valgfritt. Paywall-deteksjon gjenbrukt fra synops-clip. -- [~] 29.3 Feed-orkestrering: standard-orkestrering "Overvåk RSS-feed" som bruker synops-feed. Konfigurerbar per samling. Nye artikler havner i innboks eller direkte i en kanal. - > Påbegynt: 2026-03-18T21:20 +- [x] 29.3 Feed-orkestrering: standard-orkestrering "Overvåk RSS-feed" som bruker synops-feed. Konfigurerbar per samling. Nye artikler havner i innboks eller direkte i en kanal. ### Webhook (universell ekstern input) - [ ] 29.4 Webhook-endepunkt i vaktmesteren: `POST /api/webhook/` → opprett node fra JSON-body. Hvert webhook har et unikt token (UUID) knyttet til en `webhook`-node med `belongs_to`-edge til målsamling. Validerer token, oppretter `content`-node med payload i metadata.