SpacetimeDB som eneste datakilde: fjern alle PG-kall fra frontend-adapter

Utvider SpacetimeDB-modulen med metadata, edited_at og MessageRevision-tabell.
Worker skriver via reducers (set_ai_processing, ai_update_message) i stedet for
direkte PG-skriving. Frontend-adapteren leser alt fra SpacetimeDB — null fetch()-kall.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
vegard 2026-03-16 17:38:44 +01:00
parent 1e065b827d
commit af74749bed
20 changed files with 514 additions and 153 deletions

View file

@ -583,6 +583,7 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
name = "sidelinja-realtime"
version = "0.1.0"
dependencies = [
"chrono",
"log",
"serde_json",
"spacetimedb",

View file

@ -10,3 +10,4 @@ crate-type = ["cdylib"]
spacetimedb = "1.0"
serde_json = "1"
log = "0.4"
chrono = { version = "0.4", default-features = false, features = ["std"] }

View file

@ -17,6 +17,21 @@ pub struct ChatMessage {
pub message_type: String,
pub reply_to: String,
pub created_at: Timestamp,
/// JSON-streng med metadata (ai_processed, ai_action, etc.). Tom = ingen metadata.
pub metadata: String,
/// ISO-tidsstempel for siste redigering. Tom = aldri redigert.
pub edited_at: String,
}
/// Revisjonshistorikk for meldinger. Lagrer tidligere versjoner av body.
#[table(name = message_revision, public)]
pub struct MessageRevision {
#[auto_inc]
#[primary_key]
pub id: u64,
pub message_id: String,
pub body: String,
pub edited_at: String,
}
/// Reaksjon på en melding. Speiler PostgreSQL `message_reactions`.
@ -85,6 +100,8 @@ pub fn send_message(
message_type: "text".to_string(),
reply_to,
created_at: ctx.timestamp,
metadata: String::new(),
edited_at: String::new(),
};
ctx.db.chat_message().insert(msg);
@ -134,7 +151,7 @@ pub fn delete_message(ctx: &ReducerContext, id: String, workspace_id: String) ->
Ok(())
}
/// Rediger en melding. Oppdaterer body + legger i SyncOutbox.
/// Rediger en melding. Lagrer revisjon, oppdaterer body + legger i SyncOutbox.
#[reducer]
pub fn edit_message(
ctx: &ReducerContext,
@ -147,12 +164,27 @@ pub fn edit_message(
}
if let Some(mut msg) = ctx.db.chat_message().id().find(&id) {
// Lagre gammel body som revisjon
let now = format_timestamp(ctx.timestamp);
ctx.db.message_revision().insert(MessageRevision {
id: 0,
message_id: id.clone(),
body: msg.body.clone(),
edited_at: now.clone(),
});
msg.body = new_body.trim().to_string();
let edited_at = now;
msg.edited_at = edited_at.clone();
let metadata_for_payload = msg.metadata.clone();
// Bevar eksisterende metadata
ctx.db.chat_message().id().update(msg);
let payload = serde_json::json!({
"id": id,
"body": new_body.trim()
"body": new_body.trim(),
"metadata": metadata_for_payload,
"edited_at": edited_at
}).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
@ -172,6 +204,132 @@ pub fn edit_message(
}
}
/// Sett ai_processing-flagg på en melding. Transient state — ingen outbox.
#[reducer]
pub fn set_ai_processing(
ctx: &ReducerContext,
id: String,
_workspace_id: String,
) -> Result<(), String> {
if let Some(mut msg) = ctx.db.chat_message().id().find(&id) {
// Merger ai_processing: true inn i metadata
let mut meta: serde_json::Value = if msg.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::from_str(&msg.metadata).unwrap_or(serde_json::json!({}))
};
meta["ai_processing"] = serde_json::json!(true);
msg.metadata = meta.to_string();
ctx.db.chat_message().id().update(msg);
log::info!("AI-prosessering startet: {}", id);
Ok(())
} else {
Err("Melding ikke funnet".to_string())
}
}
/// Fjern ai_processing-flagg (brukes ved feil).
#[reducer]
pub fn clear_ai_processing(
ctx: &ReducerContext,
id: String,
_workspace_id: String,
) -> Result<(), String> {
if let Some(mut msg) = ctx.db.chat_message().id().find(&id) {
if !msg.metadata.is_empty() {
if let Ok(mut meta) = serde_json::from_str::<serde_json::Value>(&msg.metadata) {
if let Some(obj) = meta.as_object_mut() {
obj.remove("ai_processing");
msg.metadata = if obj.is_empty() { String::new() } else { serde_json::to_string(&meta).unwrap_or_default() };
}
}
}
ctx.db.chat_message().id().update(msg);
log::info!("AI-prosessering ryddet: {}", id);
Ok(())
} else {
Err("Melding ikke funnet".to_string())
}
}
/// AI-oppdatering av melding. Lagrer revisjon, oppdaterer body/metadata/edited_at.
/// Outbox-entry med action "ai_update" for sync til PG.
#[reducer]
pub fn ai_update_message(
ctx: &ReducerContext,
id: String,
workspace_id: String,
new_body: String,
metadata: String,
edited_at: String,
) -> Result<(), String> {
if let Some(mut msg) = ctx.db.chat_message().id().find(&id) {
let old_body = msg.body.clone();
// Lagre gammel body som revisjon
ctx.db.message_revision().insert(MessageRevision {
id: 0,
message_id: id.clone(),
body: old_body.clone(),
edited_at: edited_at.clone(),
});
msg.body = new_body.clone();
msg.metadata = metadata.clone();
msg.edited_at = edited_at.clone();
ctx.db.chat_message().id().update(msg);
let payload = serde_json::json!({
"id": id,
"body": new_body,
"metadata": metadata,
"edited_at": edited_at,
"revision_body": old_body
}).to_string();
ctx.db.sync_outbox().insert(SyncOutbox {
id: 0,
table_name: "messages".to_string(),
action: "ai_update".to_string(),
payload,
workspace_id,
created_at: ctx.timestamp,
synced: false,
});
log::info!("AI-oppdatering fullført: {}", id);
Ok(())
} else {
Err("Melding ikke funnet".to_string())
}
}
/// Laster revisjoner fra PostgreSQL ved oppvarming.
#[reducer]
pub fn load_revisions(
ctx: &ReducerContext,
revisions_json: String,
) -> Result<(), String> {
let items: Vec<serde_json::Value> = serde_json::from_str(&revisions_json)
.map_err(|e| format!("Ugyldig JSON: {}", e))?;
let count = items.len();
for item in items {
let message_id = item["message_id"].as_str().unwrap_or_default().to_string();
if message_id.is_empty() { continue; }
ctx.db.message_revision().insert(MessageRevision {
id: 0,
message_id,
body: item["body"].as_str().unwrap_or_default().to_string(),
edited_at: item["edited_at"].as_str().unwrap_or_default().to_string(),
});
}
log::info!("Revisjoner lastet: {}", count);
Ok(())
}
/// Legg til reaksjon. Én reaksjon per bruker per melding (erstatter tidligere).
#[reducer]
pub fn add_reaction(
@ -285,6 +443,8 @@ pub fn load_messages(
message_type: item["message_type"].as_str().unwrap_or("text").to_string(),
reply_to: item["reply_to"].as_str().unwrap_or_default().to_string(),
created_at: ctx.timestamp,
metadata: item["metadata"].as_str().unwrap_or_default().to_string(),
edited_at: item["edited_at"].as_str().unwrap_or_default().to_string(),
});
}
@ -330,6 +490,14 @@ pub fn clear_channel(ctx: &ReducerContext, channel_id: String) -> Result<(), Str
let msg_ids: Vec<String> = messages.iter().map(|m| m.id.clone()).collect();
// Slett revisjoner for disse meldingene
let revisions: Vec<_> = ctx.db.message_revision().iter()
.filter(|r| msg_ids.contains(&r.message_id))
.collect();
for r in revisions {
ctx.db.message_revision().id().delete(&r.id);
}
// Slett reaksjoner for disse meldingene
let reactions: Vec<_> = ctx.db.message_reaction().iter()
.filter(|r| msg_ids.contains(&r.message_id))
@ -361,6 +529,19 @@ pub fn mark_synced(ctx: &ReducerContext, ids: Vec<u64>) -> Result<(), String> {
Ok(())
}
// === Hjelpefunksjoner ===
/// Formater en SpacetimeDB Timestamp til ISO 8601-streng.
fn format_timestamp(ts: Timestamp) -> String {
let duration = ts.to_duration_since_unix_epoch().unwrap_or_default();
let micros = duration.as_micros() as u64;
let secs = micros / 1_000_000;
let nanos = ((micros % 1_000_000) * 1000) as u32;
let dt = chrono::DateTime::from_timestamp(secs as i64, nanos)
.unwrap_or_default();
dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
}
// === Livssyklus ===
#[reducer(client_connected)]

