synops/maskinrommet/src/ai_process.rs
vegard aee6adc425 Fjern STDB-skrivestien: all skriving går kun til PG (oppgave 22.3)
SpacetimeDB var brukt som «instant feedback»-lag mellom portvokteren
og frontend. Nå som PG NOTIFY-triggere og WebSocket er på plass
(oppgave 22.1–22.2), er STDB-skrivestien overflødig.

Endringer:
- intentions.rs: Alle CRUD-operasjoner (create/update/delete node/edge)
  skriver nå synkront til PG i stedet for STDB-først + async PG-jobbkø.
  PG NOTIFY-triggere gir umiddelbar sanntidsoppdatering til klienter.
  Tilgangsgivende edges (owner/admin/member_of/reader) bruker transaksjon
  med recompute_access direkte i handleren.
- maintenance.rs: Fjernet StdbClient fra alle funksjoner. Varsler
  opprettes/oppdateres/slettes direkte i PG.
- agent.rs, audio.rs, tts.rs, ai_process.rs: Fjernet STDB-synk etter
  CLI-verktøy-kjøring. PG NOTIFY dekker sanntidsvisning.
- pg_writes.rs: Fjernet sync_node_access_to_stdb. access_changed
  NOTIFY-trigger håndterer dette.
- workspace.rs: Synkrone PG-skrivinger med recompute_access.
- summarize.rs, ai_edges.rs: Fjernet StdbClient fra signaturer.
- jobs.rs: Fjernet StdbClient fra dispatch og start_worker.
- main.rs: Fjernet STDB-initialisering, warmup, stdb_monitor.
  StdbClient fjernet fra AppState. stdb.rs beholdt som død kode
  (fjernes i oppgave 22.4).
- health.rs: Fjernet STDB-helsesjekk fra dashboard.
- Slettet warmup.rs og stdb_monitor.rs (PG→STDB-synk ikke lenger
  relevant).
- docs/retninger/datalaget.md: Markert fase M3 som fullført.
2026-03-18 13:11:33 +00:00

478 lines
14 KiB
Rust

