server/worker/src/warmup.rs
vegard 2ed50d51a9 Warmup: trådbasert henting — hele tråder lastes komplett
messages-modus: hent de N nyeste trådene (sortert etter siste
aktivitet), inkludert alle svar. Ingen orphan-replies.

days-modus: finn alle tråder med minst én melding innenfor
tidsvinduet, last hele tråden (også eldre trådstartere).

all-modus: uendret, henter alt.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 02:33:55 +01:00

306 lines
10 KiB
Rust

use reqwest::Client;
use serde::Deserialize;
use sqlx::PgPool;
use tracing::{info, warn};
/// Per-channel warm-up settings, deserialized from the `config` JSON column of `channels`.
#[derive(Deserialize)]
struct ChannelConfig {
    /// Warm-up strategy name; when the key is absent it defaults to
    /// `default_warmup_mode()`, i.e. `"all"`.
    #[serde(default = "default_warmup_mode")]
    warmup_mode: String,
    /// Optional numeric parameter for the chosen mode; `None` when absent or `null`.
    #[serde(default)]
    warmup_value: Option<i64>,
}
/// Warm-up mode used when a channel's config does not specify one: load everything.
fn default_warmup_mode() -> String {
    String::from("all")
}
/// A channel selected for warm-up together with its parsed configuration.
struct ChannelInfo {
    /// Channel id as text (selected as `c.id::text`).
    id: String,
    /// Human-readable channel name, used in log output.
    name: String,
    /// Parsed warm-up configuration for this channel.
    config: ChannelConfig,
}
/// Oppvarming: les meldinger per kanal fra PG og last inn i SpacetimeDB.
/// Respekterer per-kanal warmup-konfigurasjon.
pub async fn run(
pool: &PgPool,
http: &Client,
spacetimedb_url: &str,
module: &str,
default_limit: i64,
) -> anyhow::Result<()> {
info!("Starter oppvarming (PG → SpacetimeDB)");
// Hent kanaler med konfig
let rows: Vec<(String, String, serde_json::Value)> = sqlx::query_as(
"SELECT c.id::text, c.name, c.config FROM channels c JOIN nodes n ON n.id = c.id"
)
.fetch_all(pool)
.await?;
let channels: Vec<ChannelInfo> = rows.into_iter().map(|(id, name, config_val)| {
let config: ChannelConfig = serde_json::from_value(config_val)
.unwrap_or(ChannelConfig { warmup_mode: default_warmup_mode(), warmup_value: None });
ChannelInfo { id, name, config }
}).collect();
if channels.is_empty() {
info!("Ingen kanaler funnet — oppvarming fullført");
return Ok(());
}
let active: Vec<&ChannelInfo> = channels.iter()
.filter(|c| c.config.warmup_mode != "none")
.collect();
info!(
total = channels.len(),
active = active.len(),
skipped = channels.len() - active.len(),
"Kanaler funnet"
);
let mut total_messages = 0u64;
let mut total_reactions = 0u64;
for ch in &active {
// Rydd kanalen i SpacetimeDB først
if let Err(e) = call_reducer(http, spacetimedb_url, module, "clear_channel", &serde_json::json!({
"channel_id": ch.id
})).await {
warn!(channel = %ch.name, error = %e, "Kunne ikke rydde kanal — hopper over");
continue;
}
// Trådbasert henting: finn kvalifiserende tråder, hent alle meldinger i disse
let rows = match ch.config.warmup_mode.as_str() {
"all" => {
// Alt — ingen filtrering
fetch_messages_all(pool, &ch.id).await?
},
"messages" => {
// Siste N tråder (sortert etter nyeste melding i tråden)
let n = ch.config.warmup_value.unwrap_or(default_limit);
fetch_messages_by_threads(pool, &ch.id, n).await?
},
"days" => {
// Alle tråder med minst én melding i tidsvinduet
let days = ch.config.warmup_value.unwrap_or(30);
fetch_messages_by_days(pool, &ch.id, days).await?
},
_ => fetch_messages_all(pool, &ch.id).await?,
};
if rows.is_empty() {
info!(channel = %ch.name, mode = %ch.config.warmup_mode, "Ingen meldinger å laste");
continue;
}
// Bygg JSON-array
let messages: Vec<serde_json::Value> = rows.iter().map(|r| {
serde_json::json!({
"id": r.0,
"channel_id": r.1,
"workspace_id": r.2,
"author_id": r.3,
"author_name": r.4,
"body": r.5,
"message_type": r.6,
"reply_to": r.7.as_deref().unwrap_or(""),
"created_at": r.8
})
}).collect();
let count = messages.len();
let json_str = serde_json::to_string(&messages)?;
if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_messages", &serde_json::json!({
"messages_json": json_str
})).await {
warn!(channel = %ch.name, error = %e, "Feil ved lasting av meldinger");
continue;
}
total_messages += count as u64;
// Hent reaksjoner for denne kanalens meldinger
let reaction_rows: Vec<(String, String, String, String)> = sqlx::query_as(
r#"
SELECT
mr.message_id::text,
COALESCE(mr.user_id, ''),
COALESCE(u.name, 'Ukjent'),
mr.reaction
FROM message_reactions mr
JOIN messages m ON m.id = mr.message_id
LEFT JOIN users u ON u.authentik_id = mr.user_id
WHERE m.channel_id = $1::uuid
"#
)
.bind(&ch.id)
.fetch_all(pool)
.await?;
if !reaction_rows.is_empty() {
let reactions: Vec<serde_json::Value> = reaction_rows.iter().map(|r| {
serde_json::json!({
"message_id": r.0,
"user_id": r.1,
"user_name": r.2,
"reaction": r.3
})
}).collect();
let reactions_json = serde_json::to_string(&reactions)?;
if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_reactions", &serde_json::json!({
"reactions_json": reactions_json
})).await {
warn!(channel = %ch.name, error = %e, "Feil ved lasting av reaksjoner");
} else {
total_reactions += reaction_rows.len() as u64;
}
}
info!(
channel = %ch.name,
mode = %ch.config.warmup_mode,
value = ?ch.config.warmup_value,
messages = count,
reactions = reaction_rows.len(),
"Kanal oppvarmet"
);
}
info!(
channels = active.len(),
messages = total_messages,
reactions = total_reactions,
"Oppvarming fullført"
);
Ok(())
}
/// One message row as selected by `MESSAGE_COLUMNS`, in column order.
/// Every column is text; only `reply_to` may be NULL.
type MessageRow = (
    String,         // id
    String,         // channel_id
    String,         // workspace_id
    String,         // author_id ('' when NULL)
    String,         // author_name ('Ukjent' when unknown)
    String,         // body ('' when NULL)
    String,         // message_type ('text' when NULL)
    Option<String>, // reply_to
    String,         // created_at
);
/// SELECT column list shared by every message query in this module.
///
/// Order and types must match `MessageRow`: ids and `created_at` are cast to text;
/// `reply_to` stays nullable; author id, author name, body and message type get
/// `COALESCE` defaults ('', 'Ukjent', '', 'text'). Relies on the aliases `m` (messages),
/// `n` (nodes) and `u` (users) introduced by `MESSAGE_JOINS` / the `FROM` clause.
const MESSAGE_COLUMNS: &str = r#"
m.id::text,
m.channel_id::text,
n.workspace_id::text,
COALESCE(m.author_id, ''),
COALESCE(u.name, 'Ukjent'),
COALESCE(m.body, ''),
COALESCE(m.message_type, 'text'),
m.reply_to::text,
m.created_at::text
"#;
/// JOIN clauses shared by every message query, to be placed after `FROM messages m`.
///
/// The inner join on `nodes` supplies `workspace_id` (and so excludes messages without a
/// matching node row); the left join on `users`, matched by `authentik_id`, supplies the
/// author name when one exists.
const MESSAGE_JOINS: &str = r#"
JOIN nodes n ON n.id = m.id
LEFT JOIN users u ON u.authentik_id = m.author_id
"#;
/// Fetch every message in a channel, oldest first.
async fn fetch_messages_all(pool: &PgPool, channel_id: &str) -> anyhow::Result<Vec<MessageRow>> {
    let sql = format!(
        "SELECT {cols} FROM messages m {joins} WHERE m.channel_id = $1::uuid ORDER BY m.created_at",
        cols = MESSAGE_COLUMNS,
        joins = MESSAGE_JOINS,
    );
    let rows: Vec<MessageRow> = sqlx::query_as(&sql)
        .bind(channel_id)
        .fetch_all(pool)
        .await?;
    Ok(rows)
}
/// Fetch the `limit` most recently active threads in a channel plus all their messages.
///
/// A "thread" is a root message (`reply_to IS NULL`) together with its direct replies.
/// A thread's activity time is the newest `created_at` among the root and those replies.
/// Root resolution looks only one level up. A message whose parent is not an in-channel
/// root counts as its own thread: this covers replies to messages outside the channel,
/// and replies to replies. Rows are returned oldest first.
///
/// A negative `limit` is treated as 0 (no threads, empty result).
async fn fetch_messages_by_threads(pool: &PgPool, channel_id: &str, limit: i64) -> anyhow::Result<Vec<MessageRow>> {
    // Postgres rejects a negative LIMIT with an error. Via `?` in the caller, that would
    // abort warm-up for every remaining channel because of one misconfigured value.
    let limit = limit.max(0);
    // NOTE(review): the SQL comment in `thread_roots` says "follow reply_to up to NULL",
    // but the subquery only inspects the immediate parent (see the doc comment above).
    let query = format!(
        r#"
WITH thread_roots AS (
-- Finn rot for hver melding: følg reply_to opp til NULL, eller til utenfor kanalen
SELECT DISTINCT COALESCE(
(SELECT r.id FROM messages r
WHERE r.id = m.reply_to AND r.channel_id = m.channel_id AND r.reply_to IS NULL),
CASE WHEN m.reply_to IS NULL THEN m.id END,
m.id -- orphan-svar → behandles som egen tråd
) AS root_id
FROM messages m
WHERE m.channel_id = $1::uuid
),
ranked_threads AS (
SELECT tr.root_id,
max(m.created_at) AS last_activity
FROM thread_roots tr
JOIN messages m ON m.channel_id = $1::uuid
AND (m.id = tr.root_id OR m.reply_to = tr.root_id)
GROUP BY tr.root_id
ORDER BY last_activity DESC
LIMIT $2
)
SELECT {}
FROM messages m
{}
WHERE m.channel_id = $1::uuid
AND (m.id IN (SELECT root_id FROM ranked_threads)
OR m.reply_to IN (SELECT root_id FROM ranked_threads))
ORDER BY m.created_at
"#,
        MESSAGE_COLUMNS, MESSAGE_JOINS
    );
    Ok(sqlx::query_as(&query).bind(channel_id).bind(limit).fetch_all(pool).await?)
}
/// Fetch every thread that has at least one message within the last `days` days.
///
/// Whole threads are returned, including roots older than the window. Thread roots are
/// resolved one level up, exactly as in `fetch_messages_by_threads`. Rows are returned
/// oldest first.
///
/// `days` is clamped to `0..=1_000_000`. A negative value behaves like 0, which is an
/// (essentially) empty window. Values above the cap mean "all history".
async fn fetch_messages_by_days(pool: &PgPool, channel_id: &str, days: i64) -> anyhow::Result<Vec<MessageRow>> {
    // `make_interval(days => …)` takes an int4. A plain `as i32` cast silently wraps
    // out-of-range values (e.g. 4_294_967_297 becomes a 1-day window). ~2 700 years already
    // covers all history and keeps `now() - interval` inside Postgres' timestamp range
    // (lower bound 4713 BC), so larger windows are capped there.
    const MAX_WINDOW_DAYS: i64 = 1_000_000;
    let window_days = i32::try_from(days.clamp(0, MAX_WINDOW_DAYS)).unwrap_or(i32::MAX);
    let query = format!(
        r#"
WITH qualifying_roots AS (
-- Finn trådrøtter for meldinger innenfor tidsvinduet
SELECT DISTINCT COALESCE(
(SELECT r.id FROM messages r
WHERE r.id = m.reply_to AND r.channel_id = m.channel_id AND r.reply_to IS NULL),
CASE WHEN m.reply_to IS NULL THEN m.id END,
m.id
) AS root_id
FROM messages m
WHERE m.channel_id = $1::uuid
AND m.created_at >= now() - make_interval(days => $2)
)
SELECT {}
FROM messages m
{}
WHERE m.channel_id = $1::uuid
AND (m.id IN (SELECT root_id FROM qualifying_roots)
OR m.reply_to IN (SELECT root_id FROM qualifying_roots))
ORDER BY m.created_at
"#,
        MESSAGE_COLUMNS, MESSAGE_JOINS
    );
    Ok(sqlx::query_as(&query).bind(channel_id).bind(window_days).fetch_all(pool).await?)
}
/// Invoke a SpacetimeDB reducer by POSTing `args` as JSON to
/// `{base_url}/v1/database/{module}/call/{reducer}`.
///
/// # Errors
///
/// Fails if the request cannot be sent. It also fails if the server answers with a
/// non-success status; that error names the reducer and includes the status and the
/// response body (empty if the body could not be read).
async fn call_reducer(
    http: &Client,
    base_url: &str,
    module: &str,
    reducer: &str,
    args: &serde_json::Value,
) -> anyhow::Result<()> {
    let endpoint = format!("{}/v1/database/{}/call/{}", base_url, module, reducer);
    let response = http.post(&endpoint).json(args).send().await?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }

    let detail = response.text().await.unwrap_or_default();
    Err(anyhow::anyhow!("{} feilet ({}): {}", reducer, status, detail))
}