SpacetimeDB som cache foran PG: arkitekturendring

PG er autoritativ, SpacetimeDB er varm cache. Frontend snakker
kun med SpacetimeDB, worker håndterer toveissynk.

Fase 1 — SpacetimeDB-modul:
- delete_message med SyncOutbox-event
- edit_message reducer
- MessageReaction tabell + add/remove_reaction reducers
- load_messages med JSON-parsing (erstatter pipe-format)
- clear_channel reducer for duplikat-fri warmup
- load_reactions reducer

Fase 2 — Worker:
- warmup.rs: PG→ST oppvarming ved oppstart (100 msg/kanal)
- sync.rs: håndter delete/update/reaction actions
- Sync-intervall redusert til 1s

Fase 3 — Frontend:
- spacetime.svelte.ts: ren SpacetimeDB-adapter, ingen PG-hybrid
- ChatConnection interface med edit/delete/react metoder
- ChatBlock bruker chat.edit/delete/react direkte
- PG-adapter som readonly fallback

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
vegard 2026-03-16 02:09:33 +01:00
parent 63f928bbe6
commit 8b58d434e9
28 changed files with 1443 additions and 240 deletions

82
dev.sh Executable file
View file

@ -0,0 +1,82 @@
#!/usr/bin/env bash
set -euo pipefail
# Sidelinja — start alt for lokal utvikling
# Bruk: ./dev.sh [--clean]
# --clean Sletter SpacetimeDB-data og starter blankt
ROOT="$(cd "$(dirname "$0")" && pwd)"
SPACETIME_URL="http://127.0.0.1:3000"
SPACETIME_MODULE="sidelinja-realtime"
SPACETIME_SERVER="local"
# === Rens ved behov ===
if [[ "${1:-}" == "--clean" ]]; then
echo "=== Renser SpacetimeDB-data ==="
docker compose -f "$ROOT/docker-compose.dev.yml" --env-file "$ROOT/.env.local" \
rm -sf spacetimedb 2>/dev/null || true
rm -rf "$ROOT/.docker-data/spacetimedb"
mkdir -p "$ROOT/.docker-data/spacetimedb"
fi
# === 1. Docker-tjenester ===
echo "=== Starter Docker-tjenester ==="
docker compose -f "$ROOT/docker-compose.dev.yml" --env-file "$ROOT/.env.local" up -d
# === 2. Vent på SpacetimeDB ===
echo "=== Venter på SpacetimeDB ==="
timeout 30 bash -c "until curl -sf $SPACETIME_URL/v1/ping > /dev/null 2>&1; do sleep 1; done" \
|| { echo "FEIL: SpacetimeDB svarte ikke innen 30s"; exit 1; }
echo "SpacetimeDB klar"
# === 3. Konfigurer spacetime CLI (håndterer ny fingerprint automatisk) ===
echo "=== Konfigurerer SpacetimeDB CLI ==="
spacetime server remove "$SPACETIME_SERVER" 2>/dev/null || true
spacetime server add "$SPACETIME_SERVER" --url "$SPACETIME_URL" --default 2>/dev/null || true
# === 4. Publiser modul ===
echo "=== Publiserer SpacetimeDB-modul ==="
cd "$ROOT/spacetimedb"
spacetime publish "$SPACETIME_MODULE" --server "$SPACETIME_SERVER" 2>&1 \
| grep -v "wasm-opt\|Would you like\|WARNING.*UNSTABLE" || true
echo "Modul publisert"
# === 5. Generer TypeScript-bindinger ===
echo "=== Genererer TypeScript-bindinger ==="
spacetime generate --lang typescript \
--out-dir "$ROOT/web/src/lib/chat/module_bindings" \
--module-path .
echo "Bindinger generert"
# === 6. Start worker + frontend ===
echo ""
echo "=== Starter worker + frontend ==="
cd "$ROOT/worker"
cargo run -- --spacetimedb-url "$SPACETIME_URL" --sync-interval 1 --warmup-limit 100 2>&1 | sed 's/^/[worker] /' &
WORKER_PID=$!
cd "$ROOT/web"
npm run dev 2>&1 | sed 's/^/[frontend] /' &
FRONTEND_PID=$!
cleanup() {
echo ""
echo "Stopper worker og frontend..."
kill $WORKER_PID $FRONTEND_PID 2>/dev/null
wait $WORKER_PID $FRONTEND_PID 2>/dev/null
echo "Stoppet. Docker-tjenester kjører fortsatt (bruk 'docker compose down' for å stoppe)."
}
trap cleanup EXIT INT TERM
echo ""
echo "Alt kjører:"
echo " Frontend: http://localhost:5173"
echo " SpacetimeDB: $SPACETIME_URL"
echo " PostgreSQL: localhost:5432"
echo " Worker PID: $WORKER_PID"
echo ""
echo "Ctrl+C for å stoppe worker + frontend."
echo ""
wait

View file

@ -68,11 +68,34 @@ log::info!("{}", log_msg);
```bash
# Publiser modul mot lokal SpacetimeDB (må kjøre i Docker først)
cd spacetimedb
spacetime publish sidelinja-realtime --server http://localhost:3000
spacetime publish sidelinja-realtime --server local
# Generer TypeScript-bindings
spacetime generate --lang typescript --out-dir ../web/src/lib/chat/module_bindings \
--project-path .
--module-path .
```
**Merk:** Docker-containeren (`clockworklabs/spacetime:latest`) må kjøre før publisering. Modulen overlever container-restart (data i volum `.docker-data/spacetimedb`).
**Merk:** Bruk `./dev.sh` for å starte hele stacken automatisk (inkl. SpacetimeDB publish + binding-generering). `./dev.sh --clean` starter blankt.
## 6. Arkitekturendring: SpacetimeDB som cache foran PG (mars 2026)
Tidligere: Hybrid-adapter der frontend merget data fra PG (historikk) og SpacetimeDB (sanntid) med dedup, deletedIds og BigInt-workarounds.
Ny modell:
- **PG autoritativ** — all persistent data i PostgreSQL
- **SpacetimeDB = varm cache** — worker gjør warmup (PG → ST) ved oppstart
- **Frontend snakker KUN med ST** — ingen PG API-kall fra chat-adapteren
- **Worker håndterer toveissynk** — ST → PG for nye/redigerte/slettede meldinger og reaksjoner
### Warmup-flyt
1. Worker starter → `warmup::run()` leser siste N meldinger per kanal fra PG
2. Kaller `clear_channel` reducer per kanal (unngår duplikater ved restart)
3. Kaller `load_messages` reducer med JSON-array (ikke pipe-separert format)
4. Laster også reaksjoner via `load_reactions` reducer
### Sync-flyt (ST → PG)
- SyncOutbox-events prosesseres hver 1. sekund
- Støtter: `messages/insert`, `messages/delete`, `messages/update`, `message_reactions/insert`, `message_reactions/delete`
### Fallback
PG-polling adapter (`pg.svelte.ts`) brukes kun når SpacetimeDB ikke er konfigurert. Markeres som `readonly: true`.

View file