// AI-prosessering — hent kilde-content + preset-prompt, kall AI Gateway,
// og utfør direction-spesifikk logikk.
//
// Jobbtype: "ai_process"
// Payload: {
// "source_node_id": "<uuid>",
// "ai_preset_id": "<uuid>",
// "direction": "node_to_tool" | "tool_to_node",
// "requested_by": "<uuid>"
// }
//
// Flyten:
// 1. Hent kilde-node content fra PG
// 2. Hent AI-preset prompt + modellprofil fra PG
// 3. Map modellprofil → LiteLLM-alias (flash → sidelinja/rutine, standard → sidelinja/resonering)
// 4. Send til AI Gateway (LiteLLM)
// 5. Logg forbruk i ai_usage_log
// 6. Direction-logikk:
// - tool_to_node: lagre original som revisjon i node_revisions, oppdater node content
// - node_to_tool: opprett ny node med AI-output, opprett derived_from + processed_by edges
//
// Ref: docs/features/ai_verktoy.md, docs/infra/ai_gateway.md
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
use crate::jobs::JobRow;
use crate::resource_usage;
/// Projection of the source node's columns needed for AI processing.
#[derive(sqlx::FromRow)]
struct SourceNodeRow {
    // Text fed to the model; processing aborts if empty/absent.
    content: Option<String>,
    title: Option<String>,
    // NOTE(review): `node_kind` is selected but never read in this file —
    // candidate for removal (together with the SELECT column); confirm first.
    node_kind: String,
    // PG enum cast to text in the query; inherited by the new node in node_to_tool.
    visibility: String,
    metadata: serde_json::Value,
}
/// Projection of an 'ai_preset' node. `metadata` carries the prompt and
/// (optionally) the model profile; its absence is a hard error downstream.
#[derive(sqlx::FromRow)]
struct PresetRow {
    title: Option<String>,
    metadata: Option<serde_json::Value>,
}
/// OpenAI-compatible chat completion request body sent to the gateway.
#[derive(Serialize)]
struct ChatRequest {
    // LiteLLM alias, e.g. "sidelinja/rutine".
    model: String,
    messages: Vec<ChatMessage>,
    temperature: f32,
}
/// A single chat message ("system" or "user" role in this module).
#[derive(Serialize)]
struct ChatMessage {
    role: String,
    content: String,
}
/// OpenAI-compatible chat completion response.
/// `usage` and `model` are optional because not every upstream provider
/// returns them; `#[serde(default)]` tolerates their absence.
#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<Choice>,
    #[serde(default)]
    usage: Option<UsageInfo>,
    #[serde(default)]
    model: Option<String>,
}
/// Token usage as reported by the gateway; missing fields default to 0.
#[derive(Deserialize, Clone)]
struct UsageInfo {
    #[serde(default)]
    prompt_tokens: i64,
    #[serde(default)]
    completion_tokens: i64,
}
/// One completion choice; only the first choice is consumed.
#[derive(Deserialize)]
struct Choice {
    message: MessageContent,
}
/// Assistant message payload; `content` may legitimately be null upstream.
#[derive(Deserialize)]
struct MessageContent {
    content: Option<String>,
}
/// Maps a model profile name to its LiteLLM alias.
///
/// Unknown profiles fall back to the cheapest alias.
/// Ref: docs/features/ai_verktoy.md § 4, docs/infra/ai_gateway.md § 3.4
fn model_profile_to_alias(profile: &str) -> &'static str {
    if profile == "standard" {
        "sidelinja/resonering"
    } else {
        // "flash" — and anything unrecognized — maps to the cheapest tier.
        "sidelinja/rutine"
    }
}
/// Handles an "ai_process" job.
///
/// Flow: fetch source-node content and AI-preset prompt from PG, map the
/// model profile to a LiteLLM alias, call the AI Gateway, log token usage
/// (best-effort), then run the direction-specific write path:
/// - "tool_to_node": overwrite the source node (original kept as revision)
/// - "node_to_tool": create a new node with provenance edges
///
/// Returns a JSON summary on success, or an error `String` on failure.
pub async fn handle_ai_process(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    // All four payload fields are required; the UUIDs arrive as strings.
    let source_node_id: Uuid = job
        .payload
        .get("source_node_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler source_node_id i payload")?;
    let ai_preset_id: Uuid = job
        .payload
        .get("ai_preset_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler ai_preset_id i payload")?;
    let direction = job
        .payload
        .get("direction")
        .and_then(|v| v.as_str())
        .ok_or("Mangler direction i payload")?;
    let requested_by: Uuid = job
        .payload
        .get("requested_by")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler requested_by i payload")?;
    // 1. Fetch the source node (incl. visibility and metadata, needed by the
    //    direction logic). `visibility` is a PG enum, hence the ::text cast.
    let source = sqlx::query_as::<_, SourceNodeRow>(
        "SELECT content, title, node_kind, visibility::text AS visibility, metadata FROM nodes WHERE id = $1",
    )
    .bind(source_node_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("PG-feil ved henting av kilde-node: {e}"))?
    .ok_or("Kilde-node finnes ikke")?;
    // Empty or missing content is a hard error — there is nothing to process.
    let source_content = source
        .content
        .as_ref()
        .filter(|c| !c.is_empty())
        .ok_or("Kilde-noden har ikke innhold å behandle")?
        .clone();
    // 2. Fetch the AI preset (a node of kind 'ai_preset').
    let preset = sqlx::query_as::<_, PresetRow>(
        "SELECT title, metadata FROM nodes WHERE id = $1 AND node_kind = 'ai_preset'",
    )
    .bind(ai_preset_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("PG-feil ved henting av AI-preset: {e}"))?
    .ok_or("AI-preset finnes ikke")?;
    let metadata = preset
        .metadata
        .ok_or("AI-preset mangler metadata")?;
    let prompt = metadata
        .get("prompt")
        .and_then(|v| v.as_str())
        .ok_or("AI-preset mangler prompt i metadata")?;
    // Missing profile defaults to the cheapest tier ("flash").
    let model_profile = metadata
        .get("model_profile")
        .and_then(|v| v.as_str())
        .unwrap_or("flash");
    // 3. Map the model profile to a LiteLLM alias.
    let model_alias = model_profile_to_alias(model_profile);
    tracing::info!(
        source_node_id = %source_node_id,
        ai_preset_id = %ai_preset_id,
        direction = %direction,
        model_alias = %model_alias,
        preset_title = ?preset.title,
        source_content_len = source_content.len(),
        "Starter AI-prosessering"
    );
    // 4. Call the AI Gateway (LiteLLM).
    let (ai_output, usage, actual_model) =
        call_ai_gateway(model_alias, prompt, &source_content).await?;
    tracing::info!(
        source_node_id = %source_node_id,
        output_len = ai_output.len(),
        actual_model = ?actual_model,
        "AI-prosessering fullført"
    );
    // 5. Log usage in ai_usage_log. The gateway may omit usage info, in
    //    which case token counts are recorded as 0.
    let collection_id = resource_usage::find_collection_for_node(db, source_node_id).await;
    let (tokens_in, tokens_out) = usage
        .as_ref()
        .map(|u| (u.prompt_tokens, u.completion_tokens))
        .unwrap_or((0, 0));
    let total_tokens = tokens_in + tokens_out;
    // ai_usage_log — detailed AI usage ledger. Logging failures are warned,
    // never fatal: the processing result matters more than the ledger row.
    if let Err(e) = sqlx::query(
        r#"
INSERT INTO ai_usage_log
(collection_node_id, job_id, model_alias, model_actual,
prompt_tokens, completion_tokens, total_tokens, job_type)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'ai_process')
"#,
    )
    .bind(collection_id)
    .bind(job.id)
    .bind(model_alias)
    .bind(actual_model.as_deref())
    // NOTE(review): i64 → i32 `as` casts truncate silently; assumes token
    // counts stay well below i32::MAX — TODO confirm or use try_into.
    .bind(tokens_in as i32)
    .bind(tokens_out as i32)
    .bind(total_tokens as i32)
    .execute(db)
    .await
    {
        tracing::warn!(error = %e, "Kunne ikke logge AI-forbruk i ai_usage_log");
    }
    // resource_usage_log — general resource accounting (also best-effort).
    // `actual_model` is moved here; all borrows of it happen above.
    if let Err(e) = resource_usage::log(
        db,
        source_node_id,
        Some(requested_by),
        collection_id,
        "ai",
        serde_json::json!({
            "model_level": model_profile,
            "model_id": actual_model.unwrap_or_else(|| "unknown".to_string()),
            "model_alias": model_alias,
            "tokens_in": tokens_in,
            "tokens_out": tokens_out,
            "job_type": "ai_process",
            "preset_id": ai_preset_id.to_string(),
            "direction": direction
        }),
    )
    .await
    {
        tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk");
    }
    // 6. Direction-specific write path.
    match direction {
        "tool_to_node" => {
            handle_tool_to_node(
                db, job, source_node_id, ai_preset_id, requested_by,
                &source, &source_content, &ai_output,
            ).await?;
            tracing::info!(
                source_node_id = %source_node_id,
                "tool_to_node: original lagret som revisjon, node oppdatert med AI-output"
            );
            Ok(serde_json::json!({
                "status": "completed",
                "source_node_id": source_node_id.to_string(),
                "ai_preset_id": ai_preset_id.to_string(),
                "direction": "tool_to_node",
                "ai_output": ai_output,
                "tokens_in": tokens_in,
                "tokens_out": tokens_out,
                "total_tokens": total_tokens
            }))
        }
        "node_to_tool" => {
            let new_node_id = handle_node_to_tool(
                db, source_node_id, ai_preset_id, requested_by,
                &source, &ai_output, preset.title.as_deref(),
            ).await?;
            tracing::info!(
                source_node_id = %source_node_id,
                new_node_id = %new_node_id,
                "node_to_tool: ny node opprettet med derived_from + processed_by edges"
            );
            Ok(serde_json::json!({
                "status": "completed",
                "source_node_id": source_node_id.to_string(),
                "new_node_id": new_node_id.to_string(),
                "ai_preset_id": ai_preset_id.to_string(),
                "direction": "node_to_tool",
                "ai_output": ai_output,
                "tokens_in": tokens_in,
                "tokens_out": tokens_out,
                "total_tokens": total_tokens
            }))
        }
        other => Err(format!("Ugyldig direction: {other}")),
    }
}
/// tool_to_node: "the brush" — the AI tool is applied TO the node itself.
/// Saves the original content as an 'ai_edit' revision, then overwrites the
/// node's content with the AI output.
/// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_tool_to_node(
    db: &PgPool,
    job: &JobRow,
    source_node_id: Uuid,
    ai_preset_id: Uuid,
    requested_by: Uuid,
    source: &SourceNodeRow,
    original_content: &str,
    ai_output: &str,
) -> Result<(), String> {
    // 1. Keep the pre-edit content/title/metadata as a revision so the AI
    //    edit can be audited or reverted later.
    sqlx::query(
        r#"
INSERT INTO node_revisions (node_id, content, title, metadata, revision_type, created_by, ai_preset_id, job_id)
VALUES ($1, $2, $3, $4, 'ai_edit', $5, $6, $7)
"#,
    )
    .bind(source_node_id)
    .bind(original_content)
    .bind(source.title.as_deref())
    .bind(&source.metadata)
    .bind(requested_by)
    .bind(ai_preset_id)
    .bind(job.id)
    .execute(db)
    .await
    .map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?;
    // 2. Overwrite node content in PG (a NOTIFY trigger pushes the realtime
    //    update to clients).
    // NOTE(review): the revision insert and this update are not wrapped in a
    // transaction — a failure between them leaves a revision without the
    // corresponding edit. Confirm whether that is acceptable.
    sqlx::query(
        "UPDATE nodes SET content = $2 WHERE id = $1",
    )
    .bind(source_node_id)
    .bind(ai_output)
    .execute(db)
    .await
    .map_err(|e| format!("PG update node content feilet: {e}"))?;
    Ok(())
}
/// node_to_tool: "the mill" — the node is sent THROUGH the tool.
/// Creates a new node carrying the AI output, linked back via a
/// derived_from edge (to the source node) and a processed_by edge
/// (to the AI preset). Returns the new node's id.
/// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_node_to_tool(
    db: &PgPool,
    source_node_id: Uuid,
    ai_preset_id: Uuid,
    requested_by: Uuid,
    source: &SourceNodeRow,
    ai_output: &str,
    preset_title: Option<&str>,
) -> Result<Uuid, String> {
    // UUIDv7 keeps ids roughly time-ordered.
    let new_node_id = Uuid::now_v7();
    // NOTE(review): the format string has no separator, so the title comes
    // out as e.g. "Uten tittelAI" — confirm whether a separator (space,
    // dash) between source title and preset title was intended.
    let new_title = format!(
        "{}{}",
        source.title.as_deref().unwrap_or("Uten tittel"),
        preset_title.unwrap_or("AI"),
    );
    // Provenance metadata stored on the new node.
    let new_metadata = serde_json::json!({
        "ai_generated": true,
        "source_node_id": source_node_id.to_string(),
        "ai_preset_id": ai_preset_id.to_string()
    });
    // 1. Create the new node in PG (a NOTIFY trigger pushes the realtime
    //    update). Visibility is inherited from the source node.
    sqlx::query(
        r#"
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
VALUES ($1, 'content', $2, $3, $4::visibility, $5, $6)
"#,
    )
    .bind(new_node_id)
    .bind(&new_title)
    .bind(ai_output)
    .bind(&source.visibility)
    .bind(&new_metadata)
    .bind(requested_by)
    .execute(db)
    .await
    .map_err(|e| format!("PG insert ny node feilet: {e}"))?;
    // 2. derived_from edge: new node → source node.
    let derived_edge_id = Uuid::now_v7();
    sqlx::query(
        r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, 'derived_from', '{}', false, $4)
"#,
    )
    .bind(derived_edge_id)
    .bind(new_node_id)
    .bind(source_node_id)
    .bind(requested_by)
    .execute(db)
    .await
    .map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?;
    // 3. processed_by edge: new node → AI preset.
    let processed_edge_id = Uuid::now_v7();
    sqlx::query(
        r#"
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
VALUES ($1, $2, $3, 'processed_by', '{}', false, $4)
"#,
    )
    .bind(processed_edge_id)
    .bind(new_node_id)
    .bind(ai_preset_id)
    .bind(requested_by)
    .execute(db)
    .await
    .map_err(|e| format!("PG insert processed_by-edge feilet: {e}"))?;
    Ok(new_node_id)
}
/// Calls the AI Gateway (LiteLLM) with a system prompt + user content pair.
/// Returns `(output_text, usage, actual_model_name)`.
async fn call_ai_gateway(
    model_alias: &str,
    system_prompt: &str,
    user_content: &str,
) -> Result<(String, Option<UsageInfo>, Option<String>), String> {
    // Endpoint and credentials come from the environment; the defaults
    // match local development.
    let gateway_url = std::env::var("AI_GATEWAY_URL")
        .unwrap_or_else(|_| "http://localhost:4000".to_string());
    let api_key = std::env::var("LITELLM_MASTER_KEY").unwrap_or_default();

    let messages = vec![
        ChatMessage {
            role: "system".to_string(),
            content: system_prompt.to_string(),
        },
        ChatMessage {
            role: "user".to_string(),
            content: user_content.to_string(),
        },
    ];
    let body = ChatRequest {
        model: model_alias.to_string(),
        messages,
        temperature: 0.3,
    };

    let endpoint = format!("{gateway_url}/v1/chat/completions");
    let response = reqwest::Client::new()
        .post(&endpoint)
        .header("Authorization", format!("Bearer {api_key}"))
        .header("Content-Type", "application/json")
        .json(&body)
        .timeout(std::time::Duration::from_secs(120))
        .send()
        .await
        .map_err(|e| format!("AI Gateway-kall feilet: {e}"))?;

    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        return Err(format!("AI Gateway returnerte {status}: {body}"));
    }

    let parsed: ChatResponse = response
        .json()
        .await
        .map_err(|e| format!("Kunne ikke parse AI Gateway-respons: {e}"))?;

    // Only the first choice is used; a null/absent content is an error.
    let output = match parsed
        .choices
        .first()
        .and_then(|c| c.message.content.as_deref())
    {
        Some(text) => text.to_string(),
        None => return Err("AI Gateway returnerte ingen content".to_string()),
    };

    Ok((output, parsed.usage, parsed.model))
}