From af74749bed651484d2221cfef551475c26c3ef17 Mon Sep 17 00:00:00 2001 From: vegard Date: Mon, 16 Mar 2026 17:38:44 +0100 Subject: [PATCH] SpacetimeDB som eneste datakilde: fjern alle PG-kall fra frontend-adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Utvider SpacetimeDB-modulen med metadata, edited_at og MessageRevision-tabell. Worker skriver via reducers (set_ai_processing, ai_update_message) i stedet for direkte PG-skriving. Frontend-adapteren leser alt fra SpacetimeDB — null fetch()-kall. Co-Authored-By: Claude Opus 4.6 --- spacetimedb/Cargo.lock | 1 + spacetimedb/Cargo.toml | 1 + spacetimedb/src/lib.rs | 185 +++++++++++++++++- web/src/lib/blocks/ChatBlock.svelte | 14 +- .../ai_update_message_reducer.ts | 19 ++ .../module_bindings/chat_message_table.ts | 2 + .../clear_ai_processing_reducer.ts | 16 ++ web/src/lib/chat/module_bindings/index.ts | 20 ++ .../module_bindings/load_revisions_reducer.ts | 15 ++ .../module_bindings/message_revision_table.ts | 18 ++ .../set_ai_processing_reducer.ts | 16 ++ web/src/lib/chat/module_bindings/types.ts | 10 + .../chat/module_bindings/types/reducers.ts | 8 + web/src/lib/chat/spacetime.svelte.ts | 83 ++++---- web/src/lib/chat/types.ts | 7 + web/src/lib/components/MessageBox.svelte | 14 +- web/src/lib/types/message.ts | 1 + worker/src/handlers/ai_text_process.rs | 126 ++++-------- worker/src/sync.rs | 61 +++++- worker/src/warmup.rs | 50 ++++- 20 files changed, 514 insertions(+), 153 deletions(-) create mode 100644 web/src/lib/chat/module_bindings/ai_update_message_reducer.ts create mode 100644 web/src/lib/chat/module_bindings/clear_ai_processing_reducer.ts create mode 100644 web/src/lib/chat/module_bindings/load_revisions_reducer.ts create mode 100644 web/src/lib/chat/module_bindings/message_revision_table.ts create mode 100644 web/src/lib/chat/module_bindings/set_ai_processing_reducer.ts diff --git a/spacetimedb/Cargo.lock b/spacetimedb/Cargo.lock index 5239106..a97807c 100644 --- a/spacetimedb/Cargo.lock +++ b/spacetimedb/Cargo.lock @@ -583,6 +583,7 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" name = "sidelinja-realtime" version = "0.1.0" dependencies = [ + "chrono", "log", "serde_json", "spacetimedb", diff --git a/spacetimedb/Cargo.toml b/spacetimedb/Cargo.toml index 7856026..3323c2c 100644 --- a/spacetimedb/Cargo.toml +++ b/spacetimedb/Cargo.toml @@ -10,3 +10,4 @@ crate-type = ["cdylib"] spacetimedb = "1.0" serde_json = "1" log = "0.4" +chrono = { version = "0.4", default-features = false, features = ["std"] } diff --git a/spacetimedb/src/lib.rs b/spacetimedb/src/lib.rs index cbf763f..748970e 100644 --- a/spacetimedb/src/lib.rs +++ b/spacetimedb/src/lib.rs @@ -17,6 +17,21 @@ pub struct ChatMessage { pub message_type: String, pub reply_to: String, pub created_at: Timestamp, + /// JSON-streng med metadata (ai_processed, ai_action, etc.). Tom = ingen metadata. + pub metadata: String, + /// ISO-tidsstempel for siste redigering. Tom = aldri redigert. + pub edited_at: String, +} + +/// Revisjonshistorikk for meldinger. Lagrer tidligere versjoner av body. +#[table(name = message_revision, public)] +pub struct MessageRevision { + #[auto_inc] + #[primary_key] + pub id: u64, + pub message_id: String, + pub body: String, + pub edited_at: String, } /// Reaksjon på en melding. Speiler PostgreSQL `message_reactions`. @@ -85,6 +100,8 @@ pub fn send_message( message_type: "text".to_string(), reply_to, created_at: ctx.timestamp, + metadata: String::new(), + edited_at: String::new(), }; ctx.db.chat_message().insert(msg); @@ -134,7 +151,7 @@ pub fn delete_message(ctx: &ReducerContext, id: String, workspace_id: String) -> Ok(()) } -/// Rediger en melding. Oppdaterer body + legger i SyncOutbox. +/// Rediger en melding. Lagrer revisjon, oppdaterer body + legger i SyncOutbox. #[reducer] pub fn edit_message( ctx: &ReducerContext, @@ -147,12 +164,27 @@ pub fn edit_message( } if let Some(mut msg) = ctx.db.chat_message().id().find(&id) { + // Lagre gammel body som revisjon + let now = format_timestamp(ctx.timestamp); + ctx.db.message_revision().insert(MessageRevision { + id: 0, + message_id: id.clone(), + body: msg.body.clone(), + edited_at: now.clone(), + }); + msg.body = new_body.trim().to_string(); + let edited_at = now; + msg.edited_at = edited_at.clone(); + let metadata_for_payload = msg.metadata.clone(); + // Bevar eksisterende metadata ctx.db.chat_message().id().update(msg); let payload = serde_json::json!({ "id": id, - "body": new_body.trim() + "body": new_body.trim(), + "metadata": metadata_for_payload, + "edited_at": edited_at }).to_string(); ctx.db.sync_outbox().insert(SyncOutbox { @@ -172,6 +204,132 @@ pub fn edit_message( } } +/// Sett ai_processing-flagg på en melding. Transient state — ingen outbox. +#[reducer] +pub fn set_ai_processing( + ctx: &ReducerContext, + id: String, + _workspace_id: String, +) -> Result<(), String> { + if let Some(mut msg) = ctx.db.chat_message().id().find(&id) { + // Merger ai_processing: true inn i metadata + let mut meta: serde_json::Value = if msg.metadata.is_empty() { + serde_json::json!({}) + } else { + serde_json::from_str(&msg.metadata).unwrap_or(serde_json::json!({})) + }; + meta["ai_processing"] = serde_json::json!(true); + msg.metadata = meta.to_string(); + ctx.db.chat_message().id().update(msg); + log::info!("AI-prosessering startet: {}", id); + Ok(()) + } else { + Err("Melding ikke funnet".to_string()) + } +} + +/// Fjern ai_processing-flagg (brukes ved feil). +#[reducer] +pub fn clear_ai_processing( + ctx: &ReducerContext, + id: String, + _workspace_id: String, +) -> Result<(), String> { + if let Some(mut msg) = ctx.db.chat_message().id().find(&id) { + if !msg.metadata.is_empty() { + if let Ok(mut meta) = serde_json::from_str::(&msg.metadata) { + if let Some(obj) = meta.as_object_mut() { + obj.remove("ai_processing"); + msg.metadata = if obj.is_empty() { String::new() } else { serde_json::to_string(&meta).unwrap_or_default() }; + } + } + } + ctx.db.chat_message().id().update(msg); + log::info!("AI-prosessering ryddet: {}", id); + Ok(()) + } else { + Err("Melding ikke funnet".to_string()) + } +} + +/// AI-oppdatering av melding. Lagrer revisjon, oppdaterer body/metadata/edited_at. +/// Outbox-entry med action "ai_update" for sync til PG. +#[reducer] +pub fn ai_update_message( + ctx: &ReducerContext, + id: String, + workspace_id: String, + new_body: String, + metadata: String, + edited_at: String, +) -> Result<(), String> { + if let Some(mut msg) = ctx.db.chat_message().id().find(&id) { + let old_body = msg.body.clone(); + + // Lagre gammel body som revisjon + ctx.db.message_revision().insert(MessageRevision { + id: 0, + message_id: id.clone(), + body: old_body.clone(), + edited_at: edited_at.clone(), + }); + + msg.body = new_body.clone(); + msg.metadata = metadata.clone(); + msg.edited_at = edited_at.clone(); + ctx.db.chat_message().id().update(msg); + + let payload = serde_json::json!({ + "id": id, + "body": new_body, + "metadata": metadata, + "edited_at": edited_at, + "revision_body": old_body + }).to_string(); + + ctx.db.sync_outbox().insert(SyncOutbox { + id: 0, + table_name: "messages".to_string(), + action: "ai_update".to_string(), + payload, + workspace_id, + created_at: ctx.timestamp, + synced: false, + }); + + log::info!("AI-oppdatering fullført: {}", id); + Ok(()) + } else { + Err("Melding ikke funnet".to_string()) + } +} + +/// Laster revisjoner fra PostgreSQL ved oppvarming. +#[reducer] +pub fn load_revisions( + ctx: &ReducerContext, + revisions_json: String, +) -> Result<(), String> { + let items: Vec = serde_json::from_str(&revisions_json) + .map_err(|e| format!("Ugyldig JSON: {}", e))?; + + let count = items.len(); + for item in items { + let message_id = item["message_id"].as_str().unwrap_or_default().to_string(); + if message_id.is_empty() { continue; } + + ctx.db.message_revision().insert(MessageRevision { + id: 0, + message_id, + body: item["body"].as_str().unwrap_or_default().to_string(), + edited_at: item["edited_at"].as_str().unwrap_or_default().to_string(), + }); + } + + log::info!("Revisjoner lastet: {}", count); + Ok(()) +} + /// Legg til reaksjon. Én reaksjon per bruker per melding (erstatter tidligere). #[reducer] pub fn add_reaction( @@ -285,6 +443,8 @@ pub fn load_messages( message_type: item["message_type"].as_str().unwrap_or("text").to_string(), reply_to: item["reply_to"].as_str().unwrap_or_default().to_string(), created_at: ctx.timestamp, + metadata: item["metadata"].as_str().unwrap_or_default().to_string(), + edited_at: item["edited_at"].as_str().unwrap_or_default().to_string(), }); } @@ -330,6 +490,14 @@ pub fn clear_channel(ctx: &ReducerContext, channel_id: String) -> Result<(), Str let msg_ids: Vec = messages.iter().map(|m| m.id.clone()).collect(); + // Slett revisjoner for disse meldingene + let revisions: Vec<_> = ctx.db.message_revision().iter() + .filter(|r| msg_ids.contains(&r.message_id)) + .collect(); + for r in revisions { + ctx.db.message_revision().id().delete(&r.id); + } + // Slett reaksjoner for disse meldingene let reactions: Vec<_> = ctx.db.message_reaction().iter() .filter(|r| msg_ids.contains(&r.message_id)) @@ -361,6 +529,19 @@ pub fn mark_synced(ctx: &ReducerContext, ids: Vec) -> Result<(), String> { Ok(()) } +// === Hjelpefunksjoner === + +/// Formater en SpacetimeDB Timestamp til ISO 8601-streng. +fn format_timestamp(ts: Timestamp) -> String { + let duration = ts.to_duration_since_unix_epoch().unwrap_or_default(); + let micros = duration.as_micros() as u64; + let secs = micros / 1_000_000; + let nanos = ((micros % 1_000_000) * 1000) as u32; + let dt = chrono::DateTime::from_timestamp(secs as i64, nanos) + .unwrap_or_default(); + dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() +} + // === Livssyklus === #[reducer(client_connected)] diff --git a/web/src/lib/blocks/ChatBlock.svelte b/web/src/lib/blocks/ChatBlock.svelte index 84efa40..e31f96d 100644 --- a/web/src/lib/blocks/ChatBlock.svelte +++ b/web/src/lib/blocks/ChatBlock.svelte @@ -63,18 +63,12 @@ }); if (!res.ok) return; const { job_id } = await res.json(); - // Sett lokal processing-state umiddelbart (via array-reassign for reaktivitet) - if (chat) { - const msgs = chat.messages; - const idx = msgs.findIndex(m => m.id === messageId); - if (idx >= 0) { - msgs[idx] = { ...msgs[idx], metadata: { ...msgs[idx].metadata, ai_processing: true } }; - } - } - // Poll jobb-status + // Worker kaller set_ai_processing reducer → SpacetimeDB oppdaterer frontend automatisk + // Poll jobb-status som fallback for feilhåndtering pollJob(job_id, messageId); } catch { /* stille feil */ } - } + }, + getRevisions: (messageId: string) => chat?.getRevisions?.(messageId) ?? [] }; function pollJob(jobId: string, messageId?: string) { diff --git a/web/src/lib/chat/module_bindings/ai_update_message_reducer.ts b/web/src/lib/chat/module_bindings/ai_update_message_reducer.ts new file mode 100644 index 0000000..5344d22 --- /dev/null +++ b/web/src/lib/chat/module_bindings/ai_update_message_reducer.ts @@ -0,0 +1,19 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default { + id: __t.string(), + workspaceId: __t.string(), + newBody: __t.string(), + metadata: __t.string(), + editedAt: __t.string(), +}; diff --git a/web/src/lib/chat/module_bindings/chat_message_table.ts b/web/src/lib/chat/module_bindings/chat_message_table.ts index 66fb368..f8416cb 100644 --- a/web/src/lib/chat/module_bindings/chat_message_table.ts +++ b/web/src/lib/chat/module_bindings/chat_message_table.ts @@ -20,4 +20,6 @@ export default __t.row({ messageType: __t.string().name("message_type"), replyTo: __t.string().name("reply_to"), createdAt: __t.timestamp().name("created_at"), + metadata: __t.string(), + editedAt: __t.string().name("edited_at"), }); diff --git a/web/src/lib/chat/module_bindings/clear_ai_processing_reducer.ts b/web/src/lib/chat/module_bindings/clear_ai_processing_reducer.ts new file mode 100644 index 0000000..75bc4e6 --- /dev/null +++ b/web/src/lib/chat/module_bindings/clear_ai_processing_reducer.ts @@ -0,0 +1,16 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default { + id: __t.string(), + workspaceId: __t.string(), +}; diff --git a/web/src/lib/chat/module_bindings/index.ts b/web/src/lib/chat/module_bindings/index.ts index ff6dc55..1f0a77f 100644 --- a/web/src/lib/chat/module_bindings/index.ts +++ b/web/src/lib/chat/module_bindings/index.ts @@ -35,20 +35,25 @@ import { // Import all reducer arg schemas import AddReactionReducer from "./add_reaction_reducer"; +import AiUpdateMessageReducer from "./ai_update_message_reducer"; +import ClearAiProcessingReducer from "./clear_ai_processing_reducer"; import ClearChannelReducer from "./clear_channel_reducer"; import DeleteMessageReducer from "./delete_message_reducer"; import EditMessageReducer from "./edit_message_reducer"; import LoadMessagesReducer from "./load_messages_reducer"; import LoadReactionsReducer from "./load_reactions_reducer"; +import LoadRevisionsReducer from "./load_revisions_reducer"; import MarkSyncedReducer from "./mark_synced_reducer"; import RemoveReactionReducer from "./remove_reaction_reducer"; import SendMessageReducer from "./send_message_reducer"; +import SetAiProcessingReducer from "./set_ai_processing_reducer"; // Import all procedure arg schemas // Import all table schema definitions import ChatMessageRow from "./chat_message_table"; import MessageReactionRow from "./message_reaction_table"; +import MessageRevisionRow from "./message_revision_table"; import SyncOutboxRow from "./sync_outbox_table"; /** Type-only namespace exports for generated type groups. */ @@ -77,6 +82,17 @@ const tablesSchema = __schema({ { name: 'message_reaction_id_key', constraint: 'unique', columns: ['id'] }, ], }, MessageReactionRow), + message_revision: __table({ + name: 'message_revision', + indexes: [ + { accessor: 'id', name: 'message_revision_id_idx_btree', algorithm: 'btree', columns: [ + 'id', + ] }, + ], + constraints: [ + { name: 'message_revision_id_key', constraint: 'unique', columns: ['id'] }, + ], + }, MessageRevisionRow), sync_outbox: __table({ name: 'sync_outbox', indexes: [ @@ -93,14 +109,18 @@ const tablesSchema = __schema({ /** The schema information for all reducers in this module. This is defined the same way as the reducers would have been defined in the server, except the body of the reducer is omitted in code generation. */ const reducersSchema = __reducers( __reducerSchema("add_reaction", AddReactionReducer), + __reducerSchema("ai_update_message", AiUpdateMessageReducer), + __reducerSchema("clear_ai_processing", ClearAiProcessingReducer), __reducerSchema("clear_channel", ClearChannelReducer), __reducerSchema("delete_message", DeleteMessageReducer), __reducerSchema("edit_message", EditMessageReducer), __reducerSchema("load_messages", LoadMessagesReducer), __reducerSchema("load_reactions", LoadReactionsReducer), + __reducerSchema("load_revisions", LoadRevisionsReducer), __reducerSchema("mark_synced", MarkSyncedReducer), __reducerSchema("remove_reaction", RemoveReactionReducer), __reducerSchema("send_message", SendMessageReducer), + __reducerSchema("set_ai_processing", SetAiProcessingReducer), ); /** The schema information for all procedures in this module. This is defined the same way as the procedures would have been defined in the server. */ diff --git a/web/src/lib/chat/module_bindings/load_revisions_reducer.ts b/web/src/lib/chat/module_bindings/load_revisions_reducer.ts new file mode 100644 index 0000000..84dbfec --- /dev/null +++ b/web/src/lib/chat/module_bindings/load_revisions_reducer.ts @@ -0,0 +1,15 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default { + revisionsJson: __t.string(), +}; diff --git a/web/src/lib/chat/module_bindings/message_revision_table.ts b/web/src/lib/chat/module_bindings/message_revision_table.ts new file mode 100644 index 0000000..3ac7de1 --- /dev/null +++ b/web/src/lib/chat/module_bindings/message_revision_table.ts @@ -0,0 +1,18 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default __t.row({ + id: __t.u64().primaryKey(), + messageId: __t.string().name("message_id"), + body: __t.string(), + editedAt: __t.string().name("edited_at"), +}); diff --git a/web/src/lib/chat/module_bindings/set_ai_processing_reducer.ts b/web/src/lib/chat/module_bindings/set_ai_processing_reducer.ts new file mode 100644 index 0000000..75bc4e6 --- /dev/null +++ b/web/src/lib/chat/module_bindings/set_ai_processing_reducer.ts @@ -0,0 +1,16 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +/* eslint-disable */ +/* tslint:disable */ +import { + TypeBuilder as __TypeBuilder, + t as __t, + type AlgebraicTypeType as __AlgebraicTypeType, + type Infer as __Infer, +} from "spacetimedb"; + +export default { + id: __t.string(), + workspaceId: __t.string(), +}; diff --git a/web/src/lib/chat/module_bindings/types.ts b/web/src/lib/chat/module_bindings/types.ts index 64a5308..c32fd99 100644 --- a/web/src/lib/chat/module_bindings/types.ts +++ b/web/src/lib/chat/module_bindings/types.ts @@ -20,6 +20,8 @@ export const ChatMessage = __t.object("ChatMessage", { messageType: __t.string(), replyTo: __t.string(), createdAt: __t.timestamp(), + metadata: __t.string(), + editedAt: __t.string(), }); export type ChatMessage = __Infer; @@ -32,6 +34,14 @@ export const MessageReaction = __t.object("MessageReaction", { }); export type MessageReaction = __Infer; +export const MessageRevision = __t.object("MessageRevision", { + id: __t.u64(), + messageId: __t.string(), + body: __t.string(), + editedAt: __t.string(), +}); +export type MessageRevision = __Infer; + export const SyncOutbox = __t.object("SyncOutbox", { id: __t.u64(), tableName: __t.string(), diff --git a/web/src/lib/chat/module_bindings/types/reducers.ts b/web/src/lib/chat/module_bindings/types/reducers.ts index a77af42..925bf47 100644 --- a/web/src/lib/chat/module_bindings/types/reducers.ts +++ b/web/src/lib/chat/module_bindings/types/reducers.ts @@ -7,22 +7,30 @@ import { type Infer as __Infer } from "spacetimedb"; // Import all reducer arg schemas import AddReactionReducer from "../add_reaction_reducer"; +import AiUpdateMessageReducer from "../ai_update_message_reducer"; +import ClearAiProcessingReducer from "../clear_ai_processing_reducer"; import ClearChannelReducer from "../clear_channel_reducer"; import DeleteMessageReducer from "../delete_message_reducer"; import EditMessageReducer from "../edit_message_reducer"; import LoadMessagesReducer from "../load_messages_reducer"; import LoadReactionsReducer from "../load_reactions_reducer"; +import LoadRevisionsReducer from "../load_revisions_reducer"; import MarkSyncedReducer from "../mark_synced_reducer"; import RemoveReactionReducer from "../remove_reaction_reducer"; import SendMessageReducer from "../send_message_reducer"; +import SetAiProcessingReducer from "../set_ai_processing_reducer"; export type AddReactionParams = __Infer; +export type AiUpdateMessageParams = __Infer; +export type ClearAiProcessingParams = __Infer; export type ClearChannelParams = __Infer; export type DeleteMessageParams = __Infer; export type EditMessageParams = __Infer; export type LoadMessagesParams = __Infer; export type LoadReactionsParams = __Infer; +export type LoadRevisionsParams = __Infer; export type MarkSyncedParams = __Infer; export type RemoveReactionParams = __Infer; export type SendMessageParams = __Infer; +export type SetAiProcessingParams = __Infer; diff --git a/web/src/lib/chat/spacetime.svelte.ts b/web/src/lib/chat/spacetime.svelte.ts index 57ecdcf..ecf724c 100644 --- a/web/src/lib/chat/spacetime.svelte.ts +++ b/web/src/lib/chat/spacetime.svelte.ts @@ -32,6 +32,15 @@ export function createSpacetimeChat( } catch { createdAt = new Date().toISOString(); } + + // Parse metadata fra JSON-streng + let metadata: MessageData['metadata'] = null; + if (row.metadata) { + try { + metadata = JSON.parse(row.metadata); + } catch { /* ugyldig JSON — ignorer */ } + } + return { id: row.id, channel_id: channelId, @@ -47,7 +56,9 @@ export function createSpacetimeChat( updated_at: createdAt, reactions: reactionMap.get(row.id) ?? [], kanban_view: null, - calendar_view: null + calendar_view: null, + metadata, + edited_at: row.editedAt || null }; } @@ -90,14 +101,12 @@ export function createSpacetimeChat( const bMs = extractMs(b.createdAt); return aMs - bMs; }); - // Bevar PG-spesifikke felter (metadata, edited_at, reply_count, etc.) fra eksisterende meldinger + // Bevar felter som ikke finnes i SpacetimeDB (reply_count, parent_body, etc.) const existing = new Map(messages.map(m => [m.id, m])); messages = rows.map(r => { const msg = spacetimeRowToMessage(r); const prev = existing.get(msg.id); if (prev) { - msg.metadata = prev.metadata; - msg.edited_at = prev.edited_at; msg.reply_count = prev.reply_count; msg.parent_body = prev.parent_body; msg.parent_author_name = prev.parent_author_name; @@ -115,37 +124,6 @@ export function createSpacetimeChat( } } - async function enrichFromPg() { - try { - const res = await fetch(`/api/channels/${channelId}/messages/metadata`); - if (!res.ok) return; - const data: { id: string; edited_at: string | null; metadata: any }[] = await res.json(); - const lookup = new Map(data.map(d => [d.id, d])); - messages = messages.map(m => { - const meta = lookup.get(m.id); - if (!meta) return m; - return { - ...m, - edited_at: meta.edited_at, - metadata: meta.metadata - }; - }); - } catch { - // Ikke kritisk — meldinger vises uansett - } - } - - async function enrichMessageFromPg(messageId: string) { - try { - const res = await fetch(`/api/messages/${messageId}/metadata`); - if (!res.ok) return; - const meta: { edited_at: string | null; metadata: any } = await res.json(); - messages = messages.map(m => - m.id === messageId ? { ...m, edited_at: meta.edited_at, metadata: meta.metadata } : m - ); - } catch { /* ikke kritisk */ } - } - function connectRealtime() { try { conn = DbConnection.builder() @@ -166,15 +144,14 @@ export function createSpacetimeChat( // Initialload — bygg state fra subscription rebuildReactions(); rebuildMessages(); - // Hent redigeringsstatus fra PG (SpacetimeDB har ikke edited_at/metadata) - enrichFromPg(); }) .onError(() => { console.error('[spacetime] subscription error'); }) .subscribe([ `SELECT * FROM chat_message WHERE channel_id = '${channelId}'`, - `SELECT * FROM message_reaction` + `SELECT * FROM message_reaction`, + `SELECT * FROM message_revision` ]); }) .onDisconnect(() => { @@ -198,11 +175,13 @@ export function createSpacetimeChat( // Meldinger oppdatert (edit) conn.db.chat_message.onUpdate((_ctx: EventContext, _oldRow, newRow) => { if (destroyed || newRow.channelId !== channelId) return; + let metadata: MessageData['metadata'] = null; + if (newRow.metadata) { + try { metadata = JSON.parse(newRow.metadata); } catch { /* ignorer */ } + } messages = messages.map(m => - m.id === newRow.id ? { ...m, body: newRow.body, edited_at: new Date().toISOString() } : m + m.id === newRow.id ? { ...m, body: newRow.body, edited_at: newRow.editedAt || m.edited_at, metadata } : m ); - // Hent metadata fra PG (ai_processed osv.) for den oppdaterte meldingen - enrichMessageFromPg(newRow.id); }); // Meldinger slettet @@ -310,6 +289,23 @@ export function createSpacetimeChat( } } + function getRevisions(messageId: string): { id: string; body: string; edited_at: string }[] { + if (!conn) return []; + const revs: { id: string; body: string; edited_at: string }[] = []; + for (const r of conn.db.message_revision.iter()) { + if (r.messageId === messageId) { + revs.push({ + id: String(r.id), + body: r.body, + edited_at: r.editedAt + }); + } + } + // Sorter nyeste først + revs.sort((a, b) => b.edited_at.localeCompare(a.edited_at)); + return revs; + } + function destroy() { destroyed = true; if (conn) { @@ -338,7 +334,8 @@ export function createSpacetimeChat( edit, delete: del, react, - refresh: async () => { rebuildMessages(); await enrichFromPg(); }, - destroy + refresh: async () => { rebuildMessages(); }, + destroy, + getRevisions }; } diff --git a/web/src/lib/chat/types.ts b/web/src/lib/chat/types.ts index 33e6032..5466865 100644 --- a/web/src/lib/chat/types.ts +++ b/web/src/lib/chat/types.ts @@ -22,6 +22,12 @@ export interface MentionRef { * Implementeres av PG-polling og SpacetimeDB. * Alle felter er reaktive (Svelte 5 $state). */ +export interface Revision { + id: string; + body: string; + edited_at: string; +} + export interface ChatConnection { readonly messages: MessageData[]; readonly error: string; @@ -33,4 +39,5 @@ export interface ChatConnection { react(messageId: string, reaction: string): Promise; refresh(): Promise; destroy(): void; + getRevisions?(messageId: string): Revision[]; } diff --git a/web/src/lib/components/MessageBox.svelte b/web/src/lib/components/MessageBox.svelte index ffa478c..a9d7c6b 100644 --- a/web/src/lib/components/MessageBox.svelte +++ b/web/src/lib/components/MessageBox.svelte @@ -231,10 +231,16 @@ async function loadRevisions() { loadingRevisions = true; try { - const res = await fetch(`/api/messages/${message.id}/revisions`); - if (res.ok) { - const data = await res.json(); - revisions = data.revisions ?? []; + if (callbacks.getRevisions) { + // SpacetimeDB-adapter: les direkte fra lokal state + revisions = callbacks.getRevisions(message.id); + } else { + // Fallback: PG-adapter via API + const res = await fetch(`/api/messages/${message.id}/revisions`); + if (res.ok) { + const data = await res.json(); + revisions = data.revisions ?? []; + } } } finally { loadingRevisions = false; diff --git a/web/src/lib/types/message.ts b/web/src/lib/types/message.ts index 72f0154..cc17ba5 100644 --- a/web/src/lib/types/message.ts +++ b/web/src/lib/types/message.ts @@ -52,5 +52,6 @@ export interface MessageBoxCallbacks { onConvertToKanban?: (messageId: string) => void; onConvertToCalendar?: (messageId: string) => void; onMagic?: (messageId: string, action?: string) => void; + getRevisions?: (messageId: string) => { id: string; body: string; edited_at: string }[]; currentUserId?: string; } diff --git a/worker/src/handlers/ai_text_process.rs b/worker/src/handlers/ai_text_process.rs index c834e01..7be2da3 100644 --- a/worker/src/handlers/ai_text_process.rs +++ b/worker/src/handlers/ai_text_process.rs @@ -76,16 +76,11 @@ impl JobHandler for AiTextProcessHandler { // Wrapper som rydder ai_processing-flagget ved feil let result = self.handle_inner(pool, workspace_id, job_id, payload, message_id).await; if result.is_err() { - let _ = sqlx::query( - r#" - UPDATE messages - SET metadata = COALESCE(metadata, '{}'::jsonb) - 'ai_processing' - WHERE id = $1 - "#, - ) - .bind(message_id) - .execute(pool) - .await; + // Rydd ai_processing via SpacetimeDB reducer + let _ = self.call_reducer("clear_ai_processing", &json!({ + "id": message_id.to_string(), + "workspace_id": workspace_id.to_string() + })).await; } result } @@ -165,33 +160,13 @@ impl AiTextProcessHandler { return Ok(Some(json!({ "skipped": true, "reason": "tom melding" }))); } - // 2. Sett ai_processing-flagg så frontend kan vise spinner - sqlx::query( - r#" - UPDATE messages - SET metadata = COALESCE(metadata, '{}'::jsonb) || '{"ai_processing": true}'::jsonb - WHERE id = $1 - "#, - ) - .bind(message_id) - .execute(pool) - .await - .context("Feil ved setting av ai_processing-flagg")?; + // 2. Sett ai_processing-flagg via SpacetimeDB → frontend ser pulsering umiddelbart + self.call_reducer("set_ai_processing", &json!({ + "id": message_id.to_string(), + "workspace_id": workspace_id.to_string() + })).await.context("Feil ved setting av ai_processing-flagg")?; - // 3. Lagre original som revisjon (etter at vi har satt processing-flagg) - sqlx::query( - r#" - INSERT INTO message_revisions (id, message_id, body) - VALUES (gen_random_uuid(), $1, $2) - "#, - ) - .bind(message_id) - .bind(&original_body) - .execute(pool) - .await - .context("Feil ved lagring av revisjon")?; - - // 4. Hent prompt-label fra DB (for metadata-stempel i chat) + // 3. Hent prompt-label fra DB (for metadata-stempel i chat) let prompt_label: Option = sqlx::query_scalar( "SELECT label FROM ai_prompts WHERE action = $1" ) @@ -201,29 +176,27 @@ impl AiTextProcessHandler { .ok() .flatten(); - // 5. Bygg system-prompt basert på action + // 4. Bygg system-prompt basert på action let system_prompt = match prompt_override { Some(custom) => custom.to_string(), None => get_system_prompt_from_db(pool, action).await, }; - // 6. Send til AI Gateway + // 5. Send til AI Gateway let ai_resp = self .call_ai_gateway(&system_prompt, &plain_text, &model) .await .context("AI Gateway-kall feilet")?; - // 7. Beregn faktisk modellnavn - // LiteLLM returnerer alias-navnet i model-feltet — bruk expected_model fra DB + // 6. Beregn faktisk modellnavn let actual_model = match &ai_resp.model_actual { - Some(m) if m != &model => Some(m.clone()), // Gateway returnerte faktisk modellnavn - _ => expected_model, // Bruk oppslaget fra providers-tabellen + Some(m) if m != &model => Some(m.clone()), + _ => expected_model, }; - // Strip openrouter/-prefiks for lesbarhet let actual_model_clean = actual_model.map(|m| m.replace("openrouter/", "").replace("gemini/", "google/")); - // 8. Skriv PG metadata FØR SpacetimeDB-oppdatering - // (frontend henter metadata fra PG når SpacetimeDB-update trigger onUpdate) + // 7. Bygg metadata og oppdater via SpacetimeDB ai_update_message reducer + // Reducer lagrer revisjon, oppdaterer alt atomisk, og lager outbox for PG-sync let metadata = json!({ "ai_processed": true, "ai_action": action, @@ -231,35 +204,17 @@ impl AiTextProcessHandler { "ai_model": actual_model_clean.as_deref().unwrap_or(&model) }); - sqlx::query( - r#" - UPDATE messages - SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ai_processing') || $1::jsonb, - edited_at = now() - WHERE id = $2 - "#, - ) - .bind(&metadata) - .bind(message_id) - .execute(pool) - .await - .context("Feil ved oppdatering av metadata")?; + let edited_at = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - // 9. Oppdater SpacetimeDB + PG body. - // SpacetimeDB er primær når frontend bruker sanntid, men meldingen - // kan også være opprettet via PG-polling — da finnes den ikke i STDB. - if let Err(e) = self.update_spacetimedb(&message_id, workspace_id, &ai_resp.content).await { - warn!(message_id = %message_id, error = %e, "SpacetimeDB-oppdatering feilet, oppdaterer PG direkte"); - } - // Oppdater alltid PG body som fallback (sync ville gjort dette, men kan ta tid) - sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2") - .bind(&ai_resp.content) - .bind(message_id) - .execute(pool) - .await - .context("Feil ved oppdatering av body i PG")?; + self.call_reducer("ai_update_message", &json!({ + "id": message_id.to_string(), + "workspace_id": workspace_id.to_string(), + "new_body": ai_resp.content, + "metadata": metadata.to_string(), + "edited_at": edited_at + })).await.context("SpacetimeDB ai_update_message feilet")?; - // 10. Logg tokenforbruk til ai_usage_log + // 8. Logg tokenforbruk til ai_usage_log sqlx::query( r#" INSERT INTO ai_usage_log (workspace_id, job_id, job_type, model_alias, model_actual, action, prompt_tokens, completion_tokens, total_tokens) @@ -300,40 +255,31 @@ impl AiTextProcessHandler { }))) } - /// Oppdater meldingen i SpacetimeDB via edit_message reducer, - /// slik at frontend (som leser fra SpacetimeDB) ser AI-resultatet. - async fn update_spacetimedb( + /// Kall en SpacetimeDB reducer via HTTP. + async fn call_reducer( &self, - message_id: &Uuid, - workspace_id: &Uuid, - new_body: &str, + reducer: &str, + payload: &serde_json::Value, ) -> anyhow::Result<()> { let url = format!( - "{}/v1/database/{}/call/edit_message", - self.spacetimedb_url, self.spacetimedb_module + "{}/v1/database/{}/call/{}", + self.spacetimedb_url, self.spacetimedb_module, reducer ); - let payload = json!({ - "id": message_id.to_string(), - "workspace_id": workspace_id.to_string(), - "new_body": new_body - }); - let resp = self .http .post(&url) - .json(&payload) + .json(payload) .send() .await - .context("HTTP-kall til SpacetimeDB edit_message feilet")?; + .context(format!("HTTP-kall til SpacetimeDB {} feilet", reducer))?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); - anyhow::bail!("edit_message feilet ({}): {}", status, body); + anyhow::bail!("{} feilet ({}): {}", reducer, status, body); } - info!(message_id = %message_id, "SpacetimeDB oppdatert med AI-resultat"); Ok(()) } diff --git a/worker/src/sync.rs b/worker/src/sync.rs index 7db70e4..748da31 100644 --- a/worker/src/sync.rs +++ b/worker/src/sync.rs @@ -34,6 +34,18 @@ struct MessagePayload { struct MessageUpdatePayload { id: String, body: String, + metadata: Option, + edited_at: Option, +} + +/// Payload for AI-oppdatering (ai_update action) +#[derive(Deserialize)] +struct AiUpdatePayload { + id: String, + body: String, + metadata: String, + edited_at: String, + revision_body: String, } /// Payload for meldings-sletting @@ -136,6 +148,7 @@ async fn sync_batch( ("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await, ("messages", "delete") => process_message_delete(pool, &entry.payload).await, ("messages", "update") => process_message_update(pool, &entry.payload).await, + ("messages", "ai_update") => process_ai_update(pool, &entry.payload).await, ("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await, ("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await, _ => { @@ -268,16 +281,62 @@ async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Re async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?; - sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid") + // Oppdater body, og eventuelt metadata/edited_at hvis de finnes + if payload.metadata.is_some() || payload.edited_at.is_some() { + sqlx::query( + r#"UPDATE messages SET body = $1, + metadata = CASE WHEN $3::text != '' THEN $3::jsonb ELSE metadata END, + edited_at = CASE WHEN $4::text != '' THEN $4::timestamptz ELSE now() END + WHERE id = $2::uuid"# + ) .bind(&payload.body) .bind(&payload.id) + .bind(payload.metadata.as_deref().unwrap_or("")) + .bind(payload.edited_at.as_deref().unwrap_or("")) .execute(pool) .await?; + } else { + sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid") + .bind(&payload.body) + .bind(&payload.id) + .execute(pool) + .await?; + } info!(id = %payload.id, "Melding oppdatert i PG"); Ok(()) } +async fn process_ai_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { + let payload: AiUpdatePayload = serde_json::from_str(payload_json)?; + + let mut tx = pool.begin().await?; + + // Oppdater body, metadata, edited_at + sqlx::query( + r#"UPDATE messages SET body = $1, metadata = $2::jsonb, edited_at = $3::timestamptz WHERE id = $4::uuid"# + ) + .bind(&payload.body) + .bind(&payload.metadata) + .bind(&payload.edited_at) + .bind(&payload.id) + .execute(&mut *tx) + .await?; + + // Insert revisjon med gammel body + sqlx::query( + r#"INSERT INTO message_revisions (id, message_id, body) VALUES (gen_random_uuid(), $1::uuid, $2)"# + ) + .bind(&payload.id) + .bind(&payload.revision_body) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + info!(id = %payload.id, "AI-oppdatering synket til PG"); + Ok(()) +} + async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> { let payload: ReactionPayload = serde_json::from_str(payload_json)?; diff --git a/worker/src/warmup.rs b/worker/src/warmup.rs index 7cb5aa7..6c6bd5d 100644 --- a/worker/src/warmup.rs +++ b/worker/src/warmup.rs @@ -107,7 +107,9 @@ pub async fn run( "body": r.5, "message_type": r.6, "reply_to": r.7.as_deref().unwrap_or(""), - "created_at": r.8 + "created_at": r.8, + "metadata": r.9, + "edited_at": r.10 }) }).collect(); @@ -161,12 +163,52 @@ pub async fn run( } } + // Hent revisjoner for denne kanalens meldinger + let msg_ids: Vec = rows.iter().map(|r| r.0.clone()).collect(); + let mut total_revisions_ch = 0usize; + if !msg_ids.is_empty() { + let revision_rows: Vec<(String, String, String)> = sqlx::query_as( + r#" + SELECT + mr.message_id::text, + mr.body, + mr.created_at::text + FROM message_revisions mr + WHERE mr.message_id = ANY($1::uuid[]) + ORDER BY mr.created_at DESC + "# + ) + .bind(&msg_ids) + .fetch_all(pool) + .await?; + + if !revision_rows.is_empty() { + let revisions: Vec = revision_rows.iter().map(|r| { + serde_json::json!({ + "message_id": r.0, + "body": r.1, + "edited_at": r.2 + }) + }).collect(); + + let revisions_json = serde_json::to_string(&revisions)?; + if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_revisions", &serde_json::json!({ + "revisions_json": revisions_json + })).await { + warn!(channel = %ch.name, error = %e, "Feil ved lasting av revisjoner"); + } else { + total_revisions_ch = revision_rows.len(); + } + } + } + info!( channel = %ch.name, mode = %ch.config.warmup_mode, value = ?ch.config.warmup_value, messages = count, reactions = reaction_rows.len(), + revisions = total_revisions_ch, "Kanal oppvarmet" ); } @@ -181,7 +223,7 @@ pub async fn run( Ok(()) } -type MessageRow = (String, String, String, String, String, String, String, Option, String); +type MessageRow = (String, String, String, String, String, String, String, Option, String, String, String); const MESSAGE_COLUMNS: &str = r#" m.id::text, @@ -192,7 +234,9 @@ const MESSAGE_COLUMNS: &str = r#" COALESCE(m.body, ''), COALESCE(m.message_type::text, 'text'), m.reply_to::text, - m.created_at::text + m.created_at::text, + COALESCE(m.metadata::text, ''), + COALESCE(m.edited_at::text, '') "#; const MESSAGE_JOINS: &str = r#"