// AI-prosessering — hent kilde-content + preset-prompt, kall AI Gateway, // og utfør direction-spesifikk logikk. // // Jobbtype: "ai_process" // Payload: { // "source_node_id": "", // "ai_preset_id": "", // "direction": "node_to_tool" | "tool_to_node", // "requested_by": "" // } // // Flyten: // 1. Hent kilde-node content fra PG // 2. Hent AI-preset prompt + modellprofil fra PG // 3. Map modellprofil → LiteLLM-alias (flash → sidelinja/rutine, standard → sidelinja/resonering) // 4. Send til AI Gateway (LiteLLM) // 5. Logg forbruk i ai_usage_log // 6. Direction-logikk: // - tool_to_node: lagre original som revisjon i node_revisions, oppdater node content // - node_to_tool: opprett ny node med AI-output, opprett derived_from + processed_by edges // // Ref: docs/features/ai_verktoy.md, docs/infra/ai_gateway.md use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; use crate::resource_usage; #[derive(sqlx::FromRow)] #[allow(dead_code)] struct SourceNodeRow { content: Option, title: Option, node_kind: String, visibility: String, metadata: serde_json::Value, } #[derive(sqlx::FromRow)] struct PresetRow { title: Option, metadata: Option, } /// OpenAI-kompatibel chat completion request. #[derive(Serialize)] struct ChatRequest { model: String, messages: Vec, temperature: f32, } #[derive(Serialize)] struct ChatMessage { role: String, content: String, } /// OpenAI-kompatibel chat completion response. #[derive(Deserialize)] struct ChatResponse { choices: Vec, #[serde(default)] usage: Option, #[serde(default)] model: Option, } #[derive(Deserialize, Clone)] struct UsageInfo { #[serde(default)] prompt_tokens: i64, #[serde(default)] completion_tokens: i64, } #[derive(Deserialize)] struct Choice { message: MessageContent, } #[derive(Deserialize)] struct MessageContent { content: Option, } /// Mapper modellprofil til LiteLLM-alias. /// Ref: docs/features/ai_verktoy.md § 4, docs/infra/ai_gateway.md § 3.4 fn model_profile_to_alias(profile: &str) -> &'static str { match profile { "flash" => "sidelinja/rutine", "standard" => "sidelinja/resonering", _ => "sidelinja/rutine", // fallback til billigste } } /// Håndterer ai_process-jobb. pub async fn handle_ai_process( job: &JobRow, db: &PgPool, ) -> Result { let source_node_id: Uuid = job .payload .get("source_node_id") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or("Mangler source_node_id i payload")?; let ai_preset_id: Uuid = job .payload .get("ai_preset_id") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or("Mangler ai_preset_id i payload")?; let direction = job .payload .get("direction") .and_then(|v| v.as_str()) .ok_or("Mangler direction i payload")?; let requested_by: Uuid = job .payload .get("requested_by") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or("Mangler requested_by i payload")?; // 1. Hent kilde-node (inkl. visibility og metadata for direction-logikk) let source = sqlx::query_as::<_, SourceNodeRow>( "SELECT content, title, node_kind, visibility::text AS visibility, metadata FROM nodes WHERE id = $1", ) .bind(source_node_id) .fetch_optional(db) .await .map_err(|e| format!("PG-feil ved henting av kilde-node: {e}"))? .ok_or("Kilde-node finnes ikke")?; let source_content = source .content .as_ref() .filter(|c| !c.is_empty()) .ok_or("Kilde-noden har ikke innhold å behandle")? .clone(); // 2. Hent AI-preset let preset = sqlx::query_as::<_, PresetRow>( "SELECT title, metadata FROM nodes WHERE id = $1 AND node_kind = 'ai_preset'", ) .bind(ai_preset_id) .fetch_optional(db) .await .map_err(|e| format!("PG-feil ved henting av AI-preset: {e}"))? .ok_or("AI-preset finnes ikke")?; let metadata = preset .metadata .ok_or("AI-preset mangler metadata")?; let prompt = metadata .get("prompt") .and_then(|v| v.as_str()) .ok_or("AI-preset mangler prompt i metadata")?; let model_profile = metadata .get("model_profile") .and_then(|v| v.as_str()) .unwrap_or("flash"); // 3. Map modellprofil → LiteLLM-alias let model_alias = model_profile_to_alias(model_profile); tracing::info!( source_node_id = %source_node_id, ai_preset_id = %ai_preset_id, direction = %direction, model_alias = %model_alias, preset_title = ?preset.title, source_content_len = source_content.len(), "Starter AI-prosessering" ); // 4. Kall AI Gateway let (ai_output, usage, actual_model) = call_ai_gateway(model_alias, prompt, &source_content).await?; tracing::info!( source_node_id = %source_node_id, output_len = ai_output.len(), actual_model = ?actual_model, "AI-prosessering fullført" ); // 5. Logg forbruk i ai_usage_log let collection_id = resource_usage::find_collection_for_node(db, source_node_id).await; let (tokens_in, tokens_out) = usage .as_ref() .map(|u| (u.prompt_tokens, u.completion_tokens)) .unwrap_or((0, 0)); let total_tokens = tokens_in + tokens_out; // ai_usage_log — detaljert AI-forbrukslogg if let Err(e) = sqlx::query( r#" INSERT INTO ai_usage_log (collection_node_id, job_id, model_alias, model_actual, prompt_tokens, completion_tokens, total_tokens, job_type) VALUES ($1, $2, $3, $4, $5, $6, $7, 'ai_process') "#, ) .bind(collection_id) .bind(job.id) .bind(model_alias) .bind(actual_model.as_deref()) .bind(tokens_in as i32) .bind(tokens_out as i32) .bind(total_tokens as i32) .execute(db) .await { tracing::warn!(error = %e, "Kunne ikke logge AI-forbruk i ai_usage_log"); } // resource_usage_log — generell ressurslogging if let Err(e) = resource_usage::log( db, source_node_id, Some(requested_by), collection_id, "ai", serde_json::json!({ "model_level": model_profile, "model_id": actual_model.unwrap_or_else(|| "unknown".to_string()), "model_alias": model_alias, "tokens_in": tokens_in, "tokens_out": tokens_out, "job_type": "ai_process", "preset_id": ai_preset_id.to_string(), "direction": direction }), ) .await { tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk"); } // 6. Direction-logikk match direction { "tool_to_node" => { handle_tool_to_node( db, job, source_node_id, ai_preset_id, requested_by, &source, &source_content, &ai_output, ).await?; tracing::info!( source_node_id = %source_node_id, "tool_to_node: original lagret som revisjon, node oppdatert med AI-output" ); Ok(serde_json::json!({ "status": "completed", "source_node_id": source_node_id.to_string(), "ai_preset_id": ai_preset_id.to_string(), "direction": "tool_to_node", "ai_output": ai_output, "tokens_in": tokens_in, "tokens_out": tokens_out, "total_tokens": total_tokens })) } "node_to_tool" => { let new_node_id = handle_node_to_tool( db, source_node_id, ai_preset_id, requested_by, &source, &ai_output, preset.title.as_deref(), ).await?; tracing::info!( source_node_id = %source_node_id, new_node_id = %new_node_id, "node_to_tool: ny node opprettet med derived_from + processed_by edges" ); Ok(serde_json::json!({ "status": "completed", "source_node_id": source_node_id.to_string(), "new_node_id": new_node_id.to_string(), "ai_preset_id": ai_preset_id.to_string(), "direction": "node_to_tool", "ai_output": ai_output, "tokens_in": tokens_in, "tokens_out": tokens_out, "total_tokens": total_tokens })) } other => Err(format!("Ugyldig direction: {other}")), } } /// tool_to_node: "Penselen" — AI-verktøyet brukes PÅ noden. /// Lagrer original content som revisjon, oppdaterer noden med AI-output. /// Ref: docs/features/ai_verktoy.md § 2.2 async fn handle_tool_to_node( db: &PgPool, job: &JobRow, source_node_id: Uuid, ai_preset_id: Uuid, requested_by: Uuid, source: &SourceNodeRow, original_content: &str, ai_output: &str, ) -> Result<(), String> { // 1. Lagre originalt innhold som revisjon sqlx::query( r#" INSERT INTO node_revisions (node_id, content, title, metadata, revision_type, created_by, ai_preset_id, job_id) VALUES ($1, $2, $3, $4, 'ai_edit', $5, $6, $7) "#, ) .bind(source_node_id) .bind(original_content) .bind(source.title.as_deref()) .bind(&source.metadata) .bind(requested_by) .bind(ai_preset_id) .bind(job.id) .execute(db) .await .map_err(|e| format!("Kunne ikke lagre revisjon: {e}"))?; // 2. Oppdater node content i PG (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query( "UPDATE nodes SET content = $2 WHERE id = $1", ) .bind(source_node_id) .bind(ai_output) .execute(db) .await .map_err(|e| format!("PG update node content feilet: {e}"))?; Ok(()) } /// node_to_tool: "Kverna" — noden sendes GJENNOM verktøyet. /// Oppretter ny node med AI-output, med derived_from-edge til kilde /// og processed_by-edge til AI-preset. /// Ref: docs/features/ai_verktoy.md § 2.2 async fn handle_node_to_tool( db: &PgPool, source_node_id: Uuid, ai_preset_id: Uuid, requested_by: Uuid, source: &SourceNodeRow, ai_output: &str, preset_title: Option<&str>, ) -> Result { let new_node_id = Uuid::now_v7(); let new_title = format!( "{} → {}", source.title.as_deref().unwrap_or("Uten tittel"), preset_title.unwrap_or("AI"), ); let new_metadata = serde_json::json!({ "ai_generated": true, "source_node_id": source_node_id.to_string(), "ai_preset_id": ai_preset_id.to_string() }); // 1. Opprett ny node i PG (NOTIFY-trigger sender sanntidsoppdatering) sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'content', $2, $3, $4::visibility, $5, $6) "#, ) .bind(new_node_id) .bind(&new_title) .bind(ai_output) .bind(&source.visibility) .bind(&new_metadata) .bind(requested_by) .execute(db) .await .map_err(|e| format!("PG insert ny node feilet: {e}"))?; // 2. Opprett derived_from-edge: ny node → kilde-node let derived_edge_id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'derived_from', '{}', false, $4) "#, ) .bind(derived_edge_id) .bind(new_node_id) .bind(source_node_id) .bind(requested_by) .execute(db) .await .map_err(|e| format!("PG insert derived_from-edge feilet: {e}"))?; // 3. Opprett processed_by-edge: ny node → AI-preset let processed_edge_id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'processed_by', '{}', false, $4) "#, ) .bind(processed_edge_id) .bind(new_node_id) .bind(ai_preset_id) .bind(requested_by) .execute(db) .await .map_err(|e| format!("PG insert processed_by-edge feilet: {e}"))?; Ok(new_node_id) } /// Kall AI Gateway (LiteLLM) for tekstbehandling. /// Returnerer (output_text, usage, actual_model_name). async fn call_ai_gateway( model_alias: &str, system_prompt: &str, user_content: &str, ) -> Result<(String, Option, Option), String> { let gateway_url = std::env::var("AI_GATEWAY_URL") .unwrap_or_else(|_| "http://localhost:4000".to_string()); let api_key = std::env::var("LITELLM_MASTER_KEY") .map_err(|_| "LITELLM_MASTER_KEY ikke satt — kan ikke kalle AI Gateway".to_string())?; let request = ChatRequest { model: model_alias.to_string(), messages: vec![ ChatMessage { role: "system".to_string(), content: system_prompt.to_string(), }, ChatMessage { role: "user".to_string(), content: user_content.to_string(), }, ], temperature: 0.3, }; let client = reqwest::Client::new(); let url = format!("{gateway_url}/v1/chat/completions"); let resp = client .post(&url) .header("Authorization", format!("Bearer {api_key}")) .header("Content-Type", "application/json") .json(&request) .timeout(std::time::Duration::from_secs(120)) .send() .await .map_err(|e| format!("AI Gateway-kall feilet: {e}"))?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(format!("AI Gateway returnerte {status}: {body}")); } let chat_resp: ChatResponse = resp .json() .await .map_err(|e| format!("Kunne ikke parse AI Gateway-respons: {e}"))?; let content = chat_resp .choices .first() .and_then(|c| c.message.content.as_deref()) .ok_or("AI Gateway returnerte ingen content")?; Ok((content.to_string(), chat_resp.usage, chat_resp.model)) }