@ -20,9 +20,9 @@ Dersom SpacetimeDB fjernes fra stacken, skal systemet fungere med følgende erst
Denne fallbacken trenger ikke implementeres på forhånd, men SpacetimeDB-moduler skal designes slik at fallbacken forblir triviell.
## 2. Strategi: Event-drevet med kort forsinkelse
SpacetimeDB-modulene (Rust) produserer persisterings-events ved dataendringer. En Rust-worker konsumerer disse og skriver til PostgreSQL, batched med ~5 sekunders vindu.
SpacetimeDB-modulene (Rust) produserer persisterings-events ved dataendringer. En Rust-worker konsumerer disse og skriver til PostgreSQL med ~1 sekunds intervall.
**Akseptabelt datatap:** Maks 5 sekunder ved hard krasj av SpacetimeDB. Dette er akseptabelt for chat, kanban og show notes.
**Akseptabelt datatap:** Maks 1 sekund ved hard krasj av SpacetimeDB. Dette er akseptabelt for chat, kanban og show notes.
**Unntak — kritiske events:** Aha-markører fra studioet (live-innspilling) er tidssensitive og vanskelige å gjenskape. Disse bør flushes til PG umiddelbart (ikke batched) via en dedikert `sync_critical()`-funksjon som skriver direkte til PG i stedet for via `sync_outbox`. Alternativt kan SpacetimeDB-modulen skrive kritiske events til sin egen WAL/disk umiddelbart. Hvilke event-typer som er "kritiske" defineres per workspace i `workspaces.settings`.
@ -98,29 +98,21 @@ Kanban-kort har en `position`-kolonne (float). To brukere som drar kort samtidig
### 8.3 Chat: Ingen konflikter
Meldinger er append-only. Redigering av egne meldinger er last-write-wins — akseptabelt fordi kun én bruker eier meldingen.
## 9. Implementeringsstatus (mars 2025)
## 9. Implementeringsstatus (mars 2026)
### Ferdig
- **SpacetimeDB Rust-modul** (`spacetimedb/src/lib.rs`): `ChatMessage`- og `SyncOutbox`-tabeller. `send_message`-reducer skriver til begge. Publisert som `sidelinja-realtime`.
- **Hybrid-adapter i frontend** (`web/src/lib/chat/spacetime.svelte.ts`): Henter historikk fra PG via REST, lytter på SpacetimeDB for sanntidspush. Ingen oppvarming nødvendig — PG har alltid historikken.
- **PG-fallback:** Fungerer automatisk. Hvis `VITE_SPACETIMEDB_URL` ikke er satt, brukes ren PG-polling (3 sek intervall).
- **SpacetimeDB som cache foran PG:** PG er autoritativ, SpacetimeDB er varm cache. Frontend snakker kun med SpacetimeDB.
- **SpacetimeDB Rust-modul** (`spacetimedb/src/lib.rs`): `ChatMessage`, `MessageReaction` og `SyncOutbox`-tabeller. Reducers: `send_message`, `delete_message`, `edit_message`, `add_reaction`, `remove_reaction`, `load_messages`, `load_reactions`, `clear_channel`, `mark_synced`.
- **Worker warmup** (`worker/src/warmup.rs`): Ved oppstart lastes siste 100 meldinger + reaksjoner per kanal fra PG → SpacetimeDB. Kanaler ryddes først med `clear_channel` for å unngå duplikater.
- **Worker sync** (`worker/src/sync.rs`): Poller `sync_outbox` hvert 1. sekund. Håndterer insert/delete/update for meldinger og insert/delete for reaksjoner.
- **SpacetimeDB-adapter** (`web/src/lib/chat/spacetime.svelte.ts`): Ren SpacetimeDB-adapter. Ingen PG API-kall. Bruker `onInsert`/`onUpdate`/`onDelete` callbacks for sanntid. Reaksjoner bygges fra `message_reaction`-tabellen.
- **PG-fallback** (`web/src/lib/chat/pg.svelte.ts`): Brukes kun når SpacetimeDB ikke er konfigurert. Markert som `readonly: true`.
- **Adapter-mønster:** `ChatConnection`-interface med `send`, `edit`, `delete`, `react` metoder. Factory velger basert på env-variabel.
### Gjenstår
- **Sync-worker (§5.1):** Rust-worker som poller `sync_outbox` i SpacetimeDB og batch-skriver til PostgreSQL. Uten denne workeren persisteres meldinger sendt via SpacetimeDB kun i SpacetimeDB-minnet — de overlever ikke restart.
- **Oppvarming (§5.2):** Ikke implementert, og hybrid-adapteren gjør dette mindre kritisk (klienten henter alltid PG-historikk uavhengig av SpacetimeDB).
- **Workspace-partisjonering (§7):** SpacetimeDB-modulen har `workspace_id`-felt men bruker ikke workspace-token på tilkobling ennå.
### Designvalg tatt
- **Hybrid fremfor ren SpacetimeDB:** Frontend bruker PG for historikk og SpacetimeDB kun for nye meldinger. Dette unngår oppvarmingsproblematikk og gir umiddelbar tilgang til all historikk.
- **Graceful degradation:** SpacetimeDB-tilkoblingsfeil faller stille tilbake til PG. Brukeren ser ingen feilmelding — PG-data beholdes.
- **Adapter-mønster:** `ChatConnection`-interface med to implementasjoner (PG og SpacetimeDB hybrid). Factory velger basert på env-variabel. Gjør det trivielt å teste hver adapter isolert.
### Åpent spørsmål: SpacetimeDB i fase 1?
PG-polling (3 sek) fungerer godt nok for chat og kanban med nåværende brukertall. SpacetimeDB + sync-worker innfører betydelig kompleksitet (outbox, oppvarming, workspace-partisjonering, feilhåndtering) som ennå ikke gir målbar gevinst.
**Alternativ:** Bruk PostgreSQL `LISTEN/NOTIFY` → SvelteKit SSE (Server-Sent Events) som neste steg fra polling. Dette gir sub-sekund sanntid uten ny infrastruktur-avhengighet. SpacetimeDB introduseres først når vi har et konkret behov det ikke dekker (f.eks. LiveKit-studio med høyfrekvent state-sync mellom mange klienter).
**Beslutning:** Utsatt. PG-adapter med polling er "god nok" for Lag 2. SpacetimeDB-koden beholdes men aktiveres ikke i prod før behovet er bevist. Adapter-mønsteret gjør at vi kan bytte uten frontend-endring.
- **Pin/konvertering via SpacetimeDB:** Pin og kanban/kalender-konvertering går fortsatt direkte til PG API.
- **Lazy warmup per kanal:** Alle aktive kanaler oppvarmes ved oppstart. Kan optimaliseres til per-kanal ved tilkobling.
## 10. Instruks for Claude Code
- `sync_outbox`-tabellen i SpacetimeDB bør ha et `synced`-flagg og `created_at`-tidsstempel

View file

@ -584,6 +584,7 @@ name = "sidelinja-realtime"
version = "0.1.0"
dependencies = [
"log",
"serde_json",
"spacetimedb",
]

View file

@ -8,4 +8,5 @@ crate-type = ["cdylib"]
[dependencies]
spacetimedb = "1.0"
serde_json = "1"
log = "0.4"

View file

