// AI-foreslåtte edges — sender innhold til LLM, foreslår mentions og topics. // // Jobbtype: "suggest_edges" // Payload: { "node_id": "" } // // Flyten: // 1. Hent nodens innhold fra PG // 2. Hent eksisterende topic-noder for gjenbruk // 3. Send til LiteLLM (OpenAI-kompatibelt API via AI Gateway) // 4. Parse LLM-svar (JSON med topics og mentions) // 5. Opprett nye topic-noder for ukjente topics // 6. Opprett mentions-edges fra innholdsnode til topic/entity-noder // // Ref: docs/infra/ai_gateway.md, docs/concepts/kunnskapsgrafen.md use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; use crate::stdb::StdbClient; /// Eksisterende topic-node fra PG. #[derive(sqlx::FromRow)] struct TopicRow { id: Uuid, title: String, } /// Kilde-node fra PG. #[derive(sqlx::FromRow)] struct SourceNode { title: Option, content: Option, created_by: Option, } /// LLM-respons: foreslåtte topics og mentions. #[derive(Deserialize, Debug)] struct AiSuggestion { /// Emnene innholdet handler om (nye eller eksisterende). #[serde(default)] topics: Vec, /// Entiteter nevnt i innholdet (personer, organisasjoner, steder). #[serde(default)] mentions: Vec, } #[derive(Deserialize, Debug)] struct MentionSuggestion { /// Navn på entiteten. name: String, /// Type: person, organisasjon, sted, konsept. #[serde(default = "default_entity_type")] entity_type: String, } fn default_entity_type() -> String { "person".to_string() } /// OpenAI-kompatibel chat completion request. #[derive(Serialize)] struct ChatRequest { model: String, messages: Vec, temperature: f32, response_format: ResponseFormat, } #[derive(Serialize)] struct ResponseFormat { r#type: String, } #[derive(Serialize)] struct ChatMessage { role: String, content: String, } /// OpenAI-kompatibel chat completion response. #[derive(Deserialize)] struct ChatResponse { choices: Vec, } #[derive(Deserialize)] struct Choice { message: MessageContent, } #[derive(Deserialize)] struct MessageContent { content: Option, } const SYSTEM_PROMPT: &str = r#"Du er en innholdsanalysator for en norsk redaksjonsplattform. Analyser teksten og ekstraher: 1. **topics**: Emner/temaer teksten handler om. Bruk korte, presise norske termer (f.eks. "skolepolitikk", "klimaendringer", "statsbudsjettet"). Maks 5 topics. 2. **mentions**: Navngitte entiteter (personer, organisasjoner, steder) som er eksplisitt nevnt. Inkluder entity_type ("person", "organisasjon", "sted", "konsept"). Returner KUN et JSON-objekt med denne strukturen: { "topics": ["emne1", "emne2"], "mentions": [{"name": "Navn", "entity_type": "person"}] } Regler: - Returner tom liste hvis teksten ikke har meningsfullt innhold (hilsener, korte svar, etc.) - Bruk eksisterende topics fra listen nedenfor der det passer, i stedet for å lage nye varianter - Ikke inkluder generiske termer som "samtale" eller "diskusjon" - Navngi entiteter med full, autoritativ form (f.eks. "Jonas Gahr Støre", ikke "Støre")"#; /// Håndterer suggest_edges-jobb. pub async fn handle_suggest_edges( job: &JobRow, db: &PgPool, stdb: &StdbClient, ) -> Result { let node_id: Uuid = job .payload .get("node_id") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()) .ok_or("Mangler gyldig node_id i payload")?; tracing::info!(node_id = %node_id, "Starter AI edge-forslag"); // 1. Hent kildenode let source = sqlx::query_as::<_, SourceNode>( "SELECT title, content, created_by FROM nodes WHERE id = $1", ) .bind(node_id) .fetch_optional(db) .await .map_err(|e| format!("PG-feil ved henting av node: {e}"))? .ok_or_else(|| format!("Node {node_id} finnes ikke"))?; let title = source.title.unwrap_or_default(); let content = source.content.unwrap_or_default(); // Ikke analyser tomme noder eller veldig korte meldinger let text = format!("{title}\n{content}").trim().to_string(); if text.len() < 20 { tracing::info!(node_id = %node_id, len = text.len(), "For kort innhold, hopper over AI-analyse"); return Ok(serde_json::json!({ "status": "skipped", "reason": "content_too_short" })); } // 2. Hent eksisterende topic-noder for kontekst let existing_topics = sqlx::query_as::<_, TopicRow>( "SELECT id, title FROM nodes WHERE node_kind = 'topic' ORDER BY created_at DESC LIMIT 100", ) .fetch_all(db) .await .map_err(|e| format!("PG-feil ved henting av topics: {e}"))?; let topic_list: Vec<&str> = existing_topics.iter().map(|t| t.title.as_str()).collect(); // 3. Bygg prompt og kall LiteLLM let user_content = if topic_list.is_empty() { format!("Analyser følgende tekst:\n\n{text}") } else { format!( "Eksisterende topics: {}\n\nAnalyser følgende tekst:\n\n{text}", topic_list.join(", ") ) }; let suggestion = call_llm(&user_content).await?; tracing::info!( node_id = %node_id, topics = ?suggestion.topics, mentions = suggestion.mentions.len(), "LLM-forslag mottatt" ); // 4. Opprett topic-noder og mentions-edges let created_by = source.created_by.unwrap_or(node_id); let mut created_topics = 0u32; let mut created_edges = 0u32; // Prosesser topics for topic_name in &suggestion.topics { let topic_name = topic_name.trim(); if topic_name.is_empty() { continue; } // Finn eksisterende topic med case-insensitivt match let existing = existing_topics .iter() .find(|t| t.title.to_lowercase() == topic_name.to_lowercase()); let topic_id = if let Some(t) = existing { t.id } else { // Opprett ny topic-node let new_id = Uuid::now_v7(); create_topic_node(db, stdb, new_id, topic_name, created_by).await?; created_topics += 1; new_id }; // Opprett mentions-edge: innholdsnode → topic if create_mentions_edge(db, stdb, node_id, topic_id, created_by).await? { created_edges += 1; } } // Prosesser mentions (entiteter) for mention in &suggestion.mentions { let name = mention.name.trim(); if name.is_empty() { continue; } // Søk etter eksisterende entitet med samme tittel let existing_entity = sqlx::query_scalar::<_, Uuid>( "SELECT id FROM nodes WHERE node_kind = 'topic' AND LOWER(title) = LOWER($1) LIMIT 1", ) .bind(name) .fetch_optional(db) .await .map_err(|e| format!("PG-feil ved entitet-søk: {e}"))?; let entity_id = if let Some(id) = existing_entity { id } else { // Opprett ny entitet som topic-node med entity_type i metadata let new_id = Uuid::now_v7(); create_entity_node(db, stdb, new_id, name, &mention.entity_type, created_by).await?; created_topics += 1; new_id }; // Opprett mentions-edge: innholdsnode → entitet if create_mentions_edge(db, stdb, node_id, entity_id, created_by).await? { created_edges += 1; } } let result = serde_json::json!({ "status": "completed", "topics_created": created_topics, "edges_created": created_edges, "suggestions": { "topics": suggestion.topics, "mentions": suggestion.mentions.iter().map(|m| &m.name).collect::>() } }); tracing::info!( node_id = %node_id, topics_created = created_topics, edges_created = created_edges, "AI edge-forslag fullført" ); Ok(result) } /// Kall LiteLLM (OpenAI-kompatibelt API) for å analysere innhold. async fn call_llm(user_content: &str) -> Result { let gateway_url = std::env::var("AI_GATEWAY_URL") .unwrap_or_else(|_| "http://localhost:4000".to_string()); let api_key = std::env::var("LITELLM_MASTER_KEY") .unwrap_or_default(); // Bruk sidelinja/rutine (billig modell) for edge-forslag let model = std::env::var("AI_EDGES_MODEL") .unwrap_or_else(|_| "sidelinja/rutine".to_string()); let request = ChatRequest { model, messages: vec![ ChatMessage { role: "system".to_string(), content: SYSTEM_PROMPT.to_string(), }, ChatMessage { role: "user".to_string(), content: user_content.to_string(), }, ], temperature: 0.2, response_format: ResponseFormat { r#type: "json_object".to_string(), }, }; let client = reqwest::Client::new(); let url = format!("{gateway_url}/v1/chat/completions"); let resp = client .post(&url) .header("Authorization", format!("Bearer {api_key}")) .header("Content-Type", "application/json") .json(&request) .timeout(std::time::Duration::from_secs(30)) .send() .await .map_err(|e| format!("LiteLLM-kall feilet: {e}"))?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(format!("LiteLLM returnerte {status}: {body}")); } let chat_resp: ChatResponse = resp .json() .await .map_err(|e| format!("Kunne ikke parse LiteLLM-respons: {e}"))?; let content = chat_resp .choices .first() .and_then(|c| c.message.content.as_deref()) .ok_or("LiteLLM returnerte ingen content")?; // Parse JSON fra LLM-output let suggestion: AiSuggestion = serde_json::from_str(content) .map_err(|e| format!("Kunne ikke parse LLM JSON: {e}. Rå output: {content}"))?; Ok(suggestion) } /// Opprett en topic-node i PG og STDB. async fn create_topic_node( db: &PgPool, stdb: &StdbClient, id: Uuid, title: &str, created_by: Uuid, ) -> Result<(), String> { let metadata = serde_json::json!({"ai_generated": true}); // STDB først stdb.create_node( &id.to_string(), "topic", title, "", "discoverable", &metadata.to_string(), &created_by.to_string(), ) .await .map_err(|e| format!("STDB create_node (topic) feilet: {e}"))?; // PG sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4) ON CONFLICT (id) DO NOTHING "#, ) .bind(id) .bind(title) .bind(&metadata) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert topic feilet: {e}"))?; tracing::info!(topic_id = %id, title = %title, "Ny topic-node opprettet (AI)"); Ok(()) } /// Opprett en entitet-node (person, org, sted) i PG og STDB. async fn create_entity_node( db: &PgPool, stdb: &StdbClient, id: Uuid, name: &str, entity_type: &str, created_by: Uuid, ) -> Result<(), String> { let metadata = serde_json::json!({ "ai_generated": true, "entity_type": entity_type }); stdb.create_node( &id.to_string(), "topic", name, "", "discoverable", &metadata.to_string(), &created_by.to_string(), ) .await .map_err(|e| format!("STDB create_node (entity) feilet: {e}"))?; sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4) ON CONFLICT (id) DO NOTHING "#, ) .bind(id) .bind(name) .bind(&metadata) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert entity feilet: {e}"))?; tracing::info!(entity_id = %id, name = %name, entity_type = %entity_type, "Ny entitet-node opprettet (AI)"); Ok(()) } /// Opprett en mentions-edge fra innholdsnode til target. /// Returnerer true hvis ny edge ble opprettet, false hvis den allerede eksisterte. async fn create_mentions_edge( db: &PgPool, stdb: &StdbClient, source_id: Uuid, target_id: Uuid, created_by: Uuid, ) -> Result { // Sjekk om edge allerede finnes let exists = sqlx::query_scalar::<_, bool>( "SELECT EXISTS(SELECT 1 FROM edges WHERE source_id = $1 AND target_id = $2 AND edge_type = 'mentions')", ) .bind(source_id) .bind(target_id) .fetch_one(db) .await .map_err(|e| format!("PG-feil ved edge-sjekk: {e}"))?; if exists { return Ok(false); } let edge_id = Uuid::now_v7(); let metadata = serde_json::json!({"origin": "ai"}); // STDB først stdb.create_edge( &edge_id.to_string(), &source_id.to_string(), &target_id.to_string(), "mentions", &metadata.to_string(), false, &created_by.to_string(), ) .await .map_err(|e| format!("STDB create_edge (mentions) feilet: {e}"))?; // PG sqlx::query( r#" INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'mentions', $4, false, $5) ON CONFLICT (source_id, target_id, edge_type) DO NOTHING "#, ) .bind(edge_id) .bind(source_id) .bind(target_id) .bind(&metadata) .bind(created_by) .execute(db) .await .map_err(|e| format!("PG insert mentions-edge feilet: {e}"))?; tracing::info!( edge_id = %edge_id, source = %source_id, target = %target_id, "Mentions-edge opprettet (AI)" ); Ok(true) }