use regex::Regex; use reqwest::Client; use serde::Deserialize; use sqlx::PgPool; use tracing::{info, warn}; /// SpacetimeDB v2 HTTP SQL-respons (array av result-objekter) #[derive(Deserialize)] struct SqlResultEntry { rows: Option>>, } /// Parsed SyncOutbox-entry struct SyncEntry { id: u64, table_name: String, action: String, payload: String, } /// Payload for en chat-melding (insert) #[derive(Deserialize)] struct MessagePayload { id: String, channel_id: String, workspace_id: String, author_id: String, body: String, reply_to: String, } /// Payload for meldings-oppdatering #[derive(Deserialize)] struct MessageUpdatePayload { id: String, body: String, } /// Payload for meldings-sletting #[derive(Deserialize)] struct MessageDeletePayload { id: String, } /// Payload for reaksjon #[derive(Deserialize)] struct ReactionPayload { message_id: String, user_id: String, reaction: String, } pub async fn run( pool: PgPool, http: Client, spacetimedb_url: String, module: String, interval_secs: u64, ) { info!( spacetimedb_url = %spacetimedb_url, module = %module, interval_secs = interval_secs, "Starter sync-worker (SpacetimeDB → PG)" ); let mention_re = Regex::new(r#"data-id="([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})""#) .expect("ugyldig regex"); loop { if let Err(e) = sync_batch(&pool, &http, &spacetimedb_url, &module, &mention_re).await { warn!(error = %e, "Sync-batch feilet"); } tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } async fn sync_batch( pool: &PgPool, http: &Client, base_url: &str, module: &str, mention_re: &Regex, ) -> anyhow::Result<()> { // 1. Poll SyncOutbox via HTTP SQL (SpacetimeDB v2 API) let sql_url = format!("{}/v1/database/{}/sql", base_url, module); let query = "SELECT id, table_name, action, payload FROM sync_outbox WHERE synced = false"; let resp = http .post(&sql_url) .header("Content-Type", "text/plain") .body(query) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); anyhow::bail!("SpacetimeDB SQL-feil ({}): {}", status, body); } // v2 returnerer en array av result-objekter let results: Vec = resp.json().await?; let rows = match results.into_iter().next().and_then(|r| r.rows) { Some(r) if !r.is_empty() => r, _ => return Ok(()), // Ingen usynkede events }; // Parse entries let entries: Vec = rows .into_iter() .filter_map(|row| { if row.len() < 4 { return None; } Some(SyncEntry { id: row[0].as_u64()?, table_name: row[1].as_str()?.to_string(), action: row[2].as_str()?.to_string(), payload: row[3].as_str()?.to_string(), }) }) .collect(); if entries.is_empty() { return Ok(()); } info!(count = entries.len(), "Synker batch fra SpacetimeDB"); let mut synced_ids: Vec = Vec::new(); // 2. Prosesser hvert event for entry in &entries { let result = match (entry.table_name.as_str(), entry.action.as_str()) { ("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await, ("messages", "delete") => process_message_delete(pool, &entry.payload).await, ("messages", "update") => process_message_update(pool, &entry.payload).await, ("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await, ("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await, _ => { warn!( table = %entry.table_name, action = %entry.action, "Ukjent sync-event-type, markerer som synket" ); Ok(()) } }; match result { Ok(()) => synced_ids.push(entry.id), Err(e) => { warn!( entry_id = entry.id, table = %entry.table_name, action = %entry.action, error = %e, "Feil ved synking, hopper over" ); } } } // 3. Marker som synket via reducer if !synced_ids.is_empty() { mark_synced(http, base_url, module, &synced_ids).await?; info!(count = synced_ids.len(), "Markert som synket"); } Ok(()) } async fn process_message_insert( pool: &PgPool, payload_json: &str, mention_re: &Regex, ) -> anyhow::Result<()> { let msg: MessagePayload = serde_json::from_str(payload_json)?; let mut tx = pool.begin().await?; // Insert node sqlx::query( "INSERT INTO nodes (id, workspace_id, node_type) VALUES ($1::uuid, $2::uuid, 'melding') ON CONFLICT (id) DO NOTHING" ) .bind(&msg.id) .bind(&msg.workspace_id) .execute(&mut *tx) .await?; // Insert message let reply_to: Option<&str> = if msg.reply_to.is_empty() { None } else { Some(&msg.reply_to) }; sqlx::query( "INSERT INTO messages (id, channel_id, author_id, body, reply_to) VALUES ($1::uuid, $2::uuid, $3, $4, $5::uuid) ON CONFLICT (id) DO NOTHING" ) .bind(&msg.id) .bind(&msg.channel_id) .bind(&msg.author_id) .bind(&msg.body) .bind(reply_to) .execute(&mut *tx) .await?; // Ekstraher mention-UUIDs fra HTML body for cap in mention_re.captures_iter(&msg.body) { let mention_id = &cap[1]; let exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM nodes WHERE id = $1::uuid AND workspace_id = $2::uuid)" ) .bind(mention_id) .bind(&msg.workspace_id) .fetch_one(&mut *tx) .await?; if exists { sqlx::query( "INSERT INTO graph_edges (workspace_id, source_id, target_id, relation_type, created_by, origin) VALUES ($1::uuid, $2::uuid, $3::uuid, 'MENTIONS', $4, 'system') ON CONFLICT (source_id, target_id, relation_type) DO NOTHING" ) .bind(&msg.workspace_id) .bind(&msg.id) .bind(mention_id) .bind(&msg.author_id) .execute(&mut *tx) .await?; } } tx.commit().await?; Ok(()) } async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: MessageDeletePayload = serde_json::from_str(payload_json)?; let mut tx = pool.begin().await?; // Slett reaksjoner sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid") .bind(&payload.id) .execute(&mut *tx) .await?; // Slett graph_edges der meldingen er source sqlx::query("DELETE FROM graph_edges WHERE source_id = $1::uuid") .bind(&payload.id) .execute(&mut *tx) .await?; // Slett melding sqlx::query("DELETE FROM messages WHERE id = $1::uuid") .bind(&payload.id) .execute(&mut *tx) .await?; // Slett node sqlx::query("DELETE FROM nodes WHERE id = $1::uuid") .bind(&payload.id) .execute(&mut *tx) .await?; tx.commit().await?; info!(id = %payload.id, "Melding slettet fra PG"); Ok(()) } async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?; sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid") .bind(&payload.body) .bind(&payload.id) .execute(pool) .await?; info!(id = %payload.id, "Melding oppdatert i PG"); Ok(()) } async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: ReactionPayload = serde_json::from_str(payload_json)?; // Fjern eksisterende reaksjon fra denne brukeren (én per bruker per melding) sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2") .bind(&payload.message_id) .bind(&payload.user_id) .execute(pool) .await?; sqlx::query( "INSERT INTO message_reactions (message_id, user_id, reaction) VALUES ($1::uuid, $2, $3)" ) .bind(&payload.message_id) .bind(&payload.user_id) .bind(&payload.reaction) .execute(pool) .await?; Ok(()) } async fn process_reaction_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: ReactionPayload = serde_json::from_str(payload_json)?; sqlx::query( "DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2 AND reaction = $3" ) .bind(&payload.message_id) .bind(&payload.user_id) .bind(&payload.reaction) .execute(pool) .await?; Ok(()) } async fn mark_synced( http: &Client, base_url: &str, module: &str, ids: &[u64], ) -> anyhow::Result<()> { let url = format!("{}/v1/database/{}/call/mark_synced", base_url, module); let body = serde_json::json!({ "ids": ids }); let resp = http .post(&url) .json(&body) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); anyhow::bail!("mark_synced feilet ({}): {}", status, body); } Ok(()) }