View file

@ -63,18 +63,12 @@
});
if (!res.ok) return;
const { job_id } = await res.json();
// Sett lokal processing-state umiddelbart (via array-reassign for reaktivitet)
if (chat) {
const msgs = chat.messages;
const idx = msgs.findIndex(m => m.id === messageId);
if (idx >= 0) {
msgs[idx] = { ...msgs[idx], metadata: { ...msgs[idx].metadata, ai_processing: true } };
}
}
// Poll jobb-status
// Worker kaller set_ai_processing reducer → SpacetimeDB oppdaterer frontend automatisk
// Poll jobb-status som fallback for feilhåndtering
pollJob(job_id, messageId);
} catch { /* stille feil */ }
}
},
getRevisions: (messageId: string) => chat?.getRevisions?.(messageId) ?? []
};
function pollJob(jobId: string, messageId?: string) {

View file

@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
id: __t.string(),
workspaceId: __t.string(),
newBody: __t.string(),
metadata: __t.string(),
editedAt: __t.string(),
};

View file

@ -20,4 +20,6 @@ export default __t.row({
messageType: __t.string().name("message_type"),
replyTo: __t.string().name("reply_to"),
createdAt: __t.timestamp().name("created_at"),
metadata: __t.string(),
editedAt: __t.string().name("edited_at"),
});