@ -19,6 +19,18 @@ pub struct ChatMessage {
pub created_at: Timestamp,
}
/// Reaksjon på en melding. Speiler PostgreSQL `message_reactions`.
#[table(name = message_reaction, public)]
pub struct MessageReaction {
#[auto_inc]
#[primary_key]
pub id: u64,
pub message_id: String,
pub user_id: String,
pub user_name: String,
pub reaction: String,
}
/// Outbox for synkronisering til PostgreSQL.
/// Rust sync-worker leser denne og batch-skriver til PG.
#[table(name = sync_outbox, public)]
@ -44,6 +56,7 @@ pub fn send_message(
id: String,
channel_id: String,
workspace_id: String,
author_id: String,
author_name: String,
body: String,
reply_to: String,
@ -52,19 +65,21 @@ pub fn send_message(
return Err("Melding kan ikke være tom".to_string());
}
// Bygg payload først (før verdiene flyttes inn i ChatMessage)
let payload = format!(
r#"{{"id":"{}","channel_id":"{}","workspace_id":"{}","author_id":"{}","body":"{}","reply_to":"{}"}}"#,
id, channel_id, workspace_id, ctx.sender.to_hex(),
body.trim().replace('"', r#"\""#),
reply_to
);
// Bygg payload med serde_json for korrekt escaping
let payload = serde_json::json!({
"id": id,
"channel_id": channel_id,
"workspace_id": workspace_id,
"author_id": author_id,
"body": body.trim(),
"reply_to": reply_to
}).to_string();
let msg = ChatMessage {
id,
channel_id,
workspace_id: workspace_id.clone(),
author_id: ctx.sender.to_hex().to_string(),
author_id,
author_name,
body: body.trim().to_string(),
message_type: "text".to_string(),
@ -88,31 +103,187 @@ pub fn send_message(
Ok(())
}
/// Slett en melding. Kalles fra klienten.
/// Fjerner fra SpacetimeDB + legger i SyncOutbox for PG-sletting.
#[reducer]
pub fn delete_message(ctx: &ReducerContext, id: String, workspace_id: String) -> Result<(), String> {
if let Some(_) = ctx.db.chat_message().id().find(&id) {
ctx.db.chat_message().id().delete(&id);
// Slett tilhørende reaksjoner
let reactions: Vec<_> = ctx.db.message_reaction().iter()
.filter(|r| r.message_id == id)
.collect();
for r in reactions {
ctx.db.message_reaction().id().delete(&r.id);
}
let payload = serde_json::json!({ "id": id }).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
id: 0,
table_name: "messages".to_string(),
action: "delete".to_string(),
payload,
workspace_id,
created_at: ctx.timestamp,
synced: false,
});
log::info!("Melding slettet: {}", id);
}
Ok(())
}
/// Rediger en melding. Oppdaterer body + legger i SyncOutbox.
#[reducer]
pub fn edit_message(
ctx: &ReducerContext,
id: String,
workspace_id: String,
new_body: String,
) -> Result<(), String> {
if new_body.trim().is_empty() {
return Err("Melding kan ikke være tom".to_string());
}
if let Some(mut msg) = ctx.db.chat_message().id().find(&id) {
msg.body = new_body.trim().to_string();
ctx.db.chat_message().id().update(msg);
let payload = serde_json::json!({
"id": id,
"body": new_body.trim()
}).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
id: 0,
table_name: "messages".to_string(),
action: "update".to_string(),
payload,
workspace_id,
created_at: ctx.timestamp,
synced: false,
});
log::info!("Melding redigert: {}", id);
Ok(())
} else {
Err("Melding ikke funnet".to_string())
}
}
/// Legg til reaksjon. Én reaksjon per bruker per melding (erstatter tidligere).
#[reducer]
pub fn add_reaction(
ctx: &ReducerContext,
message_id: String,
workspace_id: String,
user_id: String,
user_name: String,
reaction: String,
) -> Result<(), String> {
// Fjern eventuell eksisterende reaksjon fra denne brukeren
let existing: Vec<_> = ctx.db.message_reaction().iter()
.filter(|r| r.message_id == message_id && r.user_id == user_id)
.collect();
for r in existing {
ctx.db.message_reaction().id().delete(&r.id);
}
ctx.db.message_reaction().insert(MessageReaction {
id: 0,
message_id: message_id.clone(),
user_id: user_id.clone(),
user_name,
reaction: reaction.clone(),
});
let payload = serde_json::json!({
"message_id": message_id,
"user_id": user_id,
"reaction": reaction
}).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
id: 0,
table_name: "message_reactions".to_string(),
action: "insert".to_string(),
payload,
workspace_id,
created_at: ctx.timestamp,
synced: false,
});
log::info!("Reaksjon lagt til: {} på {}", reaction, message_id);
Ok(())
}
/// Fjern reaksjon.
#[reducer]
pub fn remove_reaction(
ctx: &ReducerContext,
message_id: String,
workspace_id: String,
user_id: String,
reaction: String,
) -> Result<(), String> {
let matching: Vec<_> = ctx.db.message_reaction().iter()
.filter(|r| r.message_id == message_id && r.user_id == user_id && r.reaction == reaction)
.collect();
for r in matching {
ctx.db.message_reaction().id().delete(&r.id);
}
let payload = serde_json::json!({
"message_id": message_id,
"user_id": user_id,
"reaction": reaction
}).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
id: 0,
table_name: "message_reactions".to_string(),
action: "delete".to_string(),
payload,
workspace_id,
created_at: ctx.timestamp,
synced: false,
});
log::info!("Reaksjon fjernet: {} på {}", reaction, message_id);
Ok(())
}
/// Laster meldinger fra PostgreSQL ved oppvarming.
/// Kalles av sync-worker, ikke av klienter direkte.
/// Mottar JSON-array med meldingsobjekter.
#[reducer]
pub fn load_messages(
ctx: &ReducerContext,
messages_json: Vec<String>,
messages_json: String,
) -> Result<(), String> {
let count = messages_json.len();
for json_str in messages_json {
// Enkel parsing — sync-worker sender ferdig-formaterte meldinger
let parts: Vec<&str> = json_str.splitn(8, '|').collect();
if parts.len() < 8 {
log::warn!("Ugyldig melding ved oppvarming: {}", json_str);
let items: Vec<serde_json::Value> = serde_json::from_str(&messages_json)
.map_err(|e| format!("Ugyldig JSON: {}", e))?;
let count = items.len();
for item in items {
let id = item["id"].as_str().unwrap_or_default().to_string();
if id.is_empty() { continue; }
// Hopp over om meldingen allerede finnes
if ctx.db.chat_message().id().find(&id).is_some() {
continue;
}
ctx.db.chat_message().insert(ChatMessage {
id: parts[0].to_string(),
channel_id: parts[1].to_string(),
workspace_id: parts[2].to_string(),
author_id: parts[3].to_string(),
author_name: parts[4].to_string(),
body: parts[5].to_string(),
message_type: parts[6].to_string(),
reply_to: parts[7].to_string(),
id,
channel_id: item["channel_id"].as_str().unwrap_or_default().to_string(),
workspace_id: item["workspace_id"].as_str().unwrap_or_default().to_string(),
author_id: item["author_id"].as_str().unwrap_or_default().to_string(),
author_name: item["author_name"].as_str().unwrap_or_default().to_string(),
body: item["body"].as_str().unwrap_or_default().to_string(),
message_type: item["message_type"].as_str().unwrap_or("text").to_string(),
reply_to: item["reply_to"].as_str().unwrap_or_default().to_string(),
created_at: ctx.timestamp,
});
}
@ -121,6 +292,61 @@ pub fn load_messages(
Ok(())
}
/// Laster reaksjoner fra PostgreSQL ved oppvarming.
#[reducer]
pub fn load_reactions(
ctx: &ReducerContext,
reactions_json: String,
) -> Result<(), String> {
let items: Vec<serde_json::Value> = serde_json::from_str(&reactions_json)
.map_err(|e| format!("Ugyldig JSON: {}", e))?;
let count = items.len();
for item in items {
let message_id = item["message_id"].as_str().unwrap_or_default().to_string();
let user_id = item["user_id"].as_str().unwrap_or_default().to_string();
if message_id.is_empty() || user_id.is_empty() { continue; }
ctx.db.message_reaction().insert(MessageReaction {
id: 0,
message_id,
user_id,
user_name: item["user_name"].as_str().unwrap_or_default().to_string(),
reaction: item["reaction"].as_str().unwrap_or_default().to_string(),
});
}
log::info!("Reaksjoner lastet: {}", count);
Ok(())
}
/// Fjerner alle meldinger og reaksjoner for en kanal.
/// Brukes av warmup for å unngå duplikater ved restart.
#[reducer]
pub fn clear_channel(ctx: &ReducerContext, channel_id: String) -> Result<(), String> {
let messages: Vec<_> = ctx.db.chat_message().iter()
.filter(|m| m.channel_id == channel_id)
.collect();
let msg_ids: Vec<String> = messages.iter().map(|m| m.id.clone()).collect();
// Slett reaksjoner for disse meldingene
let reactions: Vec<_> = ctx.db.message_reaction().iter()
.filter(|r| msg_ids.contains(&r.message_id))
.collect();
for r in reactions {
ctx.db.message_reaction().id().delete(&r.id);
}
// Slett meldingene
for msg in messages {
ctx.db.chat_message().id().delete(&msg.id);
}
log::info!("Kanal ryddet: {} ({} meldinger)", channel_id, msg_ids.len());
Ok(())
}
/// Markerer sync-outbox-events som synket.
/// Kalles av sync-worker etter vellykket PG-skriving.
#[reducer]

View file

@ -25,38 +25,16 @@
get currentUserId() { return currentUserId; },
onMentionClick: (entityId: string) => goto(`/entities/${entityId}`),
onEdit: async (messageId: string, newBody: string) => {
// Oppdater lokalt umiddelbart (optimistisk)
chat?.updateLocal?.(messageId, newBody);
try {
const res = await fetch(`/api/messages/${messageId}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body: newBody })
});
if (res.ok) await chat?.refresh();
} catch { /* nettverksfeil — allerede oppdatert lokalt */ }
await chat?.edit(messageId, newBody);
},
onDelete: async (messageId: string) => {
// Fjern fra lokal state umiddelbart
chat?.removeLocal?.(messageId);
// Slett fra PG (ignorerer 404 — meldingen kan være usynket)
try { await fetch(`/api/messages/${messageId}`, { method: 'DELETE' }); } catch {}
// Slett fra SpacetimeDB slik at den ikke dukker opp igjen ved reconnect
chat?.deleteFromSpacetime?.(messageId);
await chat?.delete(messageId);
},
onReaction: async (messageId: string, reaction: string) => {
try {
const msg = chat?.messages.find(m => m.id === messageId);
const existing = msg?.reactions?.find(r => r.reaction === reaction);
const res = await fetch(`/api/messages/${messageId}/reactions`, {
method: existing?.user_reacted ? 'DELETE' : 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ reaction })
});
if (res.ok) await chat?.refresh();
} catch { /* stille feil */ }
await chat?.react(messageId, reaction);
},
onTogglePin: async (messageId: string, pinned: boolean) => {
// Pin lever kun i PG — direkte API-kall
try {
const res = await fetch(`/api/messages/${messageId}`, {
method: 'PATCH',
@ -133,16 +111,85 @@
prevCount = count;
});
interface ThreadGroup {
root: MessageData;
replies: MessageData[];
lastActivity: string;
}
let grouped = $derived.by(() => {
const groups: { date: string; messages: MessageData[] }[] = [];
let currentDate = '';
// 1. Bygg map for rask oppslag
const byId = new Map<string, MessageData>();
for (const msg of messages) byId.set(msg.id, msg);
// 2. Finn root for en melding (følg reply_to-kjeden)
function findRoot(msg: MessageData): string | null {
let current = msg;
const seen = new Set<string>();
while (current.reply_to) {
if (seen.has(current.reply_to)) break;
seen.add(current.reply_to);
const parent = byId.get(current.reply_to);
if (!parent) return null; // orphan — parent paginert bort
current = parent;
}
return current.id;
}
// 3. Partisjonér i tråder
const threads = new Map<string, ThreadGroup>();
const orphans: MessageData[] = [];
for (const msg of messages) {
const date = formatDate(msg.created_at);
if (!msg.reply_to) {
// Root-melding
if (!threads.has(msg.id)) {
threads.set(msg.id, { root: msg, replies: [], lastActivity: msg.created_at });
} else {
// Tråden ble opprettet av en reply først — sett root
threads.get(msg.id)!.root = msg;
}
} else {
const rootId = findRoot(msg);
if (rootId && byId.has(rootId) && rootId !== msg.id) {
if (!threads.has(rootId)) {
threads.set(rootId, { root: byId.get(rootId)!, replies: [], lastActivity: byId.get(rootId)!.created_at });
}
const thread = threads.get(rootId)!;
thread.replies.push(msg);
if (msg.created_at > thread.lastActivity) {
thread.lastActivity = msg.created_at;
}
} else {
// Orphan reply — parent ikke i meldingslisten
orphans.push(msg);
}
}
}
// 4. Sorter replies kronologisk innad i tråd
for (const thread of threads.values()) {
thread.replies.sort((a, b) => a.created_at.localeCompare(b.created_at));
}
// 5. Orphans blir standalone-tråder
for (const msg of orphans) {
threads.set(msg.id, { root: msg, replies: [], lastActivity: msg.created_at });
}
// 6. Sorter alle tråder etter lastActivity ASC (aktive flyter ned)
const sorted = [...threads.values()].sort((a, b) => a.lastActivity.localeCompare(b.lastActivity));
// 7. Datogrupper basert på lastActivity
const groups: { date: string; threads: ThreadGroup[] }[] = [];
let currentDate = '';
for (const thread of sorted) {
const date = formatDate(thread.lastActivity);
if (date !== currentDate) {
currentDate = date;
groups.push({ date, messages: [] });
groups.push({ date, threads: [] });
}
groups[groups.length - 1].messages.push(msg);
groups[groups.length - 1].threads.push(thread);
}
return groups;
});
@ -171,8 +218,15 @@
<div class="date-divider">
<span>{group.date}</span>
</div>
{#each group.messages as msg (msg.id)}
<MessageBox message={msg} mode="expanded" callbacks={chatCallbacks} />
{#each group.threads as thread (thread.root.id)}
<MessageBox message={thread.root} mode="expanded" callbacks={chatCallbacks} />
{#if thread.replies.length > 0}
<div class="thread-replies">
{#each thread.replies as reply (reply.id)}
<MessageBox message={reply} mode="expanded" callbacks={chatCallbacks} isReply={true} />
{/each}
</div>
{/if}
{/each}
{/each}
@ -244,6 +298,12 @@
border-radius: 10px;
}
.thread-replies {
margin-left: 1.5rem;
border-left: 2px solid #2d3148;
padding-left: 0.5rem;
}
.empty {
display: flex;
align-items: center;

View file

@ -6,15 +6,15 @@ import { createSpacetimeChat } from './spacetime.svelte';
/**
* Factory som velger chat-adapter basert konfigurasjon.
*
* Når VITE_SPACETIMEDB_URL er satt, brukes hybrid-adapter
* (PG for historikk + SpacetimeDB for sanntid).
* Ellers ren PG-polling.
* Når VITE_SPACETIMEDB_URL er satt, brukes SpacetimeDB-adapter
* (all data via SpacetimeDB, worker håndterer PG-synk).
* Ellers ren PG-polling som fallback.
*/
export function createChat(channelId: string, user: ChatUser): ChatConnection {
export function createChat(channelId: string, user: ChatUser, workspaceId?: string): ChatConnection {
if (browser) {
const spacetimeUrl = import.meta.env.VITE_SPACETIMEDB_URL;
if (spacetimeUrl) {
return createSpacetimeChat(channelId, spacetimeUrl, 'sidelinja-realtime', user);
if (spacetimeUrl && workspaceId) {
return createSpacetimeChat(channelId, spacetimeUrl, 'sidelinja-realtime', user, workspaceId);
}
}

View file

@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
messageId: __t.string(),
workspaceId: __t.string(),
userId: __t.string(),
userName: __t.string(),
reaction: __t.string(),
};

View file

@ -0,0 +1,15 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
channelId: __t.string(),
};

View file

@ -0,0 +1,16 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
id: __t.string(),
workspaceId: __t.string(),
};

View file

@ -0,0 +1,17 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
id: __t.string(),
workspaceId: __t.string(),
newBody: __t.string(),
};

View file

@ -34,14 +34,21 @@ import {
} from "spacetimedb";
// Import all reducer arg schemas
import AddReactionReducer from "./add_reaction_reducer";
import ClearChannelReducer from "./clear_channel_reducer";
import DeleteMessageReducer from "./delete_message_reducer";
import EditMessageReducer from "./edit_message_reducer";
import LoadMessagesReducer from "./load_messages_reducer";
import LoadReactionsReducer from "./load_reactions_reducer";
import MarkSyncedReducer from "./mark_synced_reducer";
import RemoveReactionReducer from "./remove_reaction_reducer";
import SendMessageReducer from "./send_message_reducer";
// Import all procedure arg schemas
// Import all table schema definitions
import ChatMessageRow from "./chat_message_table";
import MessageReactionRow from "./message_reaction_table";
import SyncOutboxRow from "./sync_outbox_table";
/** Type-only namespace exports for generated type groups. */
@ -59,6 +66,17 @@ const tablesSchema = __schema({
{ name: 'chat_message_id_key', constraint: 'unique', columns: ['id'] },
],
}, ChatMessageRow),
message_reaction: __table({
name: 'message_reaction',
indexes: [
{ accessor: 'id', name: 'message_reaction_id_idx_btree', algorithm: 'btree', columns: [
'id',
] },
],
constraints: [
{ name: 'message_reaction_id_key', constraint: 'unique', columns: ['id'] },
],
}, MessageReactionRow),
sync_outbox: __table({
name: 'sync_outbox',
indexes: [
@ -74,8 +92,14 @@ const tablesSchema = __schema({
/** The schema information for all reducers in this module. This is defined the same way as the reducers would have been defined in the server, except the body of the reducer is omitted in code generation. */
const reducersSchema = __reducers(
__reducerSchema("add_reaction", AddReactionReducer),
__reducerSchema("clear_channel", ClearChannelReducer),
__reducerSchema("delete_message", DeleteMessageReducer),
__reducerSchema("edit_message", EditMessageReducer),
__reducerSchema("load_messages", LoadMessagesReducer),
__reducerSchema("load_reactions", LoadReactionsReducer),
__reducerSchema("mark_synced", MarkSyncedReducer),
__reducerSchema("remove_reaction", RemoveReactionReducer),
__reducerSchema("send_message", SendMessageReducer),
);
@ -132,4 +156,3 @@ export class DbConnection extends __DbConnectionImpl<typeof REMOTE_MODULE> {
return new SubscriptionBuilder(this);
};
}

View file

@ -11,5 +11,5 @@ import {
} from "spacetimedb";
export default {
messagesJson: __t.array(__t.string()),
messagesJson: __t.string(),
};

View file

@ -0,0 +1,15 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
reactionsJson: __t.string(),
};

View file

@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default __t.row({
id: __t.u64().primaryKey(),
messageId: __t.string().name("message_id"),
userId: __t.string().name("user_id"),
userName: __t.string().name("user_name"),
reaction: __t.string(),
});

View file

@ -0,0 +1,18 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
messageId: __t.string(),
workspaceId: __t.string(),
userId: __t.string(),
reaction: __t.string(),
};

View file

@ -14,6 +14,7 @@ export default {
id: __t.string(),
channelId: __t.string(),
workspaceId: __t.string(),
authorId: __t.string(),
authorName: __t.string(),
body: __t.string(),
replyTo: __t.string(),

View file

@ -23,6 +23,15 @@ export const ChatMessage = __t.object("ChatMessage", {
});
export type ChatMessage = __Infer<typeof ChatMessage>;
export const MessageReaction = __t.object("MessageReaction", {
id: __t.u64(),
messageId: __t.string(),
userId: __t.string(),
userName: __t.string(),
reaction: __t.string(),
});
export type MessageReaction = __Infer<typeof MessageReaction>;
export const SyncOutbox = __t.object("SyncOutbox", {
id: __t.u64(),
tableName: __t.string(),
@ -33,4 +42,3 @@ export const SyncOutbox = __t.object("SyncOutbox", {
synced: __t.bool(),
});
export type SyncOutbox = __Infer<typeof SyncOutbox>;

View file

@ -6,11 +6,22 @@
import { type Infer as __Infer } from "spacetimedb";
// Import all reducer arg schemas
import AddReactionReducer from "../add_reaction_reducer";
import ClearChannelReducer from "../clear_channel_reducer";
import DeleteMessageReducer from "../delete_message_reducer";
import EditMessageReducer from "../edit_message_reducer";
import LoadMessagesReducer from "../load_messages_reducer";
import LoadReactionsReducer from "../load_reactions_reducer";
import MarkSyncedReducer from "../mark_synced_reducer";
import RemoveReactionReducer from "../remove_reaction_reducer";
import SendMessageReducer from "../send_message_reducer";
export type AddReactionParams = __Infer<typeof AddReactionReducer>;
export type ClearChannelParams = __Infer<typeof ClearChannelReducer>;
export type DeleteMessageParams = __Infer<typeof DeleteMessageReducer>;
export type EditMessageParams = __Infer<typeof EditMessageReducer>;
export type LoadMessagesParams = __Infer<typeof LoadMessagesReducer>;
export type LoadReactionsParams = __Infer<typeof LoadReactionsReducer>;
export type MarkSyncedParams = __Infer<typeof MarkSyncedReducer>;
export type RemoveReactionParams = __Infer<typeof RemoveReactionReducer>;
export type SendMessageParams = __Infer<typeof SendMessageReducer>;

View file

@ -3,8 +3,8 @@ import type { ChatConnection, MentionRef } from './types';
/**
* Chat-adapter som poller PostgreSQL via REST API.
* Brukes som fallback når SpacetimeDB ikke er tilgjengelig,
* og som referanseimplementasjon for testing.
* Brukes som fallback når SpacetimeDB ikke er tilgjengelig.
* Begrenset modus ingen sanntid, polling hvert 3. sekund.
*/
export function createPgChat(channelId: string): ChatConnection {
let messages = $state<MessageData[]>([]);
@ -66,6 +66,37 @@ export function createPgChat(channelId: string): ChatConnection {
}
}
async function edit(messageId: string, newBody: string) {
try {
const res = await fetch(`/api/messages/${messageId}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body: newBody })
});
if (res.ok) await refresh();
} catch { /* stille feil */ }
}
async function del(messageId: string) {
try {
await fetch(`/api/messages/${messageId}`, { method: 'DELETE' });
await refresh();
} catch { /* stille feil */ }
}
async function react(messageId: string, reaction: string) {
try {
const msg = messages.find(m => m.id === messageId);
const existing = msg?.reactions?.find(r => r.reaction === reaction);
const res = await fetch(`/api/messages/${messageId}/reactions`, {
method: existing?.user_reacted ? 'DELETE' : 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ reaction })
});
if (res.ok) await refresh();
} catch { /* stille feil */ }
}
function destroy() {
destroyed = true;
if (timer) clearInterval(timer);
@ -79,7 +110,11 @@ export function createPgChat(channelId: string): ChatConnection {
get messages() { return messages; },
get error() { return error; },
get connected() { return connected; },
get readonly() { return true; },
send,
edit,
delete: del,
react,
refresh,
destroy
};

View file

@ -1,14 +1,12 @@
import type { MessageData } from '$lib/types/message';
import type { MessageData, ReactionSummary } from '$lib/types/message';
import type { ChatConnection, ChatUser, MentionRef } from './types';
import { DbConnection, type EventContext } from './module_bindings';
/**
* Hybrid chat-adapter:
* - Henter eksisterende meldinger fra PostgreSQL via REST (som PG-adapteren)
* - Lytter nye meldinger i sanntid via SpacetimeDB WebSocket
* - Sender nye meldinger via SpacetimeDB reducer ( synkes til PG av worker)
*
* Ingen oppvarming nødvendig PG har historikken, SpacetimeDB har sanntid.
* SpacetimeDB-only chat-adapter.
* All data (historikk + sanntid) kommer fra SpacetimeDB.
* Worker håndterer warmup (PG ST) og sync (ST PG).
* Frontend snakker KUN med SpacetimeDB.
*/
export function createSpacetimeChat(
channelId: string,
@ -22,103 +20,8 @@ export function createSpacetimeChat(
let connected = $state(false);
let conn: InstanceType<typeof DbConnection> | null = null;
let destroyed = false;
const deletedIds = new Set<string>();
function toMessageData(raw: Record<string, unknown>): MessageData {
return {
id: raw.id as string,
channel_id: (raw.channel_id as string) ?? null,
reply_to: (raw.reply_to as string) ?? null,
author_id: (raw.author_id as string) ?? null,
author_name: (raw.author_name as string) ?? null,
message_type: (raw.message_type as string) ?? 'chat',
title: (raw.title as string) ?? null,
body: (raw.body as string) ?? '',
pinned: (raw.pinned as boolean) ?? false,
visibility: (raw.visibility as 'workspace' | 'private') ?? 'workspace',
created_at: raw.created_at as string,
updated_at: (raw.updated_at as string) ?? (raw.created_at as string),
reply_count: (raw.reply_count as number) ?? 0,
parent_body: (raw.parent_body as string) ?? null,
parent_author_name: (raw.parent_author_name as string) ?? null,
reactions: (raw.reactions as MessageData['reactions']) ?? [],
kanban_view: (raw.kanban_view as MessageData['kanban_view']) ?? null,
calendar_view: (raw.calendar_view as MessageData['calendar_view']) ?? null
};
}
// Hent historikk fra PG (merger med SpacetimeDB-meldinger som ikke finnes i PG)
async function loadFromPg() {
try {
const res = await fetch(`/api/channels/${channelId}/messages`);
if (!res.ok) throw new Error('Feil ved lasting');
const raw: Record<string, unknown>[] = await res.json();
const pgMessages = raw.map(toMessageData);
const pgIds = new Set(pgMessages.map(m => m.id));
// Behold SpacetimeDB-meldinger som ikke finnes i PG ennå, unntatt slettede
const spacetimeOnly = messages.filter(m => !pgIds.has(m.id) && !deletedIds.has(m.id));
messages = [...pgMessages, ...spacetimeOnly];
} catch {
error = 'Kunne ikke laste meldinger';
}
}
// Koble til SpacetimeDB for sanntidsoppdateringer
function connectRealtime() {
try {
conn = DbConnection.builder()
.withUri(spacetimeUrl)
.withDatabaseName(moduleName)
.onConnect((connection) => {
if (destroyed) return;
connected = true;
error = '';
try {
sessionStorage.setItem('spacetime_token', '');
} catch { /* SSR-safe */ }
// Abonner på meldinger for denne kanalen
connection.subscriptionBuilder()
.onError(() => {
console.error('[spacetime] subscription error');
})
.subscribe([
`SELECT * FROM chat_message WHERE channel_id = '${channelId}'`
]);
})
.onDisconnect(() => {
connected = false;
})
.onConnectError((_ctx, err) => {
console.warn('[spacetime] connection error, PG-data beholdes:', err);
// Beholder PG-data — ingen error til bruker
})
.withToken(getStoredToken() ?? '')
.build();
// Nye meldinger i sanntid
conn.db.chat_message.onInsert((ctx: EventContext, row) => {
if (destroyed) return;
if (row.channelId !== channelId) return;
if (deletedIds.has(row.id)) return;
// Dedupliser mot eksisterende
if (messages.some(m => m.id === row.id)) return;
const msg = spacetimeRowToMessage(row);
messages = [...messages, msg];
});
// Fjern meldinger som slettes i sanntid
conn.db.chat_message.onDelete((ctx: EventContext, row) => {
if (destroyed) return;
if (row.channelId !== channelId) return;
messages = messages.filter(m => m.id !== row.id);
});
} catch (e) {
console.warn('[spacetime] setup feilet, bruker kun PG:', e);
}
}
// Lokal reaksjonsstate (SpacetimeDB har message_reaction-tabellen)
let reactionMap = $state<Map<string, ReactionSummary[]>>(new Map());
function spacetimeRowToMessage(row: any): MessageData {
let createdAt: string;
@ -142,29 +45,152 @@ export function createSpacetimeChat(
visibility: 'workspace',
created_at: createdAt,
updated_at: createdAt,
reactions: reactionMap.get(row.id) ?? [],
kanban_view: null,
calendar_view: null
};
}
async function sendViaPgApi(body: string, mentions?: MentionRef[], replyTo?: string) {
const res = await fetch(`/api/channels/${channelId}/messages`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body, mentions, replyTo: replyTo ?? null })
});
if (!res.ok) throw new Error('Feil ved sending');
await loadFromPg();
function rebuildReactions() {
// Bygg reaksjonsaggregat fra message_reaction-tabellen
if (!conn) return;
const newMap = new Map<string, ReactionSummary[]>();
for (const r of conn.db.message_reaction.iter()) {
const msgId = r.messageId;
if (!newMap.has(msgId)) newMap.set(msgId, []);
const summaries = newMap.get(msgId)!;
const existing = summaries.find(s => s.reaction === r.reaction);
if (existing) {
existing.count++;
if (r.userId === user.id) existing.user_reacted = true;
} else {
summaries.push({
reaction: r.reaction,
count: 1,
user_reacted: r.userId === user.id
});
}
}
reactionMap = newMap;
}
async function send(body: string, mentions?: MentionRef[], replyTo?: string) {
// Fallback til PG API hvis SpacetimeDB er nede
if (!conn || !connected) {
try {
await sendViaPgApi(body, mentions, replyTo);
} catch {
error = 'Kunne ikke sende melding';
function rebuildMessages() {
if (!conn) return;
const rows: any[] = [];
for (const row of conn.db.chat_message.iter()) {
if (row.channelId === channelId) {
rows.push(row);
}
}
// Sorter kronologisk
rows.sort((a, b) => {
const aMs = extractMs(a.createdAt);
const bMs = extractMs(b.createdAt);
return aMs - bMs;
});
messages = rows.map(r => spacetimeRowToMessage(r));
}
function extractMs(ts: any): number {
try {
const micros = ts?.microsSinceEpoch;
return typeof micros === 'bigint' ? Number(micros / 1000n) : Number(micros) / 1000;
} catch {
return 0;
}
}
function connectRealtime() {
try {
conn = DbConnection.builder()
.withUri(spacetimeUrl)
.withDatabaseName(moduleName)
.onConnect((connection) => {
if (destroyed) return;
connected = true;
error = '';
try {
sessionStorage.setItem('spacetime_token', '');
} catch { /* SSR-safe */ }
// Abonner på meldinger + reaksjoner for denne kanalen
connection.subscriptionBuilder()
.onApplied(() => {
// Initialload — bygg state fra subscription
rebuildReactions();
rebuildMessages();
})
.onError(() => {
console.error('[spacetime] subscription error');
})
.subscribe([
`SELECT * FROM chat_message WHERE channel_id = '${channelId}'`,
`SELECT mr.* FROM message_reaction mr JOIN chat_message cm ON cm.id = mr.message_id WHERE cm.channel_id = '${channelId}'`
]);
})
.onDisconnect(() => {
connected = false;
})
.onConnectError((_ctx, err) => {
console.warn('[spacetime] connection error:', err);
error = 'Tilkobling til sanntidstjeneste feilet';
})
.withToken(getStoredToken() ?? '')
.build();
// Nye meldinger i sanntid
conn.db.chat_message.onInsert((_ctx: EventContext, row) => {
if (destroyed || row.channelId !== channelId) return;
if (messages.some(m => m.id === row.id)) return;
const msg = spacetimeRowToMessage(row);
messages = [...messages, msg];
});
// Meldinger oppdatert (edit)
conn.db.chat_message.onUpdate((_ctx: EventContext, _oldRow, newRow) => {
if (destroyed || newRow.channelId !== channelId) return;
messages = messages.map(m =>
m.id === newRow.id ? { ...m, body: newRow.body } : m
);
});
// Meldinger slettet
conn.db.chat_message.onDelete((_ctx: EventContext, row) => {
if (destroyed || row.channelId !== channelId) return;
messages = messages.filter(m => m.id !== row.id);
});
// Reaksjoner — rebuild ved endring
conn.db.message_reaction.onInsert((_ctx: EventContext, _row) => {
if (destroyed) return;
rebuildReactions();
// Oppdater reactions i messages
messages = messages.map(m => ({
...m,
reactions: reactionMap.get(m.id) ?? []
}));
});
conn.db.message_reaction.onDelete((_ctx: EventContext, _row) => {
if (destroyed) return;
rebuildReactions();
messages = messages.map(m => ({
...m,
reactions: reactionMap.get(m.id) ?? []
}));
});
} catch (e) {
console.warn('[spacetime] setup feilet:', e);
error = 'Kunne ikke koble til sanntidstjeneste';
}
}
async function send(body: string, _mentions?: MentionRef[], replyTo?: string) {
if (!conn || !connected) {
error = 'Ikke tilkoblet';
return;
}
try {
@ -178,12 +204,64 @@ export function createSpacetimeChat(
body,
replyTo: replyTo ?? ''
});
// Ingen reload — onInsert-callback viser meldingen instant
} catch {
error = 'Kunne ikke sende melding';
}
}
async function edit(messageId: string, newBody: string) {
if (!conn || !connected) return;
try {
conn.reducers.editMessage({
id: messageId,
workspaceId,
newBody
});
} catch {
error = 'Kunne ikke redigere melding';
}
}
async function del(messageId: string) {
if (!conn || !connected) return;
try {
conn.reducers.deleteMessage({
id: messageId,
workspaceId
});
} catch {
error = 'Kunne ikke slette melding';
}
}
async function react(messageId: string, reaction: string) {
if (!conn || !connected) return;
try {
// Sjekk om bruker allerede har reagert med denne
const existing = reactionMap.get(messageId)?.find(
r => r.reaction === reaction && r.user_reacted
);
if (existing) {
conn.reducers.removeReaction({
messageId,
workspaceId,
userId: user.id,
reaction
});
} else {
conn.reducers.addReaction({
messageId,
workspaceId,
userId: user.id,
userName: user.name,
reaction
});
}
} catch {
error = 'Kunne ikke legge til reaksjon';
}
}
function destroy() {
destroyed = true;
if (conn) {
@ -200,38 +278,19 @@ export function createSpacetimeChat(
}
}
// Start begge deler parallelt
loadFromPg();
// Start tilkobling
connectRealtime();
function updateLocal(messageId: string, newBody: string) {
messages = messages.map(m =>
m.id === messageId ? { ...m, body: newBody } : m
);
}
function removeLocal(messageId: string) {
deletedIds.add(messageId);
messages = messages.filter(m => m.id !== messageId);
}
function deleteFromSpacetime(messageId: string) {
if (conn && connected) {
try {
conn.reducers.deleteMessage({ id: messageId });
} catch { /* SpacetimeDB nede — ignoreres */ }
}
}
return {
get messages() { return messages; },
get error() { return error; },
get connected() { return connected; },
get readonly() { return false; },
send,
refresh: loadFromPg,
updateLocal,
removeLocal,
deleteFromSpacetime,
edit,
delete: del,
react,
refresh: async () => { rebuildMessages(); },
destroy
};
}

View file

@ -26,10 +26,11 @@ export interface ChatConnection {
readonly messages: MessageData[];
readonly error: string;
readonly connected: boolean;
readonly readonly: boolean;
send(body: string, mentions?: MentionRef[], replyTo?: string): Promise<void>;
edit(messageId: string, newBody: string): Promise<void>;
delete(messageId: string): Promise<void>;
react(messageId: string, reaction: string): Promise<void>;
refresh(): Promise<void>;
removeLocal?(messageId: string): void;
updateLocal?(messageId: string, newBody: string): void;
deleteFromSpacetime?(messageId: string): void;
destroy(): void;
}

13
worker/Cargo.lock generated
View file

@ -1407,6 +1407,18 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.14"
@ -1701,6 +1713,7 @@ dependencies = [
"async-trait",
"chrono",
"clap",
"regex",
"reqwest",
"serde",
"serde_json",

View file

@ -16,3 +16,4 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
async-trait = "0.1"
anyhow = "1"
regex = "1"

View file

@ -1,9 +1,11 @@
use clap::Parser;
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use tracing::info;
use tracing::{info, warn};
mod handlers;
mod sync;
mod warmup;
mod worker;
#[derive(Parser)]
@ -32,6 +34,22 @@ struct Cli {
/// Polling-intervall i sekunder
#[arg(long, default_value = "1")]
poll_interval: u64,
/// SpacetimeDB URL
#[arg(long, env = "SPACETIMEDB_URL", default_value = "http://localhost:3000")]
spacetimedb_url: String,
/// SpacetimeDB modulnavn
#[arg(long, env = "SPACETIMEDB_MODULE", default_value = "sidelinja-realtime")]
spacetimedb_module: String,
/// Sync-intervall i sekunder (SpacetimeDB → PG)
#[arg(long, default_value = "1")]
sync_interval: u64,
/// Maks meldinger per kanal ved oppvarming
#[arg(long, default_value = "100")]
warmup_limit: i64,
}
#[tokio::main]
@ -67,5 +85,26 @@ async fn main() -> anyhow::Result<()> {
let registered: Vec<&str> = registry.keys().map(|k| k.as_str()).collect();
info!(?registered, "Registrerte jobbtyper");
// Oppvarming: last PG-data inn i SpacetimeDB
let http = reqwest::Client::new();
if let Err(e) = warmup::run(
&pool,
&http,
&cli.spacetimedb_url,
&cli.spacetimedb_module,
cli.warmup_limit,
).await {
warn!(error = %e, "Oppvarming feilet — fortsetter uten historikk i SpacetimeDB");
}
// Spawn sync-worker som parallell task
let sync_pool = pool.clone();
let spacetimedb_url = cli.spacetimedb_url.clone();
let spacetimedb_module = cli.spacetimedb_module.clone();
let sync_interval = cli.sync_interval;
tokio::spawn(async move {
sync::run(sync_pool, http, spacetimedb_url, spacetimedb_module, sync_interval).await;
});
worker::run(pool, registry, cli.max_concurrent, cli.poll_interval).await
}

340
worker/src/sync.rs Normal file
View file

@ -0,0 +1,340 @@
use regex::Regex;
use reqwest::Client;
use serde::Deserialize;
use sqlx::PgPool;
use tracing::{info, warn};
/// SpacetimeDB v2 HTTP SQL-respons (array av result-objekter)
#[derive(Deserialize)]
struct SqlResultEntry {
rows: Option<Vec<Vec<serde_json::Value>>>,
}
/// Parsed SyncOutbox-entry
struct SyncEntry {
id: u64,
table_name: String,
action: String,
payload: String,
}
/// Payload for en chat-melding (insert)
#[derive(Deserialize)]
struct MessagePayload {
id: String,
channel_id: String,
workspace_id: String,
author_id: String,
body: String,
reply_to: String,
}
/// Payload for meldings-oppdatering
#[derive(Deserialize)]
struct MessageUpdatePayload {
id: String,
body: String,
}
/// Payload for meldings-sletting
#[derive(Deserialize)]
struct MessageDeletePayload {
id: String,
}
/// Payload for reaksjon
#[derive(Deserialize)]
struct ReactionPayload {
message_id: String,
user_id: String,
reaction: String,
}
pub async fn run(
pool: PgPool,
http: Client,
spacetimedb_url: String,
module: String,
interval_secs: u64,
) {
info!(
spacetimedb_url = %spacetimedb_url,
module = %module,
interval_secs = interval_secs,
"Starter sync-worker (SpacetimeDB → PG)"
);
let mention_re = Regex::new(r#"data-id="([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})""#)
.expect("ugyldig regex");
loop {
if let Err(e) = sync_batch(&pool, &http, &spacetimedb_url, &module, &mention_re).await {
warn!(error = %e, "Sync-batch feilet");
}
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
}
async fn sync_batch(
pool: &PgPool,
http: &Client,
base_url: &str,
module: &str,
mention_re: &Regex,
) -> anyhow::Result<()> {
// 1. Poll SyncOutbox via HTTP SQL (SpacetimeDB v2 API)
let sql_url = format!("{}/v1/database/{}/sql", base_url, module);
let query = "SELECT id, table_name, action, payload FROM sync_outbox WHERE synced = false";
let resp = http
.post(&sql_url)
.header("Content-Type", "text/plain")
.body(query)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("SpacetimeDB SQL-feil ({}): {}", status, body);
}
// v2 returnerer en array av result-objekter
let results: Vec<SqlResultEntry> = resp.json().await?;
let rows = match results.into_iter().next().and_then(|r| r.rows) {
Some(r) if !r.is_empty() => r,
_ => return Ok(()), // Ingen usynkede events
};
// Parse entries
let entries: Vec<SyncEntry> = rows
.into_iter()
.filter_map(|row| {
if row.len() < 4 {
return None;
}
Some(SyncEntry {
id: row[0].as_u64()?,
table_name: row[1].as_str()?.to_string(),
action: row[2].as_str()?.to_string(),
payload: row[3].as_str()?.to_string(),
})
})
.collect();
if entries.is_empty() {
return Ok(());
}
info!(count = entries.len(), "Synker batch fra SpacetimeDB");
let mut synced_ids: Vec<u64> = Vec::new();
// 2. Prosesser hvert event
for entry in &entries {
let result = match (entry.table_name.as_str(), entry.action.as_str()) {
("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await,
("messages", "delete") => process_message_delete(pool, &entry.payload).await,
("messages", "update") => process_message_update(pool, &entry.payload).await,
("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await,
("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await,
_ => {
warn!(
table = %entry.table_name,
action = %entry.action,
"Ukjent sync-event-type, markerer som synket"
);
Ok(())
}
};
match result {
Ok(()) => synced_ids.push(entry.id),
Err(e) => {
warn!(
entry_id = entry.id,
table = %entry.table_name,
action = %entry.action,
error = %e,
"Feil ved synking, hopper over"
);
}
}
}
// 3. Marker som synket via reducer
if !synced_ids.is_empty() {
mark_synced(http, base_url, module, &synced_ids).await?;
info!(count = synced_ids.len(), "Markert som synket");
}
Ok(())
}
async fn process_message_insert(
pool: &PgPool,
payload_json: &str,
mention_re: &Regex,
) -> anyhow::Result<()> {
let msg: MessagePayload = serde_json::from_str(payload_json)?;
let mut tx = pool.begin().await?;
// Insert node
sqlx::query(
"INSERT INTO nodes (id, workspace_id, node_type) VALUES ($1::uuid, $2::uuid, 'melding') ON CONFLICT (id) DO NOTHING"
)
.bind(&msg.id)
.bind(&msg.workspace_id)
.execute(&mut *tx)
.await?;
// Insert message
let reply_to: Option<&str> = if msg.reply_to.is_empty() { None } else { Some(&msg.reply_to) };
sqlx::query(
"INSERT INTO messages (id, channel_id, author_id, body, reply_to) VALUES ($1::uuid, $2::uuid, $3, $4, $5::uuid) ON CONFLICT (id) DO NOTHING"
)
.bind(&msg.id)
.bind(&msg.channel_id)
.bind(&msg.author_id)
.bind(&msg.body)
.bind(reply_to)
.execute(&mut *tx)
.await?;
// Ekstraher mention-UUIDs fra HTML body
for cap in mention_re.captures_iter(&msg.body) {
let mention_id = &cap[1];
let exists: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM nodes WHERE id = $1::uuid AND workspace_id = $2::uuid)"
)
.bind(mention_id)
.bind(&msg.workspace_id)
.fetch_one(&mut *tx)
.await?;
if exists {
sqlx::query(
"INSERT INTO graph_edges (workspace_id, source_id, target_id, relation_type, created_by, origin) VALUES ($1::uuid, $2::uuid, $3::uuid, 'MENTIONS', $4, 'system') ON CONFLICT (source_id, target_id, relation_type) DO NOTHING"
)
.bind(&msg.workspace_id)
.bind(&msg.id)
.bind(mention_id)
.bind(&msg.author_id)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
Ok(())
}
async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: MessageDeletePayload = serde_json::from_str(payload_json)?;
let mut tx = pool.begin().await?;
// Slett reaksjoner
sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid")
.bind(&payload.id)
.execute(&mut *tx)
.await?;
// Slett graph_edges der meldingen er source
sqlx::query("DELETE FROM graph_edges WHERE source_id = $1::uuid")
.bind(&payload.id)
.execute(&mut *tx)
.await?;
// Slett melding
sqlx::query("DELETE FROM messages WHERE id = $1::uuid")
.bind(&payload.id)
.execute(&mut *tx)
.await?;
// Slett node
sqlx::query("DELETE FROM nodes WHERE id = $1::uuid")
.bind(&payload.id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
info!(id = %payload.id, "Melding slettet fra PG");
Ok(())
}
async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?;
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid")
.bind(&payload.body)
.bind(&payload.id)
.execute(pool)
.await?;
info!(id = %payload.id, "Melding oppdatert i PG");
Ok(())
}
async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: ReactionPayload = serde_json::from_str(payload_json)?;
// Fjern eksisterende reaksjon fra denne brukeren (én per bruker per melding)
sqlx::query("DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2")
.bind(&payload.message_id)
.bind(&payload.user_id)
.execute(pool)
.await?;
sqlx::query(
"INSERT INTO message_reactions (message_id, user_id, reaction) VALUES ($1::uuid, $2, $3)"
)
.bind(&payload.message_id)
.bind(&payload.user_id)
.bind(&payload.reaction)
.execute(pool)
.await?;
Ok(())
}
async fn process_reaction_delete(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: ReactionPayload = serde_json::from_str(payload_json)?;
sqlx::query(
"DELETE FROM message_reactions WHERE message_id = $1::uuid AND user_id = $2 AND reaction = $3"
)
.bind(&payload.message_id)
.bind(&payload.user_id)
.bind(&payload.reaction)
.execute(pool)
.await?;
Ok(())
}
async fn mark_synced(
http: &Client,
base_url: &str,
module: &str,
ids: &[u64],
) -> anyhow::Result<()> {
let url = format!("{}/v1/database/{}/call/mark_synced", base_url, module);
let body = serde_json::json!({ "ids": ids });
let resp = http
.post(&url)
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("mark_synced feilet ({}): {}", status, body);
}
Ok(())
}

168
worker/src/warmup.rs Normal file
View file

@ -0,0 +1,168 @@
use reqwest::Client;
use sqlx::PgPool;
use tracing::{info, warn};
/// Oppvarming: les siste N meldinger per aktive kanal fra PG og last inn i SpacetimeDB.
pub async fn run(
pool: &PgPool,
http: &Client,
spacetimedb_url: &str,
module: &str,
limit: i64,
) -> anyhow::Result<()> {
info!(limit, "Starter oppvarming (PG → SpacetimeDB)");
// Finn aktive kanaler (kanaler med meldinger)
let channels: Vec<(String,)> = sqlx::query_as(
"SELECT DISTINCT channel_id::text FROM messages WHERE channel_id IS NOT NULL"
)
.fetch_all(pool)
.await?;
if channels.is_empty() {
info!("Ingen aktive kanaler funnet — oppvarming fullført");
return Ok(());
}
info!(channels = channels.len(), "Aktive kanaler funnet");
let mut total_messages = 0u64;
let mut total_reactions = 0u64;
for (channel_id,) in &channels {
// Rydd kanalen i SpacetimeDB først for å unngå duplikater
if let Err(e) = call_reducer(http, spacetimedb_url, module, "clear_channel", &serde_json::json!({
"channel_id": channel_id
})).await {
warn!(channel_id, error = %e, "Kunne ikke rydde kanal — hopper over");
continue;
}
// Hent meldinger med forfatterinfo
let rows: Vec<(String, String, String, String, String, String, String, Option<String>, String)> = sqlx::query_as(
r#"
SELECT
m.id::text,
m.channel_id::text,
n.workspace_id::text,
COALESCE(m.author_id, ''),
COALESCE(u.name, 'Ukjent'),
COALESCE(m.body, ''),
COALESCE(m.message_type, 'text'),
m.reply_to::text,
m.created_at::text
FROM messages m
JOIN nodes n ON n.id = m.id
LEFT JOIN users u ON u.authentik_id = m.author_id
WHERE m.channel_id = $1::uuid
ORDER BY m.created_at DESC
LIMIT $2
"#
)
.bind(channel_id)
.bind(limit)
.fetch_all(pool)
.await?;
if rows.is_empty() { continue; }
// Bygg JSON-array
let messages: Vec<serde_json::Value> = rows.iter().map(|r| {
serde_json::json!({
"id": r.0,
"channel_id": r.1,
"workspace_id": r.2,
"author_id": r.3,
"author_name": r.4,
"body": r.5,
"message_type": r.6,
"reply_to": r.7.as_deref().unwrap_or(""),
"created_at": r.8
})
}).collect();
let count = messages.len();
let json_str = serde_json::to_string(&messages)?;
if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_messages", &serde_json::json!({
"messages_json": json_str
})).await {
warn!(channel_id, error = %e, "Feil ved lasting av meldinger");
continue;
}
total_messages += count as u64;
// Hent reaksjoner for denne kanalens meldinger
let reaction_rows: Vec<(String, String, String, String)> = sqlx::query_as(
r#"
SELECT
mr.message_id::text,
COALESCE(mr.user_id, ''),
COALESCE(u.name, 'Ukjent'),
mr.reaction
FROM message_reactions mr
JOIN messages m ON m.id = mr.message_id
LEFT JOIN users u ON u.authentik_id = mr.user_id
WHERE m.channel_id = $1::uuid
"#
)
.bind(channel_id)
.fetch_all(pool)
.await?;
if !reaction_rows.is_empty() {
let reactions: Vec<serde_json::Value> = reaction_rows.iter().map(|r| {
serde_json::json!({
"message_id": r.0,
"user_id": r.1,
"user_name": r.2,
"reaction": r.3
})
}).collect();
let reactions_json = serde_json::to_string(&reactions)?;
if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_reactions", &serde_json::json!({
"reactions_json": reactions_json
})).await {
warn!(channel_id, error = %e, "Feil ved lasting av reaksjoner");
} else {
total_reactions += reaction_rows.len() as u64;
}
}
info!(channel_id, messages = count, reactions = reaction_rows.len(), "Kanal oppvarmet");
}
info!(
channels = channels.len(),
messages = total_messages,
reactions = total_reactions,
"Oppvarming fullført"
);
Ok(())
}
async fn call_reducer(
http: &Client,
base_url: &str,
module: &str,
reducer: &str,
args: &serde_json::Value,
) -> anyhow::Result<()> {
let url = format!("{}/v1/database/{}/call/{}", base_url, module, reducer);
let resp = http
.post(&url)
.json(args)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("{} feilet ({}): {}", reducer, status, body);
}
Ok(())
}