synops/maskinrommet/src/ai_process.rs
vegard 2fa5d7ef2f AI-kostnadstak per bruker/samling: budsjettsjekk før AI-kall (oppgave 28.3)
Samlings- og brukernoder kan nå ha ai_budget i metadata:
  { "ai_budget": { "monthly_limit_usd": 50.0 } }

Før hvert AI-kall aggregeres inneværende måneds forbruk fra
ai_usage_log og sammenlignes med grensen. Ved overskridelse:
- AI-kallet blokkeres med feilmelding
- En work_item-node opprettes med tag "budget_exceeded"
- Work_item knyttes til samlingen via belongs_to-edge

Endringer:
- migrations/029: requested_by-kolonne i ai_usage_log + indekser
- synops-ai: --collection-id/--user-id flagg, budsjettsjekk i prompt
- maskinrommet/ai_budget.rs: delt budsjettsjekk-modul
- maskinrommet/ai_process.rs: budsjettsjekk før AI gateway-kall
- docs/infra/ai_gateway.md: oppdatert § 6.3 fra "fase 2" til implementert
2026-03-18 20:19:52 +00:00

506 lines
15 KiB
Rust

// AI-prosessering — hent kilde-content + preset-prompt, kall AI Gateway,
// og utfør direction-spesifikk logikk.
//
// Jobbtype: "ai_process"
// Payload: {
// "source_node_id": "<uuid>",
// "ai_preset_id": "<uuid>",
// "direction": "node_to_tool" | "tool_to_node",
// "requested_by": "<uuid>"
// }
//
// Flyten:
// 1. Hent kilde-node content fra PG
// 2. Hent AI-preset prompt + modellprofil fra PG
// 3. Map modellprofil → LiteLLM-alias (flash → sidelinja/rutine, standard → sidelinja/resonering)
// 4. Send til AI Gateway (LiteLLM)
// 5. Logg forbruk i ai_usage_log
// 6. Direction-logikk:
// - tool_to_node: lagre original som revisjon i node_revisions, oppdater node content
// - node_to_tool: opprett ny node med AI-output, opprett derived_from + processed_by edges
//
// Ref: docs/features/ai_verktoy.md, docs/infra/ai_gateway.md
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
use crate::jobs::JobRow;
use crate::resource_usage;
/// Row shape for the source-node lookup (step 1 of the flow).
/// `visibility` is selected as `visibility::text` in the query so the
/// Postgres enum maps onto a plain `String`.
#[derive(sqlx::FromRow)]
#[allow(dead_code)] // not every field is read by both direction handlers
struct SourceNodeRow {
    // Node body to process; empty/None is rejected before any AI call.
    content: Option<String>,
    // Used when building the derived node's title (node_to_tool).
    title: Option<String>,
    node_kind: String,
    // Propagated to the derived node so it inherits the source's visibility.
    visibility: String,
    // Stored verbatim on the revision row in tool_to_node mode.
    metadata: serde_json::Value,
}
/// Row shape for the AI-preset node (node_kind = 'ai_preset').
/// The prompt and model profile live inside `metadata`, not in columns.
#[derive(sqlx::FromRow)]
struct PresetRow {
    title: Option<String>,
    metadata: Option<serde_json::Value>,
}
/// OpenAI-compatible chat completion request body
/// (POSTed to LiteLLM's /v1/chat/completions endpoint).
#[derive(Serialize)]
struct ChatRequest {
    // LiteLLM alias, e.g. "sidelinja/rutine" — not a raw provider model id.
    model: String,
    messages: Vec<ChatMessage>,
    temperature: f32,
}
/// One chat message; `role` is "system" or "user" in this module.
#[derive(Serialize)]
struct ChatMessage {
    role: String,
    content: String,
}
/// OpenAI-compatible chat completion response.
/// `usage` and `model` are optional/defaulted because not every gateway
/// backend is guaranteed to include them.
#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<Choice>,
    #[serde(default)]
    usage: Option<UsageInfo>,
    // The concrete model the gateway actually routed to (if reported).
    #[serde(default)]
    model: Option<String>,
}
/// Token accounting as reported by the gateway; missing fields default to 0.
#[derive(Deserialize, Clone)]
struct UsageInfo {
    #[serde(default)]
    prompt_tokens: i64,
    #[serde(default)]
    completion_tokens: i64,
}
/// A single completion choice; only the first one is read.
#[derive(Deserialize)]
struct Choice {
    message: MessageContent,
}
/// Assistant message payload; `content` may be null (treated as an error upstream).
#[derive(Deserialize)]
struct MessageContent {
    content: Option<String>,
}
/// Maps a preset's model profile to the corresponding LiteLLM alias.
///
/// Unknown profiles deliberately fall back to the cheapest tier rather
/// than failing the job.
/// Ref: docs/features/ai_verktoy.md § 4, docs/infra/ai_gateway.md § 3.4
fn model_profile_to_alias(profile: &str) -> &'static str {
    if profile == "standard" {
        "sidelinja/resonering"
    } else {
        // "flash" and anything unrecognized → cheapest model
        "sidelinja/rutine"
    }
}
/// Handles an "ai_process" job end-to-end: parses the payload, loads the
/// source node and the AI preset, enforces collection/user AI budgets,
/// calls the AI Gateway, logs usage, and applies the direction-specific
/// result handling (tool_to_node or node_to_tool).
///
/// Returns a JSON summary of the completed job, or an error string that
/// the job runner records as the failure reason.
pub async fn handle_ai_process(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    // --- Payload parsing: all four fields are required. ---
    let source_node_id: Uuid = job
        .payload
        .get("source_node_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler source_node_id i payload")?;
    let ai_preset_id: Uuid = job
        .payload
        .get("ai_preset_id")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler ai_preset_id i payload")?;
    let direction = job
        .payload
        .get("direction")
        .and_then(|v| v.as_str())
        .ok_or("Mangler direction i payload")?;
    let requested_by: Uuid = job
        .payload
        .get("requested_by")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse().ok())
        .ok_or("Mangler requested_by i payload")?;
    // 1. Fetch the source node (incl. visibility and metadata, which the
    //    direction handlers need). visibility::text maps the enum to String.
    let source = sqlx::query_as::<_, SourceNodeRow>(
        "SELECT content, title, node_kind, visibility::text AS visibility, metadata FROM nodes WHERE id = $1",
    )
    .bind(source_node_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("PG-feil ved henting av kilde-node: {e}"))?
    .ok_or("Kilde-node finnes ikke")?;
    // Empty or missing content is rejected up front — nothing to process.
    let source_content = source
        .content
        .as_ref()
        .filter(|c| !c.is_empty())
        .ok_or("Kilde-noden har ikke innhold å behandle")?
        .clone();
    // 2. Fetch the AI preset (a node of kind 'ai_preset'; prompt and model
    //    profile live in its metadata JSON).
    let preset = sqlx::query_as::<_, PresetRow>(
        "SELECT title, metadata FROM nodes WHERE id = $1 AND node_kind = 'ai_preset'",
    )
    .bind(ai_preset_id)
    .fetch_optional(db)
    .await
    .map_err(|e| format!("PG-feil ved henting av AI-preset: {e}"))?
    .ok_or("AI-preset finnes ikke")?;
    let metadata = preset
        .metadata
        .ok_or("AI-preset mangler metadata")?;
    let prompt = metadata
        .get("prompt")
        .and_then(|v| v.as_str())
        .ok_or("AI-preset mangler prompt i metadata")?;
    // A missing model_profile falls back to the cheapest tier ("flash").
    let model_profile = metadata
        .get("model_profile")
        .and_then(|v| v.as_str())
        .unwrap_or("flash");
    // 3. Map model profile → LiteLLM alias.
    let model_alias = model_profile_to_alias(model_profile);
    // 3.5. Budget check for the collection (task 28.3). On overrun: record a
    //      "budget_exceeded" work item and abort BEFORE any gateway spend.
    let collection_id = resource_usage::find_collection_for_node(db, source_node_id).await;
    if let Some(coll_id) = collection_id {
        if let Err(status) = crate::ai_budget::check_collection_budget(db, coll_id).await {
            crate::ai_budget::create_budget_work_item(
                db, coll_id, Some(requested_by), &status,
            ).await;
            return Err(format!(
                "AI-budsjett overskredet for \"{}\": forbruk ${:.2} >= grense ${:.2}",
                status.node_title, status.current_cost, status.limit
            ));
        }
    }
    // Budget check for the requesting user. The work item is attached to the
    // collection when one exists, otherwise to the source node itself.
    if let Err(status) = crate::ai_budget::check_user_budget(db, requested_by).await {
        let coll = collection_id.unwrap_or(source_node_id);
        crate::ai_budget::create_budget_work_item(
            db, coll, Some(requested_by), &status,
        ).await;
        return Err(format!(
            "AI-budsjett overskredet for bruker \"{}\": forbruk ${:.2} >= grense ${:.2}",
            status.node_title, status.current_cost, status.limit
        ));
    }
    tracing::info!(
        source_node_id = %source_node_id,
        ai_preset_id = %ai_preset_id,
        direction = %direction,
        model_alias = %model_alias,
        preset_title = ?preset.title,
        source_content_len = source_content.len(),
        "Starter AI-prosessering"
    );
    // 4. Call the AI Gateway (LiteLLM, OpenAI-compatible API).
    let (ai_output, usage, actual_model) =
        call_ai_gateway(model_alias, prompt, &source_content).await?;
    tracing::info!(
        source_node_id = %source_node_id,
        output_len = ai_output.len(),
        actual_model = ?actual_model,
        "AI-prosessering fullført"
    );
    // 5. Log usage in ai_usage_log.
    //    collection_id was already resolved during the budget check (step 3.5).
    //    Missing usage info from the gateway is logged as zero tokens.
    let (tokens_in, tokens_out) = usage
        .as_ref()
        .map(|u| (u.prompt_tokens, u.completion_tokens))
        .unwrap_or((0, 0));
    let total_tokens = tokens_in + tokens_out;
    // ai_usage_log — detailed AI usage log (incl. requested_by).
    // Logging failures are warned about, not propagated: the AI work itself
    // succeeded and should not be retried because accounting failed.
    if let Err(e) = sqlx::query(
        r#"
        INSERT INTO ai_usage_log
        (collection_node_id, job_id, requested_by, model_alias, model_actual,
        prompt_tokens, completion_tokens, total_tokens, job_type)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ai_process')
        "#,
    )
    .bind(collection_id)
    .bind(job.id)
    .bind(requested_by)
    .bind(model_alias)
    .bind(actual_model.as_deref())
    // NOTE(review): i64 → i32 `as` casts silently truncate for very large
    // token counts — consider i32::try_from or widening the columns.
    .bind(tokens_in as i32)
    .bind(tokens_out as i32)
    .bind(total_tokens as i32)
    .execute(db)
    .await
    {
        tracing::warn!(error = %e, "Kunne ikke logge AI-forbruk i ai_usage_log");
    }
    // resource_usage_log — generic resource accounting (also best-effort).
    if let Err(e) = resource_usage::log(
        db,
        source_node_id,
        Some(requested_by),
        collection_id,
        "ai",
        serde_json::json!({
            "model_level": model_profile,
            "model_id": actual_model.unwrap_or_else(|| "unknown".to_string()),
            "model_alias": model_alias,
            "tokens_in": tokens_in,
            "tokens_out": tokens_out,
            "job_type": "ai_process",
            "preset_id": ai_preset_id.to_string(),
            "direction": direction
        }),
    )
    .await
    {
        tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk");
    }
    // 6. Direction-specific handling.
    match direction {
        "tool_to_node" => {
            // "The brush": overwrite the node, keeping the original as a revision.
            handle_tool_to_node(
                db, job, source_node_id, ai_preset_id, requested_by,
                &source, &source_content, &ai_output,
            ).await?;
            tracing::info!(
                source_node_id = %source_node_id,
                "tool_to_node: original lagret som revisjon, node oppdatert med AI-output"
            );
            Ok(serde_json::json!({
                "status": "completed",
                "source_node_id": source_node_id.to_string(),
                "ai_preset_id": ai_preset_id.to_string(),
                "direction": "tool_to_node",
                "ai_output": ai_output,
                "tokens_in": tokens_in,
                "tokens_out": tokens_out,
                "total_tokens": total_tokens
            }))
        }
        "node_to_tool" => {
            // "The mill": create a new derived node linked to source + preset.
            let new_node_id = handle_node_to_tool(
                db, source_node_id, ai_preset_id, requested_by,
                &source, &ai_output, preset.title.as_deref(),
            ).await?;
            tracing::info!(
                source_node_id = %source_node_id,
                new_node_id = %new_node_id,
                "node_to_tool: ny node opprettet med derived_from + processed_by edges"
            );
            Ok(serde_json::json!({
                "status": "completed",
                "source_node_id": source_node_id.to_string(),
                "new_node_id": new_node_id.to_string(),
                "ai_preset_id": ai_preset_id.to_string(),
                "direction": "node_to_tool",
                "ai_output": ai_output,
                "tokens_in": tokens_in,
                "tokens_out": tokens_out,
                "total_tokens": total_tokens
            }))
        }
        other => Err(format!("Ugyldig direction: {other}")),
    }
}
/// tool_to_node: "The brush" — the AI tool is applied ON the node itself.
/// Saves the original content as an 'ai_edit' revision, then overwrites the
/// node's content with the AI output.
///
/// Both statements run in ONE transaction: previously they were issued
/// separately against the pool, so a crash between them could update the
/// node without recording the pre-edit revision (losing the undo trail).
/// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_tool_to_node(
    db: &PgPool,
    job: &JobRow,
    source_node_id: Uuid,
    ai_preset_id: Uuid,
    requested_by: Uuid,
    source: &SourceNodeRow,
    original_content: &str,
    ai_output: &str,
) -> Result<(), String> {
    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Kunne ikke starte transaksjon: {e}"))?;
    // 1. Save the original content as a revision so the AI edit can be undone.
    sqlx::query(
        r#"
        INSERT INTO node_revisions (node_id, content, title, metadata, revision_type, created_by, ai_preset_id, job_id)
        VALUES ($1, $2, $3, $4, 'ai_edit', $5, $6, $7)
        "#,
    )
    .bind(source_node_id)
    .bind(original_content)
    .bind(source.title.as_deref())
    .bind(&source.metadata)
    .bind(requested_by)
    .bind(ai_preset_id)
    .bind(job.id)
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?;
    // 2. Replace node content in PG (NOTIFY trigger pushes a realtime update on commit).
    sqlx::query(
        "UPDATE nodes SET content = $2 WHERE id = $1",
    )
    .bind(source_node_id)
    .bind(ai_output)
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("PG update node content feilet: {e}"))?;
    tx.commit()
        .await
        .map_err(|e| format!("Kunne ikke committe transaksjon: {e}"))?;
    Ok(())
}
/// node_to_tool: "The mill" — the node is sent THROUGH the tool.
/// Creates a new node holding the AI output, with a derived_from edge back
/// to the source node and a processed_by edge to the AI preset.
///
/// All three inserts run in ONE transaction: previously they were issued
/// separately, so a failure mid-way could leave a derived node without its
/// provenance edges.
/// Ref: docs/features/ai_verktoy.md § 2.2
async fn handle_node_to_tool(
    db: &PgPool,
    source_node_id: Uuid,
    ai_preset_id: Uuid,
    requested_by: Uuid,
    source: &SourceNodeRow,
    ai_output: &str,
    preset_title: Option<&str>,
) -> Result<Uuid, String> {
    let new_node_id = Uuid::now_v7();
    // Fix: the original "{}{}" format glued source title and preset title
    // together with no separator (e.g. "NotatOppsummering").
    let new_title = format!(
        "{} — {}",
        source.title.as_deref().unwrap_or("Uten tittel"),
        preset_title.unwrap_or("AI"),
    );
    let new_metadata = serde_json::json!({
        "ai_generated": true,
        "source_node_id": source_node_id.to_string(),
        "ai_preset_id": ai_preset_id.to_string()
    });
    let mut tx = db
        .begin()
        .await
        .map_err(|e| format!("Kunne ikke starte transaksjon: {e}"))?;
    // 1. Create the new node (inherits the source's visibility; NOTIFY
    //    trigger pushes a realtime update on commit).
    sqlx::query(
        r#"
        INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
        VALUES ($1, 'content', $2, $3, $4::visibility, $5, $6)
        "#,
    )
    .bind(new_node_id)
    .bind(&new_title)
    .bind(ai_output)
    .bind(&source.visibility)
    .bind(&new_metadata)
    .bind(requested_by)
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("PG insert ny node feilet: {e}"))?;
    // 2. derived_from edge: new node → source node.
    let derived_edge_id = Uuid::now_v7();
    sqlx::query(
        r#"
        INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
        VALUES ($1, $2, $3, 'derived_from', '{}', false, $4)
        "#,
    )
    .bind(derived_edge_id)
    .bind(new_node_id)
    .bind(source_node_id)
    .bind(requested_by)
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?;
    // 3. processed_by edge: new node → AI preset.
    let processed_edge_id = Uuid::now_v7();
    sqlx::query(
        r#"
        INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
        VALUES ($1, $2, $3, 'processed_by', '{}', false, $4)
        "#,
    )
    .bind(processed_edge_id)
    .bind(new_node_id)
    .bind(ai_preset_id)
    .bind(requested_by)
    .execute(&mut *tx)
    .await
    .map_err(|e| format!("PG insert processed_by-edge feilet: {e}"))?;
    tx.commit()
        .await
        .map_err(|e| format!("Kunne ikke committe transaksjon: {e}"))?;
    Ok(new_node_id)
}
/// Calls the AI Gateway (LiteLLM) for text processing.
/// Returns (output_text, usage, actual_model_name).
///
/// Reads `AI_GATEWAY_URL` (default http://localhost:4000) and requires
/// `LITELLM_MASTER_KEY` to be set.
async fn call_ai_gateway(
    model_alias: &str,
    system_prompt: &str,
    user_content: &str,
) -> Result<(String, Option<UsageInfo>, Option<String>), String> {
    // Fix: reuse one Client for the process lifetime. reqwest::Client holds
    // an internal connection pool; constructing a fresh one per call defeats
    // keep-alive and re-does TLS setup on every request.
    static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
    let client = CLIENT.get_or_init(reqwest::Client::new);
    let gateway_url = std::env::var("AI_GATEWAY_URL")
        .unwrap_or_else(|_| "http://localhost:4000".to_string());
    let api_key = std::env::var("LITELLM_MASTER_KEY")
        .map_err(|_| "LITELLM_MASTER_KEY ikke satt — kan ikke kalle AI Gateway".to_string())?;
    let request = ChatRequest {
        model: model_alias.to_string(),
        messages: vec![
            ChatMessage {
                role: "system".to_string(),
                content: system_prompt.to_string(),
            },
            ChatMessage {
                role: "user".to_string(),
                content: user_content.to_string(),
            },
        ],
        // Low temperature: deterministic-ish edits, not creative generation.
        temperature: 0.3,
    };
    let url = format!("{gateway_url}/v1/chat/completions");
    let resp = client
        .post(&url)
        .header("Authorization", format!("Bearer {api_key}"))
        .header("Content-Type", "application/json")
        .json(&request)
        // Generous per-request timeout; slow models can take a while.
        .timeout(std::time::Duration::from_secs(120))
        .send()
        .await
        .map_err(|e| format!("AI Gateway-kall feilet: {e}"))?;
    if !resp.status().is_success() {
        let status = resp.status();
        // Include the body in the error — LiteLLM puts the reason there.
        let body = resp.text().await.unwrap_or_default();
        return Err(format!("AI Gateway returnerte {status}: {body}"));
    }
    let chat_resp: ChatResponse = resp
        .json()
        .await
        .map_err(|e| format!("Kunne ikke parse AI Gateway-respons: {e}"))?;
    // Only the first choice is used; a null content is treated as a failure.
    let content = chat_resp
        .choices
        .first()
        .and_then(|c| c.message.content.as_deref())
        .ok_or("AI Gateway returnerte ingen content")?;
    Ok((content.to_string(), chat_resp.usage, chat_resp.model))
}