diff --git a/worker/src/warmup.rs b/worker/src/warmup.rs index 0b1c1d1..bec8e76 100644 --- a/worker/src/warmup.rs +++ b/worker/src/warmup.rs @@ -72,49 +72,25 @@ pub async fn run( continue; } - // Bygg WHERE-clause basert på warmup-modus - let (where_clause, limit) = match ch.config.warmup_mode.as_str() { + // Trådbasert henting: finn kvalifiserende tråder, hent alle meldinger i disse + let rows = match ch.config.warmup_mode.as_str() { + "all" => { + // Alt — ingen filtrering + fetch_messages_all(pool, &ch.id).await? + }, "messages" => { + // Siste N tråder (sortert etter nyeste melding i tråden) let n = ch.config.warmup_value.unwrap_or(default_limit); - (String::new(), n) + fetch_messages_by_threads(pool, &ch.id, n).await? }, "days" => { + // Alle tråder med minst én melding i tidsvinduet let days = ch.config.warmup_value.unwrap_or(30); - (format!("AND m.created_at >= now() - interval '{} days'", days), i64::MAX) + fetch_messages_by_days(pool, &ch.id, days).await? }, - // "all" og alt annet - _ => (String::new(), i64::MAX), + _ => fetch_messages_all(pool, &ch.id).await?, }; - let query = format!( - r#" - SELECT - m.id::text, - m.channel_id::text, - n.workspace_id::text, - COALESCE(m.author_id, ''), - COALESCE(u.name, 'Ukjent'), - COALESCE(m.body, ''), - COALESCE(m.message_type, 'text'), - m.reply_to::text, - m.created_at::text - FROM messages m - JOIN nodes n ON n.id = m.id - LEFT JOIN users u ON u.authentik_id = m.author_id - WHERE m.channel_id = $1::uuid {} - ORDER BY m.created_at DESC - LIMIT $2 - "#, - where_clause - ); - - let rows: Vec<(String, String, String, String, String, String, String, Option, String)> = - sqlx::query_as(&query) - .bind(&ch.id) - .bind(limit) - .fetch_all(pool) - .await?; - if rows.is_empty() { info!(channel = %ch.name, mode = %ch.config.warmup_mode, "Ingen meldinger å laste"); continue; @@ -205,6 +181,107 @@ pub async fn run( Ok(()) } +type MessageRow = (String, String, String, String, String, String, String, Option, String); + +const 
MESSAGE_COLUMNS: &str = r#" + m.id::text, + m.channel_id::text, + n.workspace_id::text, + COALESCE(m.author_id, ''), + COALESCE(u.name, 'Ukjent'), + COALESCE(m.body, ''), + COALESCE(m.message_type, 'text'), + m.reply_to::text, + m.created_at::text +"#; + +const MESSAGE_JOINS: &str = r#" + JOIN nodes n ON n.id = m.id + LEFT JOIN users u ON u.authentik_id = m.author_id +"#; + +/// Hent alle meldinger i en kanal. +async fn fetch_messages_all(pool: &PgPool, channel_id: &str) -> anyhow::Result> { + let query = format!( + "SELECT {} FROM messages m {} WHERE m.channel_id = $1::uuid ORDER BY m.created_at", + MESSAGE_COLUMNS, MESSAGE_JOINS + ); + Ok(sqlx::query_as(&query).bind(channel_id).fetch_all(pool).await?) +} + +/// Hent de N nyeste trådene + alle meldinger i disse. +/// En "tråd" = en rotmelding (reply_to IS NULL) med alle svar. +/// Sortert etter nyeste melding i tråden. +async fn fetch_messages_by_threads(pool: &PgPool, channel_id: &str, limit: i64) -> anyhow::Result> { + // Finn rot-IDer for de N nyeste trådene. + // En tråds "siste aktivitet" er max(created_at) blant rot + alle svar. + // Løse meldinger (reply_to peker på noe utenfor kanalen) teller som egen tråd. 
+ let query = format!( + r#" + WITH thread_roots AS ( + -- Finn rot for hver melding: følg reply_to opp til NULL, eller til utenfor kanalen + SELECT DISTINCT COALESCE( + (SELECT r.id FROM messages r + WHERE r.id = m.reply_to AND r.channel_id = m.channel_id AND r.reply_to IS NULL), + CASE WHEN m.reply_to IS NULL THEN m.id END, + m.id -- orphan-svar → behandles som egen tråd + ) AS root_id + FROM messages m + WHERE m.channel_id = $1::uuid + ), + ranked_threads AS ( + SELECT tr.root_id, + max(m.created_at) AS last_activity + FROM thread_roots tr + JOIN messages m ON m.channel_id = $1::uuid + AND (m.id = tr.root_id OR m.reply_to = tr.root_id) + GROUP BY tr.root_id + ORDER BY last_activity DESC + LIMIT $2 + ) + SELECT {} + FROM messages m + {} + WHERE m.channel_id = $1::uuid + AND (m.id IN (SELECT root_id FROM ranked_threads) + OR m.reply_to IN (SELECT root_id FROM ranked_threads)) + ORDER BY m.created_at + "#, + MESSAGE_COLUMNS, MESSAGE_JOINS + ); + Ok(sqlx::query_as(&query).bind(channel_id).bind(limit).fetch_all(pool).await?) +} + +/// Hent alle tråder som har minst én melding innen siste N dager. +/// Inkluderer hele tråden (også eldre trådstartere). 
///
/// `days` is the size of the time window, counted back from the database's
/// `now()`. Rows are returned oldest message first.
///
/// # Errors
///
/// Returns an error when `days` does not fit in an `i32` (the value is bound
/// as a 32-bit integer for `make_interval`), or when the query fails.
async fn fetch_messages_by_days(
    pool: &PgPool,
    channel_id: &str,
    days: i64,
) -> anyhow::Result<Vec<MessageRow>> {
    // A plain `days as i32` would silently wrap an out-of-range value into a
    // completely different window; reject it explicitly instead.
    let window_days = i32::try_from(days).map_err(|_| {
        anyhow::anyhow!(
            "warmup window of {} days does not fit in a 32-bit integer",
            days
        )
    })?;

    // `qualifying_roots` maps each message inside the time window to a root id
    // using one level of `reply_to` resolution: a reply to a root message in
    // the same channel maps to that root, everything else maps to itself.
    // The outer query then returns those roots plus their direct replies,
    // regardless of the age of the individual messages.
    let query = format!(
        r#"
        WITH qualifying_roots AS (
            -- Finn trådrøtter for meldinger innenfor tidsvinduet
            SELECT DISTINCT COALESCE(
                (SELECT r.id FROM messages r
                  WHERE r.id = m.reply_to AND r.channel_id = m.channel_id AND r.reply_to IS NULL),
                CASE WHEN m.reply_to IS NULL THEN m.id END,
                m.id
            ) AS root_id
            FROM messages m
            WHERE m.channel_id = $1::uuid
              AND m.created_at >= now() - make_interval(days => $2)
        )
        SELECT {columns}
        FROM messages m
        {joins}
        WHERE m.channel_id = $1::uuid
          AND (m.id IN (SELECT root_id FROM qualifying_roots)
               OR m.reply_to IN (SELECT root_id FROM qualifying_roots))
        ORDER BY m.created_at
        "#,
        columns = MESSAGE_COLUMNS,
        joins = MESSAGE_JOINS
    );
    let rows = sqlx::query_as::<_, MessageRow>(&query)
        .bind(channel_id)
        .bind(window_days)
        .fetch_all(pool)
        .await?;
    Ok(rows)
}

async fn call_reducer(
    http: &Client,
    base_url: &str,