synops/maskinrommet/src/feed_poller.rs
vegard 7fbdc3f5dc Feed-orkestrering: periodisk RSS/Atom-polling per samling (oppgave 29.3)
Ny standard-orkestrering "Overvåk RSS-feed" som bruker synops-feed CLI.
Samlinger konfigurerer feed-abonnementer via metadata.feed_subscriptions[],
med konfigurerbar URL, intervall og mål (inbox/channel).

Komponenter:
- Migration 030: synops-feed cli_tool-seed, orchestration-seed, indeks, prioritetsregel
- feed_poller.rs: Bakgrunnstask som hvert 60s finner forfalne abonnementer
  og enqueuer feed_poll-jobber. Dedupliserer mot kjørende jobber.
- feed_poll job handler: Spawner synops-feed CLI, oppdaterer last_polled_at
- API: configure_feed_subscription + remove_feed_subscription endepunkter

Verifisert: NRK toppsaker.rss → 100 noder opprettet, last_polled_at oppdatert.
2026-03-18 21:32:00 +00:00

262 lines
8 KiB
Rust

// Feed-poller — periodisk polling av RSS/Atom-feeds.
//
// Finner samlinger med metadata.feed_subscriptions og enqueuer feed_poll-jobber
// for abonnementer som er klare for ny polling (basert på intervall og siste poll).
//
// Samlingens metadata-format:
// ```json
// {
// "feed_subscriptions": [
// {
// "url": "https://nrk.no/toppsaker.rss",
// "interval_minutes": 30,
// "target": "inbox", // "inbox" eller "channel"
// "last_polled_at": null // oppdateres av poller
// }
// ]
// }
// ```
//
// Ref: docs/concepts/orkestrering.md, tools/synops-feed/
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
/// A single feed subscription configured on a collection.
///
/// Stored as one element of the `metadata.feed_subscriptions[]` array on a
/// collection node; see the module header comment for the full JSON shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedSubscription {
/// Feed URL (RSS or Atom).
pub url: String,
/// Minutes between polls; defaults to 30 when omitted from the JSON.
#[serde(default = "default_interval")]
pub interval_minutes: u32,
/// Delivery target, "inbox" or "channel"; defaults to "inbox" when omitted.
#[serde(default = "default_target")]
pub target: String,
/// Timestamp of the last poll; `None` until first polled. Maintained by the
/// poller (see `update_last_polled`), not by the user.
pub last_polled_at: Option<DateTime<Utc>>,
/// Opt-out switch: only `Some(false)` disables polling — both `None` and
/// `Some(true)` are treated as enabled (see the skip check in `poll_due_feeds`).
#[serde(default)]
pub enabled: Option<bool>,
}
/// Serde default for `FeedSubscription::interval_minutes`: poll every 30 minutes.
fn default_interval() -> u32 {
    30
}
/// Serde default for `FeedSubscription::target`: deliver new nodes to the inbox.
fn default_target() -> String {
    String::from("inbox")
}
/// Query row: a collection node together with its raw `feed_subscriptions` array.
#[derive(sqlx::FromRow)]
struct CollectionWithFeeds {
/// Node id of the collection.
id: Uuid,
/// User who created the collection; propagated into the job payload as `created_by`.
created_by: Uuid,
/// Raw `metadata->'feed_subscriptions'` JSON value; deserialized to
/// `Vec<FeedSubscription>` in `poll_due_feeds`.
feed_subscriptions: serde_json::Value,
}
/// Start the periodic feed poller as a detached background task.
///
/// After an initial 60-second startup delay, sweeps every 60 seconds for
/// subscriptions that are due and enqueues `feed_poll` jobs for them.
/// Errors from a sweep are logged and the loop continues.
pub fn start_feed_poller(db: PgPool) {
    tokio::spawn(async move {
        const TICK: std::time::Duration = std::time::Duration::from_secs(60);
        // Initial delay: wait one tick after startup before the first sweep.
        tokio::time::sleep(TICK).await;
        tracing::info!("Feed-poller startet (intervall: 60s)");
        loop {
            match poll_due_feeds(&db).await {
                // Nothing due — stay quiet to avoid log noise.
                Ok(0) => {}
                Ok(count) => {
                    tracing::info!(feeds = count, "Feed-poller: {} feeds lagt i kø", count);
                }
                Err(e) => {
                    tracing::error!(error = %e, "Feed-poller feilet");
                }
            }
            tokio::time::sleep(TICK).await;
        }
    });
}
/// Find collections with `feed_subscriptions` and enqueue a `feed_poll` job for
/// every subscription that is due. Returns the number of jobs enqueued.
///
/// A subscription is due when it has never been polled (`last_polled_at` is
/// null) or when at least `interval_minutes` have elapsed since the last poll.
/// Subscriptions with `enabled == Some(false)` are skipped, as are feeds that
/// already have a pending/running/retry `feed_poll` job in the queue.
async fn poll_due_feeds(db: &PgPool) -> Result<usize, String> {
// Find all collection nodes carrying a non-empty feed_subscriptions array.
let collections: Vec<CollectionWithFeeds> = sqlx::query_as(
r#"
SELECT id, created_by, metadata->'feed_subscriptions' as feed_subscriptions
FROM nodes
WHERE node_kind = 'collection'
AND metadata ? 'feed_subscriptions'
AND jsonb_array_length(metadata->'feed_subscriptions') > 0
"#,
)
.fetch_all(db)
.await
.map_err(|e| format!("Kunne ikke hente samlinger med feed_subscriptions: {e}"))?;
let mut enqueued = 0usize;
let now = Utc::now();
for collection in &collections {
// A malformed subscription array deserializes to an empty Vec and is
// skipped silently rather than aborting the whole sweep.
let subs: Vec<FeedSubscription> =
serde_json::from_value(collection.feed_subscriptions.clone()).unwrap_or_default();
for (idx, sub) in subs.iter().enumerate() {
// Skip explicitly disabled subscriptions (None counts as enabled).
if sub.enabled == Some(false) {
continue;
}
// Check whether the polling interval has elapsed.
let due = match sub.last_polled_at {
Some(last) => {
let elapsed = now.signed_duration_since(last);
elapsed.num_minutes() >= sub.interval_minutes as i64
}
None => true, // Never polled before
};
if !due {
continue;
}
// Deduplicate: skip if a feed_poll job for this url + collection is
// already pending, running, or awaiting retry.
let existing: Option<i64> = sqlx::query_scalar(
r#"
SELECT COUNT(*) FROM job_queue
WHERE job_type = 'feed_poll'
AND payload->>'url' = $1
AND payload->>'collection_id' = $2
AND status IN ('pending', 'running', 'retry')
"#,
)
.bind(&sub.url)
.bind(collection.id.to_string())
.fetch_one(db)
.await
.map_err(|e| format!("Kunne ikke sjekke eksisterende feed_poll-jobb: {e}"))?;
if existing.unwrap_or(0) > 0 {
tracing::debug!(
url = %sub.url,
collection_id = %collection.id,
"Feed-poll allerede i kø, hopper over"
);
continue;
}
// Enqueue the feed_poll job. subscription_index lets the handler update
// last_polled_at on the correct array element afterwards.
let payload = serde_json::json!({
"url": sub.url,
"collection_id": collection.id.to_string(),
"created_by": collection.created_by.to_string(),
"subscription_index": idx,
"target": sub.target,
});
// NOTE(review): priority 3 is presumably the normal background priority
// for crate::jobs::enqueue — confirm against the jobs module.
crate::jobs::enqueue(db, "feed_poll", payload, Some(collection.id), 3)
.await
.map_err(|e| format!("Kunne ikke enqueue feed_poll: {e}"))?;
tracing::info!(
url = %sub.url,
collection_id = %collection.id,
"Feed-poll enqueued"
);
enqueued += 1;
}
}
Ok(enqueued)
}
/// Job handler for `feed_poll` — spawns the synops-feed CLI for one feed and
/// records the poll time on the subscription.
///
/// Payload: { url, collection_id, created_by, subscription_index, target }
///
/// Returns the CLI's JSON result on success (expected to contain
/// `nodes_created` and `feed_title`). A failure to update `last_polled_at`
/// afterwards is only logged, not propagated, so the job still succeeds.
pub async fn handle_feed_poll(
job: &crate::jobs::JobRow,
db: &PgPool,
) -> Result<serde_json::Value, String> {
// Required payload fields; a missing field fails the job with a clear message.
let url = job.payload.get("url")
.and_then(|v| v.as_str())
.ok_or("Mangler url i payload")?;
let collection_id = job.payload.get("collection_id")
.and_then(|v| v.as_str())
.ok_or("Mangler collection_id i payload")?;
let created_by = job.payload.get("created_by")
.and_then(|v| v.as_str())
.ok_or("Mangler created_by i payload")?;
// Optional: which element of feed_subscriptions[] this poll belongs to.
let subscription_index = job.payload.get("subscription_index")
.and_then(|v| v.as_u64())
.unwrap_or(0) as usize;
let collection_uuid: Uuid = collection_id.parse()
.map_err(|e| format!("Ugyldig collection_id: {e}"))?;
// Build the synops-feed command; binary path is overridable via SYNOPS_FEED_BIN.
let bin = std::env::var("SYNOPS_FEED_BIN")
.unwrap_or_else(|_| "synops-feed".to_string());
let mut cmd = tokio::process::Command::new(&bin);
// NOTE(review): the payload's "target" field is not forwarded to the CLI here —
// confirm whether synops-feed takes a --target flag or resolves the target itself.
cmd.arg("--url").arg(url)
.arg("--collection-id").arg(collection_id)
.arg("--created-by").arg(created_by);
// Environment variables the CLI tool needs (database connection).
crate::cli_dispatch::set_database_url(&mut cmd)?;
tracing::info!(
url = %url,
collection_id = %collection_id,
"Starter synops-feed"
);
let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
// Record the poll time; best-effort — a failure here must not fail the job,
// otherwise a successful poll would be retried and re-import entries.
if let Err(e) = update_last_polled(db, collection_uuid, subscription_index).await {
tracing::warn!(error = %e, "Kunne ikke oppdatere last_polled_at");
}
let nodes_created = result["nodes_created"].as_u64().unwrap_or(0);
tracing::info!(
url = %url,
nodes_created = nodes_created,
feed_title = result["feed_title"].as_str().unwrap_or("n/a"),
"synops-feed fullført"
);
Ok(result)
}
/// Set `last_polled_at` (as an RFC 3339 string) on one subscription in a
/// collection's metadata, addressed by its array index via `jsonb_set`.
///
/// NOTE(review): jsonb_set's default create_missing=true means an out-of-range
/// array index would append at the end of the array — assumed not to happen in
/// practice since the index comes from the enqueueing sweep, but confirm the
/// behavior if subscriptions can be removed between enqueue and job execution.
async fn update_last_polled(
db: &PgPool,
collection_id: Uuid,
subscription_index: usize,
) -> Result<(), String> {
let now = Utc::now().to_rfc3339();
sqlx::query(
r#"
UPDATE nodes
SET metadata = jsonb_set(
metadata,
$2::text[],
to_jsonb($3::text)
)
WHERE id = $1
"#,
)
.bind(collection_id)
// JSON path as a Postgres text[]: {feed_subscriptions, <index>, last_polled_at}
.bind(&["feed_subscriptions".to_string(), subscription_index.to_string(), "last_polled_at".to_string()])
.bind(&now)
.execute(db)
.await
.map_err(|e| format!("Kunne ikke oppdatere last_polled_at: {e}"))?;
Ok(())
}