View file

@ -0,0 +1,16 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
id: __t.string(),
workspaceId: __t.string(),
};

View file

@ -35,20 +35,25 @@ import {
// Import all reducer arg schemas
import AddReactionReducer from "./add_reaction_reducer";
import AiUpdateMessageReducer from "./ai_update_message_reducer";
import ClearAiProcessingReducer from "./clear_ai_processing_reducer";
import ClearChannelReducer from "./clear_channel_reducer";
import DeleteMessageReducer from "./delete_message_reducer";
import EditMessageReducer from "./edit_message_reducer";
import LoadMessagesReducer from "./load_messages_reducer";
import LoadReactionsReducer from "./load_reactions_reducer";
import LoadRevisionsReducer from "./load_revisions_reducer";
import MarkSyncedReducer from "./mark_synced_reducer";
import RemoveReactionReducer from "./remove_reaction_reducer";
import SendMessageReducer from "./send_message_reducer";
import SetAiProcessingReducer from "./set_ai_processing_reducer";
// Import all procedure arg schemas
// Import all table schema definitions
import ChatMessageRow from "./chat_message_table";
import MessageReactionRow from "./message_reaction_table";
import MessageRevisionRow from "./message_revision_table";
import SyncOutboxRow from "./sync_outbox_table";
/** Type-only namespace exports for generated type groups. */
@ -77,6 +82,17 @@ const tablesSchema = __schema({
{ name: 'message_reaction_id_key', constraint: 'unique', columns: ['id'] },
],
}, MessageReactionRow),
message_revision: __table({
name: 'message_revision',
indexes: [
{ accessor: 'id', name: 'message_revision_id_idx_btree', algorithm: 'btree', columns: [
'id',
] },
],
constraints: [
{ name: 'message_revision_id_key', constraint: 'unique', columns: ['id'] },
],
}, MessageRevisionRow),
sync_outbox: __table({
name: 'sync_outbox',
indexes: [
@ -93,14 +109,18 @@ const tablesSchema = __schema({
/** The schema information for all reducers in this module. This is defined the same way as the reducers would have been defined in the server, except the body of the reducer is omitted in code generation. */
const reducersSchema = __reducers(
__reducerSchema("add_reaction", AddReactionReducer),
__reducerSchema("ai_update_message", AiUpdateMessageReducer),
__reducerSchema("clear_ai_processing", ClearAiProcessingReducer),
__reducerSchema("clear_channel", ClearChannelReducer),
__reducerSchema("delete_message", DeleteMessageReducer),
__reducerSchema("edit_message", EditMessageReducer),
__reducerSchema("load_messages", LoadMessagesReducer),
__reducerSchema("load_reactions", LoadReactionsReducer),
__reducerSchema("load_revisions", LoadRevisionsReducer),
__reducerSchema("mark_synced", MarkSyncedReducer),
__reducerSchema("remove_reaction", RemoveReactionReducer),
__reducerSchema("send_message", SendMessageReducer),
__reducerSchema("set_ai_processing", SetAiProcessingReducer),
);
/** The schema information for all procedures in this module. This is defined the same way as the procedures would have been defined in the server. */

View file

@ -0,0 +1,15 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
revisionsJson: __t.string(),
};

View file

@ -0,0 +1,18 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default __t.row({
id: __t.u64().primaryKey(),
messageId: __t.string().name("message_id"),
body: __t.string(),
editedAt: __t.string().name("edited_at"),
});

View file

@ -0,0 +1,16 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
/* eslint-disable */
/* tslint:disable */
import {
TypeBuilder as __TypeBuilder,
t as __t,
type AlgebraicTypeType as __AlgebraicTypeType,
type Infer as __Infer,
} from "spacetimedb";
export default {
id: __t.string(),
workspaceId: __t.string(),
};

View file

@ -20,6 +20,8 @@ export const ChatMessage = __t.object("ChatMessage", {
messageType: __t.string(),
replyTo: __t.string(),
createdAt: __t.timestamp(),
metadata: __t.string(),
editedAt: __t.string(),
});
export type ChatMessage = __Infer<typeof ChatMessage>;
@ -32,6 +34,14 @@ export const MessageReaction = __t.object("MessageReaction", {
});
export type MessageReaction = __Infer<typeof MessageReaction>;
export const MessageRevision = __t.object("MessageRevision", {
id: __t.u64(),
messageId: __t.string(),
body: __t.string(),
editedAt: __t.string(),
});
export type MessageRevision = __Infer<typeof MessageRevision>;
export const SyncOutbox = __t.object("SyncOutbox", {
id: __t.u64(),
tableName: __t.string(),

View file

@ -7,22 +7,30 @@ import { type Infer as __Infer } from "spacetimedb";
// Import all reducer arg schemas
import AddReactionReducer from "../add_reaction_reducer";
import AiUpdateMessageReducer from "../ai_update_message_reducer";
import ClearAiProcessingReducer from "../clear_ai_processing_reducer";
import ClearChannelReducer from "../clear_channel_reducer";
import DeleteMessageReducer from "../delete_message_reducer";
import EditMessageReducer from "../edit_message_reducer";
import LoadMessagesReducer from "../load_messages_reducer";
import LoadReactionsReducer from "../load_reactions_reducer";
import LoadRevisionsReducer from "../load_revisions_reducer";
import MarkSyncedReducer from "../mark_synced_reducer";
import RemoveReactionReducer from "../remove_reaction_reducer";
import SendMessageReducer from "../send_message_reducer";
import SetAiProcessingReducer from "../set_ai_processing_reducer";
export type AddReactionParams = __Infer<typeof AddReactionReducer>;
export type AiUpdateMessageParams = __Infer<typeof AiUpdateMessageReducer>;
export type ClearAiProcessingParams = __Infer<typeof ClearAiProcessingReducer>;
export type ClearChannelParams = __Infer<typeof ClearChannelReducer>;
export type DeleteMessageParams = __Infer<typeof DeleteMessageReducer>;
export type EditMessageParams = __Infer<typeof EditMessageReducer>;
export type LoadMessagesParams = __Infer<typeof LoadMessagesReducer>;
export type LoadReactionsParams = __Infer<typeof LoadReactionsReducer>;
export type LoadRevisionsParams = __Infer<typeof LoadRevisionsReducer>;
export type MarkSyncedParams = __Infer<typeof MarkSyncedReducer>;
export type RemoveReactionParams = __Infer<typeof RemoveReactionReducer>;
export type SendMessageParams = __Infer<typeof SendMessageReducer>;
export type SetAiProcessingParams = __Infer<typeof SetAiProcessingReducer>;

View file

@ -32,6 +32,15 @@ export function createSpacetimeChat(
} catch {
createdAt = new Date().toISOString();
}
// Parse metadata fra JSON-streng
let metadata: MessageData['metadata'] = null;
if (row.metadata) {
try {
metadata = JSON.parse(row.metadata);
} catch { /* ugyldig JSON — ignorer */ }
}
return {
id: row.id,
channel_id: channelId,
@ -47,7 +56,9 @@ export function createSpacetimeChat(
updated_at: createdAt,
reactions: reactionMap.get(row.id) ?? [],
kanban_view: null,
calendar_view: null
calendar_view: null,
metadata,
edited_at: row.editedAt || null
};
}
@ -90,14 +101,12 @@ export function createSpacetimeChat(
const bMs = extractMs(b.createdAt);
return aMs - bMs;
});
// Bevar PG-spesifikke felter (metadata, edited_at, reply_count, etc.) fra eksisterende meldinger
// Bevar felter som ikke finnes i SpacetimeDB (reply_count, parent_body, etc.)
const existing = new Map(messages.map(m => [m.id, m]));
messages = rows.map(r => {
const msg = spacetimeRowToMessage(r);
const prev = existing.get(msg.id);
if (prev) {
msg.metadata = prev.metadata;
msg.edited_at = prev.edited_at;
msg.reply_count = prev.reply_count;
msg.parent_body = prev.parent_body;
msg.parent_author_name = prev.parent_author_name;
@ -115,37 +124,6 @@ export function createSpacetimeChat(
}
}
async function enrichFromPg() {
try {
const res = await fetch(`/api/channels/${channelId}/messages/metadata`);
if (!res.ok) return;
const data: { id: string; edited_at: string | null; metadata: any }[] = await res.json();
const lookup = new Map(data.map(d => [d.id, d]));
messages = messages.map(m => {
const meta = lookup.get(m.id);
if (!meta) return m;
return {
...m,
edited_at: meta.edited_at,
metadata: meta.metadata
};
});
} catch {
// Ikke kritisk — meldinger vises uansett
}
}
async function enrichMessageFromPg(messageId: string) {
try {
const res = await fetch(`/api/messages/${messageId}/metadata`);
if (!res.ok) return;
const meta: { edited_at: string | null; metadata: any } = await res.json();
messages = messages.map(m =>
m.id === messageId ? { ...m, edited_at: meta.edited_at, metadata: meta.metadata } : m
);
} catch { /* ikke kritisk */ }
}
function connectRealtime() {
try {
conn = DbConnection.builder()
@ -166,15 +144,14 @@ export function createSpacetimeChat(
// Initialload — bygg state fra subscription
rebuildReactions();
rebuildMessages();
// Hent redigeringsstatus fra PG (SpacetimeDB har ikke edited_at/metadata)
enrichFromPg();
})
.onError(() => {
console.error('[spacetime] subscription error');
})
.subscribe([
`SELECT * FROM chat_message WHERE channel_id = '${channelId}'`,
`SELECT * FROM message_reaction`
`SELECT * FROM message_reaction`,
`SELECT * FROM message_revision`
]);
})
.onDisconnect(() => {
@ -198,11 +175,13 @@ export function createSpacetimeChat(
// Meldinger oppdatert (edit)
conn.db.chat_message.onUpdate((_ctx: EventContext, _oldRow, newRow) => {
if (destroyed || newRow.channelId !== channelId) return;
let metadata: MessageData['metadata'] = null;
if (newRow.metadata) {
try { metadata = JSON.parse(newRow.metadata); } catch { /* ignorer */ }
}
messages = messages.map(m =>
m.id === newRow.id ? { ...m, body: newRow.body, edited_at: new Date().toISOString() } : m
m.id === newRow.id ? { ...m, body: newRow.body, edited_at: newRow.editedAt || m.edited_at, metadata } : m
);
// Hent metadata fra PG (ai_processed osv.) for den oppdaterte meldingen
enrichMessageFromPg(newRow.id);
});
// Meldinger slettet
@ -310,6 +289,23 @@ export function createSpacetimeChat(
}
}
function getRevisions(messageId: string): { id: string; body: string; edited_at: string }[] {
if (!conn) return [];
const revs: { id: string; body: string; edited_at: string }[] = [];
for (const r of conn.db.message_revision.iter()) {
if (r.messageId === messageId) {
revs.push({
id: String(r.id),
body: r.body,
edited_at: r.editedAt
});
}
}
// Sorter nyeste først
revs.sort((a, b) => b.edited_at.localeCompare(a.edited_at));
return revs;
}
function destroy() {
destroyed = true;
if (conn) {
@ -338,7 +334,8 @@ export function createSpacetimeChat(
edit,
delete: del,
react,
refresh: async () => { rebuildMessages(); await enrichFromPg(); },
destroy
refresh: async () => { rebuildMessages(); },
destroy,
getRevisions
};
}

View file

@ -22,6 +22,12 @@ export interface MentionRef {
* Implementeres av PG-polling og SpacetimeDB.
* Alle felter er reaktive (Svelte 5 $state).
*/
export interface Revision {
id: string;
body: string;
edited_at: string;
}
export interface ChatConnection {
readonly messages: MessageData[];
readonly error: string;
@ -33,4 +39,5 @@ export interface ChatConnection {
react(messageId: string, reaction: string): Promise<void>;
refresh(): Promise<void>;
destroy(): void;
getRevisions?(messageId: string): Revision[];
}

View file

@ -231,10 +231,16 @@
async function loadRevisions() {
loadingRevisions = true;
try {
const res = await fetch(`/api/messages/${message.id}/revisions`);
if (res.ok) {
const data = await res.json();
revisions = data.revisions ?? [];
if (callbacks.getRevisions) {
// SpacetimeDB-adapter: les direkte fra lokal state
revisions = callbacks.getRevisions(message.id);
} else {
// Fallback: PG-adapter via API
const res = await fetch(`/api/messages/${message.id}/revisions`);
if (res.ok) {
const data = await res.json();
revisions = data.revisions ?? [];
}
}
} finally {
loadingRevisions = false;

View file

@ -52,5 +52,6 @@ export interface MessageBoxCallbacks {
onConvertToKanban?: (messageId: string) => void;
onConvertToCalendar?: (messageId: string) => void;
onMagic?: (messageId: string, action?: string) => void;
getRevisions?: (messageId: string) => { id: string; body: string; edited_at: string }[];
currentUserId?: string;
}

View file

@ -76,16 +76,11 @@ impl JobHandler for AiTextProcessHandler {
// Wrapper som rydder ai_processing-flagget ved feil
let result = self.handle_inner(pool, workspace_id, job_id, payload, message_id).await;
if result.is_err() {
let _ = sqlx::query(
r#"
UPDATE messages
SET metadata = COALESCE(metadata, '{}'::jsonb) - 'ai_processing'
WHERE id = $1
"#,
)
.bind(message_id)
.execute(pool)
.await;
// Rydd ai_processing via SpacetimeDB reducer
let _ = self.call_reducer("clear_ai_processing", &json!({
"id": message_id.to_string(),
"workspace_id": workspace_id.to_string()
})).await;
}
result
}
@ -165,33 +160,13 @@ impl AiTextProcessHandler {
return Ok(Some(json!({ "skipped": true, "reason": "tom melding" })));
}
// 2. Sett ai_processing-flagg så frontend kan vise spinner
sqlx::query(
r#"
UPDATE messages
SET metadata = COALESCE(metadata, '{}'::jsonb) || '{"ai_processing": true}'::jsonb
WHERE id = $1
"#,
)
.bind(message_id)
.execute(pool)
.await
.context("Feil ved setting av ai_processing-flagg")?;
// 2. Sett ai_processing-flagg via SpacetimeDB → frontend ser pulsering umiddelbart
self.call_reducer("set_ai_processing", &json!({
"id": message_id.to_string(),
"workspace_id": workspace_id.to_string()
})).await.context("Feil ved setting av ai_processing-flagg")?;
// 3. Lagre original som revisjon (etter at vi har satt processing-flagg)
sqlx::query(
r#"
INSERT INTO message_revisions (id, message_id, body)
VALUES (gen_random_uuid(), $1, $2)
"#,
)
.bind(message_id)
.bind(&original_body)
.execute(pool)
.await
.context("Feil ved lagring av revisjon")?;
// 4. Hent prompt-label fra DB (for metadata-stempel i chat)
// 3. Hent prompt-label fra DB (for metadata-stempel i chat)
let prompt_label: Option<String> = sqlx::query_scalar(
"SELECT label FROM ai_prompts WHERE action = $1"
)
@ -201,29 +176,27 @@ impl AiTextProcessHandler {
.ok()
.flatten();
// 5. Bygg system-prompt basert på action
// 4. Bygg system-prompt basert på action
let system_prompt = match prompt_override {
Some(custom) => custom.to_string(),
None => get_system_prompt_from_db(pool, action).await,
};
// 6. Send til AI Gateway
// 5. Send til AI Gateway
let ai_resp = self
.call_ai_gateway(&system_prompt, &plain_text, &model)
.await
.context("AI Gateway-kall feilet")?;
// 7. Beregn faktisk modellnavn
// LiteLLM returnerer alias-navnet i model-feltet — bruk expected_model fra DB
// 6. Beregn faktisk modellnavn
let actual_model = match &ai_resp.model_actual {
Some(m) if m != &model => Some(m.clone()), // Gateway returnerte faktisk modellnavn
_ => expected_model, // Bruk oppslaget fra providers-tabellen
Some(m) if m != &model => Some(m.clone()),
_ => expected_model,
};
// Strip openrouter/-prefiks for lesbarhet
let actual_model_clean = actual_model.map(|m| m.replace("openrouter/", "").replace("gemini/", "google/"));
// 8. Skriv PG metadata FØR SpacetimeDB-oppdatering
// (frontend henter metadata fra PG når SpacetimeDB-update trigger onUpdate)
// 7. Bygg metadata og oppdater via SpacetimeDB ai_update_message reducer
// Reducer lagrer revisjon, oppdaterer alt atomisk, og lager outbox for PG-sync
let metadata = json!({
"ai_processed": true,
"ai_action": action,
@ -231,35 +204,17 @@ impl AiTextProcessHandler {
"ai_model": actual_model_clean.as_deref().unwrap_or(&model)
});
sqlx::query(
r#"
UPDATE messages
SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ai_processing') || $1::jsonb,
edited_at = now()
WHERE id = $2
"#,
)
.bind(&metadata)
.bind(message_id)
.execute(pool)
.await
.context("Feil ved oppdatering av metadata")?;
let edited_at = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
// 9. Oppdater SpacetimeDB + PG body.
// SpacetimeDB er primær når frontend bruker sanntid, men meldingen
// kan også være opprettet via PG-polling — da finnes den ikke i STDB.
if let Err(e) = self.update_spacetimedb(&message_id, workspace_id, &ai_resp.content).await {
warn!(message_id = %message_id, error = %e, "SpacetimeDB-oppdatering feilet, oppdaterer PG direkte");
}
// Oppdater alltid PG body som fallback (sync ville gjort dette, men kan ta tid)
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2")
.bind(&ai_resp.content)
.bind(message_id)
.execute(pool)
.await
.context("Feil ved oppdatering av body i PG")?;
self.call_reducer("ai_update_message", &json!({
"id": message_id.to_string(),
"workspace_id": workspace_id.to_string(),
"new_body": ai_resp.content,
"metadata": metadata.to_string(),
"edited_at": edited_at
})).await.context("SpacetimeDB ai_update_message feilet")?;
// 10. Logg tokenforbruk til ai_usage_log
// 8. Logg tokenforbruk til ai_usage_log
sqlx::query(
r#"
INSERT INTO ai_usage_log (workspace_id, job_id, job_type, model_alias, model_actual, action, prompt_tokens, completion_tokens, total_tokens)
@ -300,40 +255,31 @@ impl AiTextProcessHandler {
})))
}
/// Oppdater meldingen i SpacetimeDB via edit_message reducer,
/// slik at frontend (som leser fra SpacetimeDB) ser AI-resultatet.
async fn update_spacetimedb(
/// Kall en SpacetimeDB reducer via HTTP.
async fn call_reducer(
&self,
message_id: &Uuid,
workspace_id: &Uuid,
new_body: &str,
reducer: &str,
payload: &serde_json::Value,
) -> anyhow::Result<()> {
let url = format!(
"{}/v1/database/{}/call/edit_message",
self.spacetimedb_url, self.spacetimedb_module
"{}/v1/database/{}/call/{}",
self.spacetimedb_url, self.spacetimedb_module, reducer
);
let payload = json!({
"id": message_id.to_string(),
"workspace_id": workspace_id.to_string(),
"new_body": new_body
});
let resp = self
.http
.post(&url)
.json(&payload)
.json(payload)
.send()
.await
.context("HTTP-kall til SpacetimeDB edit_message feilet")?;
.context(format!("HTTP-kall til SpacetimeDB {} feilet", reducer))?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("edit_message feilet ({}): {}", status, body);
anyhow::bail!("{} feilet ({}): {}", reducer, status, body);
}
info!(message_id = %message_id, "SpacetimeDB oppdatert med AI-resultat");
Ok(())
}

View file

@ -34,6 +34,18 @@ struct MessagePayload {
struct MessageUpdatePayload {
id: String,
body: String,
metadata: Option<String>,
edited_at: Option<String>,
}
/// Payload for AI-oppdatering (ai_update action)
#[derive(Deserialize)]
struct AiUpdatePayload {
id: String,
body: String,
metadata: String,
edited_at: String,
revision_body: String,
}
/// Payload for meldings-sletting
@ -136,6 +148,7 @@ async fn sync_batch(
("messages", "insert") => process_message_insert(pool, &entry.payload, mention_re).await,
("messages", "delete") => process_message_delete(pool, &entry.payload).await,
("messages", "update") => process_message_update(pool, &entry.payload).await,
("messages", "ai_update") => process_ai_update(pool, &entry.payload).await,
("message_reactions", "insert") => process_reaction_insert(pool, &entry.payload).await,
("message_reactions", "delete") => process_reaction_delete(pool, &entry.payload).await,
_ => {
@ -268,16 +281,62 @@ async fn process_message_delete(pool: &PgPool, payload_json: &str) -> anyhow::Re
async fn process_message_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: MessageUpdatePayload = serde_json::from_str(payload_json)?;
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid")
// Oppdater body, og eventuelt metadata/edited_at hvis de finnes
if payload.metadata.is_some() || payload.edited_at.is_some() {
sqlx::query(
r#"UPDATE messages SET body = $1,
metadata = CASE WHEN $3::text != '' THEN $3::jsonb ELSE metadata END,
edited_at = CASE WHEN $4::text != '' THEN $4::timestamptz ELSE now() END
WHERE id = $2::uuid"#
)
.bind(&payload.body)
.bind(&payload.id)
.bind(payload.metadata.as_deref().unwrap_or(""))
.bind(payload.edited_at.as_deref().unwrap_or(""))
.execute(pool)
.await?;
} else {
sqlx::query("UPDATE messages SET body = $1, edited_at = now() WHERE id = $2::uuid")
.bind(&payload.body)
.bind(&payload.id)
.execute(pool)
.await?;
}
info!(id = %payload.id, "Melding oppdatert i PG");
Ok(())
}
async fn process_ai_update(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: AiUpdatePayload = serde_json::from_str(payload_json)?;
let mut tx = pool.begin().await?;
// Oppdater body, metadata, edited_at
sqlx::query(
r#"UPDATE messages SET body = $1, metadata = $2::jsonb, edited_at = $3::timestamptz WHERE id = $4::uuid"#
)
.bind(&payload.body)
.bind(&payload.metadata)
.bind(&payload.edited_at)
.bind(&payload.id)
.execute(&mut *tx)
.await?;
// Insert revisjon med gammel body
sqlx::query(
r#"INSERT INTO message_revisions (id, message_id, body) VALUES (gen_random_uuid(), $1::uuid, $2)"#
)
.bind(&payload.id)
.bind(&payload.revision_body)
.execute(&mut *tx)
.await?;
tx.commit().await?;
info!(id = %payload.id, "AI-oppdatering synket til PG");
Ok(())
}
async fn process_reaction_insert(pool: &PgPool, payload_json: &str) -> anyhow::Result<()> {
let payload: ReactionPayload = serde_json::from_str(payload_json)?;

View file

@ -107,7 +107,9 @@ pub async fn run(
"body": r.5,
"message_type": r.6,
"reply_to": r.7.as_deref().unwrap_or(""),
"created_at": r.8
"created_at": r.8,
"metadata": r.9,
"edited_at": r.10
})
}).collect();
@ -161,12 +163,52 @@ pub async fn run(
}
}
// Hent revisjoner for denne kanalens meldinger
let msg_ids: Vec<String> = rows.iter().map(|r| r.0.clone()).collect();
let mut total_revisions_ch = 0usize;
if !msg_ids.is_empty() {
let revision_rows: Vec<(String, String, String)> = sqlx::query_as(
r#"
SELECT
mr.message_id::text,
mr.body,
mr.created_at::text
FROM message_revisions mr
WHERE mr.message_id = ANY($1::uuid[])
ORDER BY mr.created_at DESC
"#
)
.bind(&msg_ids)
.fetch_all(pool)
.await?;
if !revision_rows.is_empty() {
let revisions: Vec<serde_json::Value> = revision_rows.iter().map(|r| {
serde_json::json!({
"message_id": r.0,
"body": r.1,
"edited_at": r.2
})
}).collect();
let revisions_json = serde_json::to_string(&revisions)?;
if let Err(e) = call_reducer(http, spacetimedb_url, module, "load_revisions", &serde_json::json!({
"revisions_json": revisions_json
})).await {
warn!(channel = %ch.name, error = %e, "Feil ved lasting av revisjoner");
} else {
total_revisions_ch = revision_rows.len();
}
}
}
info!(
channel = %ch.name,
mode = %ch.config.warmup_mode,
value = ?ch.config.warmup_value,
messages = count,
reactions = reaction_rows.len(),
revisions = total_revisions_ch,
"Kanal oppvarmet"
);
}
@ -181,7 +223,7 @@ pub async fn run(
Ok(())
}
type MessageRow = (String, String, String, String, String, String, String, Option<String>, String);
type MessageRow = (String, String, String, String, String, String, String, Option<String>, String, String, String);
const MESSAGE_COLUMNS: &str = r#"
m.id::text,
@ -192,7 +234,9 @@ const MESSAGE_COLUMNS: &str = r#"
COALESCE(m.body, ''),
COALESCE(m.message_type::text, 'text'),
m.reply_to::text,
m.created_at::text
m.created_at::text,
COALESCE(m.metadata::text, ''),
COALESCE(m.edited_at::text, '')
"#;
const MESSAGE_JOINS: &str = r#"