PG er autoritativ, SpacetimeDB er varm cache. Frontend snakker kun med SpacetimeDB, worker håndterer toveissynk.

Fase 1 — SpacetimeDB-modul:
- delete_message med SyncOutbox-event
- edit_message reducer
- MessageReaction tabell + add/remove_reaction reducers
- load_messages med JSON-parsing (erstatter pipe-format)
- clear_channel reducer for duplikat-fri warmup
- load_reactions reducer

Fase 2 — Worker:
- warmup.rs: PG→ST oppvarming ved oppstart (100 msg/kanal)
- sync.rs: håndter delete/update/reaction actions
- Sync-intervall redusert til 1s

Fase 3 — Frontend:
- spacetime.svelte.ts: ren SpacetimeDB-adapter, ingen PG-hybrid
- ChatConnection interface med edit/delete/react metoder
- ChatBlock bruker chat.edit/delete/react direkte
- PG-adapter som readonly fallback

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
340 lines
9.9 KiB
Rust
340 lines
9.9 KiB
Rust
use regex::Regex;
|
|
use reqwest::Client;
|
|
use serde::Deserialize;
|
|
use sqlx::PgPool;
|
|
use tracing::{info, warn};
|
|
|
|
/// SpacetimeDB v2 HTTP SQL-respons (array av result-objekter)
|
|
#[derive(Deserialize)]
|
|
struct SqlResultEntry {
|
|
rows: Option<Vec<Vec<serde_json::Value>>>,
|
|
}
|
|
|
|
/// A single `sync_outbox` row with its columns converted to Rust types.
struct SyncEntry {
    /// Outbox row id; handed to `mark_synced` once the event has been handled.
    id: u64,
    /// Name of the table the event concerns (e.g. `"messages"`).
    table_name: String,
    /// Kind of change (e.g. `"insert"`, `"update"`, `"delete"`).
    action: String,
    /// JSON-encoded event payload.
    payload: String,
}
|
|
|
|
/// Payload for en chat-melding (insert)
|
|
#[derive(Deserialize)]
|
|
struct MessagePayload {
|
|
id: String,
|
|
channel_id: String,
|
|
workspace_id: String,
|
|
author_id: String,
|
|
body: String,
|
|
reply_to: String,
|
|
}
|
|
|
|
/// Payload for meldings-oppdatering
|
|
#[derive(Deserialize)]
|
|
struct MessageUpdatePayload {
|
|
id: String,
|
|
body: String,
|
|
}
|
|
|
|
/// Payload for meldings-sletting
|
|
#[derive(Deserialize)]
|
|
struct MessageDeletePayload {
|
|
id: String,
|
|
}
|
|
|
|
/// Payload for reaksjon
|
|
#[derive(Deserialize)]
|
|
struct ReactionPayload {
|
|
message_id: String,
|
|
user_id: String,
|
|
reaction: String,
|
|
}
|
|
|
|
pub async fn run(
|
|
pool: PgPool,
|
|
http: Client,
|
|
spacetimedb_url: String,
|
|
module: String,
|
|
interval_secs: u64,
|
|
) {
|
|
info!(
|
|
spacetimedb_url = %spacetimedb_url,
|
|
module = %module,
|
|
interval_secs = interval_secs,
|
|
"Starter sync-worker (SpacetimeDB → PG)"
|
|
);
|
|
|
|
let mention_re = Regex::new(r#"data-id="([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})""#)
|
|
.expect("ugyldig regex");
|
|
|
|
loop {
|
|
if let Err(e) = sync_batch(&pool, &http, &spacetimedb_url, &module, &mention_re).await {
|
|
warn!(error = %e, "Sync-batch feilet");
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
|
|
}
|
|
}
|
|
|
|
async fn sync_batch(
|
|
pool: &PgPool,
|
|
http: &Client,
|
|
base_url: &str,
|
|
module: &str,
|
|
mention_re: &Regex,
|
|
) -> anyhow::Result<()> {
|
|
// 1. Poll SyncOutbox via HTTP SQL (SpacetimeDB v2 API)
|
|
let sql_url = format!("{}/v1/database/{}/sql", base_url, module);
|
|
let query = "SELECT id, table_name, action, payload FROM sync_outbox WHERE synced = false";
|
|
|
|
let resp = http
|
|
.post(&sql_url)
|
|
.header("Content-Type", "text/plain")
|
|
.body(query)
|
|
.send()
|
|
.await?;
|
|
|
|
if !resp.status().is_success() {
|
|
let status = resp.status();
|
|
let body = resp.text().await.unwrap_or_default();
|
|
anyhow::bail!("SpacetimeDB SQL-feil ({}): {}", status, body);
|
|
}
|
|
|
|
// v2 returnerer en array av result-objekter
|
|
let results: Vec<SqlResultEntry> = resp.json().await?;
|
|
let rows = match results.into_iter().next().and_then(|r| r.rows) {
|
|
Some(r) if !r.is_empty() => r,
|
|
_ => return Ok(()), // Ingen usynkede events
|
|
};
|
|
|
|
// Parse entries
|
|
let entries: Vec<SyncEntry> = rows
|
|
.into_iter()
|
|
.filter_map(|row| {
|
|
if row.len() < 4 {
|
|
return None;
|
|
}
|
|
Some(SyncEntry {
|
|
id: row[0].as_u64()?,
|
|
table_name: row[1].as_str()?.to_string(),
|
|
action: row[2].as_str()?.to_string(),
|
|
payload: row[3].as_str()?.to_string(),
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
if entries.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
info!(count = entries.len(), "Synker batch fra SpacetimeDB");
|
|
|
|
let mut synced_ids: Vec<u64> = Vec::new();
|
|
|
|
// 2. Prosesser hvert event
|
|
for entry in &entries {
|
|
let result = match (entry.table_name.as_str(), entry.action.as_str()) {
|
|
("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await,
|
|
("messages", "delete") => process_message_delete(pool, &entry.payload).await,
|
|
("messages", "update") => process_message_update(pool, &entry.payload).await,
|
|
("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await,
|
|
("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await,
|
|
_ => {
|
|
warn!(
|
|
table = %entry.table_name,
|
|
action = %entry.action,
|
|
"Ukjent sync-event-type, markerer som synket"
|
|
);
|
|
Ok(())
|
|
}
|
|
};
|
|
|
|
match result {
|
|
Ok(()) => synced_ids.push(entry.id),
|
|
Err(e) => {
|
|
warn!(
|
|
entry_id = entry.id,
|
|
table = %entry.table_name,
|
|
action = %entry.action,
|
|
error = %e,
|
|
"Feil ved synking, hopper over"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
// 3. Marker som synket via reducer
|
|
if !synced_ids.is_empty() {
|
|
mark_synced(http, base_url, module, &synced_ids).await?;
|
|
info!(count = synced_ids.len(), "Markert som synket");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_message_insert(
|
|
pool: &PgPool,
|
|
payload_json: &str,
|
|
mention_re: &Regex,
|
|
) -> anyhow::Result<()> {
|
|
let msg: MessagePayload = serde_json::from_str(payload_json)?;
|
|
|
|
let mut tx = pool.begin().await?;
|
|
|
|
// Insert node
|
|
sqlx::query(
|
|
"INSERT INTO nodes (id, workspace_id, node_type) VALUES ($1::uuid, $2::uuid, 'melding') ON CONFLICT (id) DO NOTHING"
|
|
)
|
|
.bind(&msg.id)
|
|
.bind(&msg.workspace_id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// Insert message
|
|
let reply_to: Option<&str> = if msg.reply_to.is_empty() { None } else { Some(&msg.reply_to) };
|
|
sqlx::query(
|
|
"INSERT INTO messages (id, channel_id, author_id, body, reply_to) VALUES ($1::uuid, $2::uuid, $3, $4, $5::uuid) ON CONFLICT (id) DO NOTHING"
|
|
)
|
|
.bind(&msg.id)
|
|
.bind(&msg.channel_id)
|
|
.bind(&msg.author_id)
|
|
.bind(&msg.body)
|
|
.bind(reply_to)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// Ekstraher mention-UUIDs fra HTML body
|
|
for cap in mention_re.captures_iter(&msg.body) {
|
|
let mention_id = &cap[1];
|
|
|
|
let exists: bool = sqlx::query_scalar(
|
|
"SELECT EXISTS(SELECT 1 FROM nodes WHERE id = $1::uuid AND workspace_id = $2::uuid)"
|
|
)
|
|
.bind(mention_id)
|
|
.bind(&msg.workspace_id)
|
|
.fetch_one(&mut *tx)
|
|
.await?;
|
|
|
|
if exists {
|
|
sqlx::query(
|
|
"INSERT INTO graph_edges (workspace_id, source_id, target_id, relation_type, created_by, origin) VALUES ($1::uuid, $2::uuid, $3::uuid, 'MENTIONS', $4, 'system') ON CONFLICT (source_id, target_id, relation_type) DO NOTHING"
|
|
)
|
|
.bind(&msg.workspace_id)
|
|
.bind(&msg.id)
|
|
.bind(mention_id)
|
|
.bind(&msg.author_id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
}
|
|
}
|
|
|
|
tx.commit().await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
|
|
let payload: MessageDeletePayload = serde_json::from_str(payload_json)?;
|
|
|
|
let mut tx = pool.begin().await?;
|
|
|
|
// Slett reaksjoner
|
|
sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid")
|
|
.bind(&payload.id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// Slett graph_edges der meldingen er source
|
|
sqlx::query("DELETE FROM graph_edges WHERE source_id = $1::uuid")
|
|
.bind(&payload.id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// Slett melding
|
|
sqlx::query("DELETE FROM messages WHERE id = $1::uuid")
|
|
.bind(&payload.id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// Slett node
|
|
sqlx::query("DELETE FROM nodes WHERE id = $1::uuid")
|
|
.bind(&payload.id)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
tx.commit().await?;
|
|
info!(id = %payload.id, "Melding slettet fra PG");
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
|
|
let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?;
|
|
|
|
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid")
|
|
.bind(&payload.body)
|
|
.bind(&payload.id)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
info!(id = %payload.id, "Melding oppdatert i PG");
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
|
|
let payload: ReactionPayload = serde_json::from_str(payload_json)?;
|
|
|
|
// Fjern eksisterende reaksjon fra denne brukeren (én per bruker per melding)
|
|
sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2")
|
|
.bind(&payload.message_id)
|
|
.bind(&payload.user_id)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
sqlx::query(
|
|
"INSERT INTO message_reactions (message_id, user_id, reaction) VALUES ($1::uuid, $2, $3)"
|
|
)
|
|
.bind(&payload.message_id)
|
|
.bind(&payload.user_id)
|
|
.bind(&payload.reaction)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_reaction_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
|
|
let payload: ReactionPayload = serde_json::from_str(payload_json)?;
|
|
|
|
sqlx::query(
|
|
"DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2 AND reaction = $3"
|
|
)
|
|
.bind(&payload.message_id)
|
|
.bind(&payload.user_id)
|
|
.bind(&payload.reaction)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn mark_synced(
|
|
http: &Client,
|
|
base_url: &str,
|
|
module: &str,
|
|
ids: &[u64],
|
|
) -> anyhow::Result<()> {
|
|
let url = format!("{}/v1/database/{}/call/mark_synced", base_url, module);
|
|
let body = serde_json::json!({ "ids": ids });
|
|
|
|
let resp = http
|
|
.post(&url)
|
|
.json(&body)
|
|
.send()
|
|
.await?;
|
|
|
|
if !resp.status().is_success() {
|
|
let status = resp.status();
|
|
let body = resp.text().await.unwrap_or_default();
|
|
anyhow::bail!("mark_synced feilet ({}): {}", status, body);
|
|
}
|
|
|
|
Ok(())
|
|
}
|