server/worker/src/sync.rs
vegard 8b58d434e9 SpacetimeDB som cache foran PG: arkitekturendring
PG er autoritativ, SpacetimeDB er varm cache. Frontend snakker
kun med SpacetimeDB, worker håndterer toveissynk.

Fase 1 — SpacetimeDB-modul:
- delete_message med SyncOutbox-event
- edit_message reducer
- MessageReaction tabell + add/remove_reaction reducers
- load_messages med JSON-parsing (erstatter pipe-format)
- clear_channel reducer for duplikat-fri warmup
- load_reactions reducer

Fase 2 — Worker:
- warmup.rs: PG→ST oppvarming ved oppstart (100 msg/kanal)
- sync.rs: håndter delete/update/reaction actions
- Sync-intervall redusert til 1s

Fase 3 — Frontend:
- spacetime.svelte.ts: ren SpacetimeDB-adapter, ingen PG-hybrid
- ChatConnection interface med edit/delete/react metoder
- ChatBlock bruker chat.edit/delete/react direkte
- PG-adapter som readonly fallback

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 02:09:33 +01:00

340 lines
9.9 KiB
Rust

use regex::Regex;
use reqwest::Client;
use serde::Deserialize;
use sqlx::PgPool;
use tracing::{info, warn};
/// One result object from the SpacetimeDB v2 HTTP SQL endpoint.
///
/// The endpoint answers with a JSON array of these objects.
#[derive(Deserialize)]
struct SqlResultEntry {
    /// Result rows, each row being an array of JSON column values.
    /// `None` when the response object carries no rows.
    rows: Option<Vec<Vec<serde_json::Value>>>,
}
/// A single parsed row of the `sync_outbox` table.
struct SyncEntry {
    /// Outbox row id; reported back to the `mark_synced` reducer once the
    /// event has been applied.
    id: u64,
    /// Table the event concerns (e.g. `messages`, `message_reactions`).
    table_name: String,
    /// Kind of change (e.g. `insert`, `update`, `delete`).
    action: String,
    /// JSON-encoded event payload; its shape depends on table and action.
    payload: String,
}
/// Payload of a chat-message insert event.
#[derive(Deserialize)]
struct MessagePayload {
    /// Message id as a UUID string; also used as the id of the message's node.
    id: String,
    /// Channel the message belongs to (UUID string).
    channel_id: String,
    /// Workspace that owns the message node (UUID string).
    workspace_id: String,
    /// Author identifier; stored as-is, without a UUID cast.
    author_id: String,
    /// Message body; scanned for `data-id="<uuid>"` mentions on insert.
    body: String,
    /// Id of the message being replied to; an empty string means "no reply".
    reply_to: String,
}
/// Payload of a message update (edit) event.
#[derive(Deserialize)]
struct MessageUpdatePayload {
    /// Id of the message to update (UUID string).
    id: String,
    /// Replacement message body.
    body: String,
}
/// Payload of a message delete event.
#[derive(Deserialize)]
struct MessageDeletePayload {
    /// Id of the message to delete (UUID string).
    id: String,
}
/// Payload of a reaction event; shared by reaction inserts and deletes.
#[derive(Deserialize)]
struct ReactionPayload {
    /// Message the reaction is attached to (UUID string).
    message_id: String,
    /// User who reacted; stored as-is, without a UUID cast.
    user_id: String,
    /// The reaction value itself.
    reaction: String,
}
pub async fn run(
pool: PgPool,
http: Client,
spacetimedb_url: String,
module: String,
interval_secs: u64,
) {
info!(
spacetimedb_url = %spacetimedb_url,
module = %module,
interval_secs = interval_secs,
"Starter sync-worker (SpacetimeDB → PG)"
);
let mention_re = Regex::new(r#"data-id="([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})""#)
.expect("ugyldig regex");
loop {
if let Err(e) = sync_batch(&pool, &http, &spacetimedb_url, &module, &mention_re).await {
warn!(error = %e, "Sync-batch feilet");
}
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
}
async fn sync_batch(
pool: &PgPool,
http: &Client,
base_url: &str,
module: &str,
mention_re: &Regex,
) -> anyhow::Result<()> {
// 1. Poll SyncOutbox via HTTP SQL (SpacetimeDB v2 API)
let sql_url = format!("{}/v1/database/{}/sql", base_url, module);
let query = "SELECT id, table_name, action, payload FROM sync_outbox WHERE synced = false";
let resp = http
.post(&sql_url)
.header("Content-Type", "text/plain")
.body(query)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("SpacetimeDB SQL-feil ({}): {}", status, body);
}
// v2 returnerer en array av result-objekter
let results: Vec<SqlResultEntry> = resp.json().await?;
let rows = match results.into_iter().next().and_then(|r| r.rows) {
Some(r) if !r.is_empty() => r,
_ => return Ok(()), // Ingen usynkede events
};
// Parse entries
let entries: Vec<SyncEntry> = rows
.into_iter()
.filter_map(|row| {
if row.len() < 4 {
return None;
}
Some(SyncEntry {
id: row[0].as_u64()?,
table_name: row[1].as_str()?.to_string(),
action: row[2].as_str()?.to_string(),
payload: row[3].as_str()?.to_string(),
})
})
.collect();
if entries.is_empty() {
return Ok(());
}
info!(count = entries.len(), "Synker batch fra SpacetimeDB");
let mut synced_ids: Vec<u64> = Vec::new();
// 2. Prosesser hvert event
for entry in &entries {
let result = match (entry.table_name.as_str(), entry.action.as_str()) {
("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await,
("messages", "delete") => process_message_delete(pool, &entry.payload).await,
("messages", "update") => process_message_update(pool, &entry.payload).await,
("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await,
("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await,
_ => {
warn!(
table = %entry.table_name,
action = %entry.action,
"Ukjent sync-event-type, markerer som synket"
);
Ok(())
}
};
match result {
Ok(()) => synced_ids.push(entry.id),
Err(e) => {
warn!(
entry_id = entry.id,
table = %entry.table_name,
action = %entry.action,
error = %e,
"Feil ved synking, hopper over"
);
}
}
}
// 3. Marker som synket via reducer
if !synced_ids.is_empty() {
mark_synced(http, base_url, module, &synced_ids).await?;
info!(count = synced_ids.len(), "Markert som synket");
}
Ok(())
}
/// Applies a message insert event to PostgreSQL.
///
/// Within a single transaction this inserts the message's node and the
/// message row (both skipped on id conflict), then creates a `MENTIONS`
/// graph edge for every UUID captured by `mention_re` in the body that
/// exists as a node in the same workspace.
///
/// # Errors
///
/// Returns an error if the payload is not valid `MessagePayload` JSON or if
/// any database statement fails; the transaction is then not committed.
async fn process_message_insert(
    pool: &PgPool,
    payload_json: &str,
    mention_re: &Regex,
) -> anyhow::Result<()> {
    const INSERT_NODE: &str = "INSERT INTO nodes (id, workspace_id, node_type) VALUES ($1::uuid, $2::uuid, 'melding') ON CONFLICT (id) DO NOTHING";
    const INSERT_MESSAGE: &str = "INSERT INTO messages (id, channel_id, author_id, body, reply_to) VALUES ($1::uuid, $2::uuid, $3, $4, $5::uuid) ON CONFLICT (id) DO NOTHING";
    const NODE_EXISTS: &str = "SELECT EXISTS(SELECT 1 FROM nodes WHERE id = $1::uuid AND workspace_id = $2::uuid)";
    const INSERT_MENTION_EDGE: &str = "INSERT INTO graph_edges (workspace_id, source_id, target_id, relation_type, created_by, origin) VALUES ($1::uuid, $2::uuid, $3::uuid, 'MENTIONS', $4, 'system') ON CONFLICT (source_id, target_id, relation_type) DO NOTHING";

    let message = serde_json::from_str::<MessagePayload>(payload_json)?;
    let mut tx = pool.begin().await?;

    // The node row goes first, then the message row that shares its id.
    sqlx::query(INSERT_NODE)
        .bind(&message.id)
        .bind(&message.workspace_id)
        .execute(&mut *tx)
        .await?;

    // An empty `reply_to` is stored as NULL.
    let parent_id: Option<&str> =
        Some(message.reply_to.as_str()).filter(|parent| !parent.is_empty());

    sqlx::query(INSERT_MESSAGE)
        .bind(&message.id)
        .bind(&message.channel_id)
        .bind(&message.author_id)
        .bind(&message.body)
        .bind(parent_id)
        .execute(&mut *tx)
        .await?;

    // Link the message to every mentioned node found in the HTML body.
    for caps in mention_re.captures_iter(&message.body) {
        let target_id = &caps[1];

        let target_exists = sqlx::query_scalar::<_, bool>(NODE_EXISTS)
            .bind(target_id)
            .bind(&message.workspace_id)
            .fetch_one(&mut *tx)
            .await?;

        // Ids that are not nodes of this workspace get no edge.
        if !target_exists {
            continue;
        }

        sqlx::query(INSERT_MENTION_EDGE)
            .bind(&message.workspace_id)
            .bind(&message.id)
            .bind(target_id)
            .bind(&message.author_id)
            .execute(&mut *tx)
            .await?;
    }

    tx.commit().await?;
    Ok(())
}
/// Applies a message delete event to PostgreSQL.
///
/// In one transaction, removes the message's reactions, the graph edges that
/// have the message as source, the message row and finally its node.
///
/// # Errors
///
/// Returns an error if the payload is not valid `MessageDeletePayload` JSON
/// or if any statement fails; the transaction is then not committed.
async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
    // Executed in this order; every statement takes the message id as `$1`.
    // NOTE(review): edges that have the message as *target* are not removed
    // here — confirm that is intended / handled by the schema.
    const CLEANUP_STATEMENTS: [&str; 4] = [
        "DELETE FROM message_reactions WHERE message_id = $1::uuid",
        "DELETE FROM graph_edges WHERE source_id = $1::uuid",
        "DELETE FROM messages WHERE id = $1::uuid",
        "DELETE FROM nodes WHERE id = $1::uuid",
    ];

    let target = serde_json::from_str::<MessageDeletePayload>(payload_json)?;
    let mut tx = pool.begin().await?;

    for statement in CLEANUP_STATEMENTS {
        sqlx::query(statement)
            .bind(&target.id)
            .execute(&mut *tx)
            .await?;
    }

    tx.commit().await?;

    info!(id = %target.id, "Melding slettet fra PG");
    Ok(())
}
async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?;
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid")
.bind(&payload.body)
.bind(&payload.id)
.execute(pool)
.await?;
info!(id = %payload.id, "Melding oppdatert i PG");
Ok(())
}
/// Applies a reaction insert event to PostgreSQL.
///
/// Replaces any reaction the user already has on the message with the new
/// one (one reaction per user per message). The delete and the insert run in
/// a single transaction, so a failing insert cannot leave the user's
/// previous reaction removed.
///
/// # Errors
///
/// Returns an error if the payload is not valid `ReactionPayload` JSON or if
/// any statement fails; the transaction is then not committed.
async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
    let payload: ReactionPayload = serde_json::from_str(payload_json)?;

    let mut tx = pool.begin().await?;

    // Remove the user's existing reaction on this message, if any.
    sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2")
        .bind(&payload.message_id)
        .bind(&payload.user_id)
        .execute(&mut *tx)
        .await?;

    sqlx::query(
        "INSERT INTO message_reactions (message_id, user_id, reaction) VALUES ($1::uuid, $2, $3)",
    )
    .bind(&payload.message_id)
    .bind(&payload.user_id)
    .bind(&payload.reaction)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(())
}
/// Applies a reaction delete event to PostgreSQL.
///
/// Deletes the row matching the message, the user and the reaction value.
///
/// # Errors
///
/// Returns an error if the payload is not valid `ReactionPayload` JSON or if
/// the DELETE statement fails.
async fn process_reaction_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
    const REMOVE_REACTION: &str = "DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2 AND reaction = $3";

    let removed = serde_json::from_str::<ReactionPayload>(payload_json)?;

    sqlx::query(REMOVE_REACTION)
        .bind(&removed.message_id)
        .bind(&removed.user_id)
        .bind(&removed.reaction)
        .execute(pool)
        .await?;

    Ok(())
}
/// Calls the `mark_synced` reducer of the SpacetimeDB module with `ids`.
///
/// The ids are sent as a JSON object of the form `{ "ids": [...] }`.
///
/// # Errors
///
/// Returns an error if the request cannot be sent or if the reducer call
/// answers with a non-success status; the response body is included in the
/// error message.
async fn mark_synced(
    http: &Client,
    base_url: &str,
    module: &str,
    ids: &[u64],
) -> anyhow::Result<()> {
    let endpoint = format!("{}/v1/database/{}/call/mark_synced", base_url, module);
    let args = serde_json::json!({ "ids": ids });

    let response = http.post(&endpoint).json(&args).send().await?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }

    // Include whatever the server said to make the failure diagnosable.
    let detail = response.text().await.unwrap_or_default();
    Err(anyhow::anyhow!("mark_synced feilet ({}): {}", status, detail))
}