// synops-respond — Claude chat-svar for kommunikasjonsnoder. // // Henter meldingshistorikk og kontekst fra PG, bygger prompt, // kaller claude CLI, og returnerer svartekst. Med --write: // oppretter svar-node og edges i PG + logger ressursforbruk. // // STDB-skriving (sanntidsvisning) gjøres av maskinrommet etter // at dette verktøyet returnerer — maskinrommet eier STDB-skriving. // // Auth, ratelimit, kill switch og loop-prevensjon forblir i maskinrommet. // // Miljøvariabler: // DATABASE_URL — PostgreSQL-tilkobling (påkrevd) // CLAUDE_PATH — Sti til claude CLI (default: "claude") // PROJECT_DIR — Arbeidskatalog for claude (default: "/home/vegard/synops") // // Erstatter: prosesseringslogikken i maskinrommet/src/agent.rs // Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md use clap::Parser; use std::process; use uuid::Uuid; /// Claude chat-svar for kommunikasjonsnoder. #[derive(Parser)] #[command(name = "synops-respond", about = "Generer Claude-svar i en kommunikasjonsnode")] struct Cli { /// Kommunikasjonsnode-ID der samtalen foregår #[arg(long)] communication_id: Uuid, /// Meldings-ID som trigget svaret #[arg(long)] message_id: Uuid, /// Agent-node-ID (Claude sin node) #[arg(long)] agent_node_id: Uuid, /// Avsender-node-ID (brukeren som sendte meldingen) #[arg(long)] sender_node_id: Uuid, /// Jobb-ID fra maskinrommet (for ai_usage_log) #[arg(long)] job_id: Option, /// Skriv svar-node og edges til database (uten: kun generering + stdout) #[arg(long)] write: bool, } // --- Database-rader --- #[derive(sqlx::FromRow)] struct MessageRow { #[allow(dead_code)] id: Uuid, content: Option, created_by: Uuid, #[allow(dead_code)] created_at: chrono::DateTime, } #[derive(sqlx::FromRow)] struct ParticipantRow { id: Uuid, title: Option, node_kind: String, } #[tokio::main] async fn main() { synops_common::logging::init("synops_respond"); let cli = Cli::parse(); if let Err(e) = run(cli).await { eprintln!("Feil: {e}"); process::exit(1); } } async fn run(cli: Cli) -> Result<(), String> { let db = synops_common::db::connect().await?; let communication_id = cli.communication_id; let agent_node_id = cli.agent_node_id; let sender_node_id = cli.sender_node_id; // 1. Hent agent-config for max_context_messages let config: serde_json::Value = sqlx::query_scalar( "SELECT config FROM agent_identities WHERE node_id = $1", ) .bind(agent_node_id) .fetch_optional(&db) .await .map_err(|e| format!("DB-feil: {e}"))? .unwrap_or(serde_json::json!({})); let max_context_messages = config["max_context_messages"].as_i64().unwrap_or(50); // 2. Hent meldingshistorikk let mut messages = sqlx::query_as::<_, MessageRow>( "SELECT n.id, n.content, n.created_by, n.created_at \ FROM nodes n JOIN edges e ON e.source_id = n.id \ WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'content' \ ORDER BY n.created_at DESC LIMIT $2", ) .bind(communication_id) .bind(max_context_messages) .fetch_all(&db) .await .map_err(|e| format!("DB-feil: {e}"))?; messages.reverse(); if messages.is_empty() { let result = serde_json::json!({ "status": "skipped", "reason": "no_messages" }); println!("{}", serde_json::to_string_pretty(&result).unwrap()); return Ok(()); } // 3. Hent kontekst: tittel, deltakere, tillatelser let comm_title: String = sqlx::query_scalar::<_, Option>( "SELECT title FROM nodes WHERE id = $1", ) .bind(communication_id) .fetch_optional(&db) .await .map_err(|e| format!("DB-feil: {e}"))? .flatten() .unwrap_or_else(|| "Samtale".to_string()); let participants = sqlx::query_as::<_, ParticipantRow>( "SELECT n.id, n.title, n.node_kind FROM nodes n \ JOIN edges e ON e.source_id = n.id \ WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of')", ) .bind(communication_id) .fetch_all(&db) .await .map_err(|e| format!("DB-feil: {e}"))?; let permission: String = sqlx::query_scalar( "SELECT permission FROM agent_permissions WHERE user_node_id = $1 AND agent_node_id = $2", ) .bind(sender_node_id) .bind(agent_node_id) .fetch_optional(&db) .await .map_err(|e| format!("DB-feil: {e}"))? .unwrap_or_else(|| "none".to_string()); // 4. Bygg prompt let name_map: std::collections::HashMap = participants .iter() .map(|p| (p.id, p.title.clone().unwrap_or_else(|| p.node_kind.clone()))) .collect(); let participant_names: String = participants .iter() .filter(|p| p.id != agent_node_id) .map(|p| p.title.as_deref().unwrap_or("Ukjent")) .collect::>() .join(", "); let mut conversation = String::new(); for m in &messages { let name = name_map.get(&m.created_by).map(|s| s.as_str()).unwrap_or("Ukjent"); let content = m.content.as_deref().unwrap_or(""); if m.created_by == agent_node_id { conversation.push_str(&format!("Claude: {content}\n")); } else { conversation.push_str(&format!("{name}: {content}\n")); } } let perm_desc = match permission.as_str() { "direct" => "Brukeren har 'direct'-tilgang.", "propose" => "Brukeren har 'propose'-tilgang.", _ => "", }; // Sjekk om chatten diskuterer en spec-node let spec_context: String = match sqlx::query_scalar::<_, Option>( "SELECT n.content FROM nodes n \ JOIN edges e ON e.source_id = $1 AND e.target_id = n.id \ WHERE e.edge_type = 'discusses' LIMIT 1", ) .bind(communication_id) .fetch_optional(&db) .await { Ok(Some(Some(content))) if !content.is_empty() => { let truncated = if content.len() > 4000 { &content[..4000] } else { &content }; format!( "\n--- Gjeldende spesifikasjon ---\n{truncated}\n--- Slutt spesifikasjon ---\n\n\ Du har tilgang til spesifikasjonen over. Gi konkret feedback: hva er implementert, \ hva er planlagt, hva er teknisk vanskelig. Vær ærlig om begrensninger.\n" ) } _ => String::new(), }; let prompt = format!( r#"Du er Claude, en AI-assistent integrert i Synops-plattformen. Du deltar i samtalen "{comm_title}" med {participant_names}. Svar på norsk med mindre brukeren skriver på engelsk. {perm_desc} Svar konsist. Bruk vanlig tekst uten markdown-overskrifter. Svar KUN med meldingsteksten. {spec_context} --- Samtalehistorikk --- {conversation}--- Svar ---"# ); // 5. Kall claude CLI med retry let claude_path = std::env::var("CLAUDE_PATH").unwrap_or_else(|_| "claude".to_string()); let project_dir = std::env::var("PROJECT_DIR").unwrap_or_else(|_| "/home/vegard/synops".to_string()); tracing::info!(prompt_len = prompt.len(), "Kaller claude CLI"); let response_text = call_claude(&claude_path, &project_dir, &prompt).await?; if response_text.is_empty() { return Err("Tom respons fra claude".to_string()); } tracing::info!(response_len = response_text.len(), "Fikk svar fra claude"); // 6. Skriv til database hvis --write let mut result = serde_json::json!({ "status": "completed", "response_text": response_text, "response_len": response_text.len(), }); if cli.write { let reply_node_id = write_to_db( &db, communication_id, agent_node_id, sender_node_id, cli.job_id, &response_text, ) .await?; result["reply_node_id"] = serde_json::Value::String(reply_node_id.to_string()); } // 7. Output JSON til stdout println!( "{}", serde_json::to_string_pretty(&result) .map_err(|e| format!("JSON-serialisering feilet: {e}"))? ); Ok(()) } /// Kall claude CLI med retry ved API-feil (500/529). async fn call_claude( claude_path: &str, project_dir: &str, prompt: &str, ) -> Result { let max_retries = 3u32; for attempt in 0..=max_retries { let output = tokio::process::Command::new(claude_path) .arg("-p") .arg(prompt) .arg("--output-format") .arg("json") .arg("--dangerously-skip-permissions") .env("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1") .current_dir(project_dir) .output() .await .map_err(|e| format!("Kunne ikke starte claude: {e}"))?; let stderr = String::from_utf8_lossy(&output.stderr).to_string(); let stdout = String::from_utf8_lossy(&output.stdout).to_string(); let is_api_error = !output.status.success() && (stderr.contains("500") || stderr.contains("529") || stderr.contains("overloaded") || stderr.contains("Internal Server Error")); if is_api_error && attempt < max_retries { let delay = std::time::Duration::from_secs(2u64.pow(attempt + 1)); tracing::warn!( attempt = attempt + 1, delay_secs = delay.as_secs(), "Claude API-feil, prøver igjen" ); tokio::time::sleep(delay).await; continue; } if !output.status.success() { if is_api_error { tracing::error!( attempts = max_retries + 1, "Claude API utilgjengelig etter alle forsøk" ); return Ok( "Beklager, jeg er midlertidig utilgjengelig — Anthropic sitt API svarer \ ikke akkurat nå. Prøv igjen om litt." .to_string(), ); } return Err(format!( "claude feilet ({}): {}", output.status, &stderr[..stderr.len().min(500)] )); } let text = match serde_json::from_str::(&stdout) { Ok(json) => json["result"].as_str().unwrap_or("").to_string(), Err(_) => stdout.trim().to_string(), }; return Ok(text); } Err("Alle forsøk brukt opp".to_string()) } /// Opprett svar-node, belongs_to-edge, ai_usage_log og resource_usage_log i PG. async fn write_to_db( db: &sqlx::PgPool, communication_id: Uuid, agent_node_id: Uuid, sender_node_id: Uuid, job_id: Option, response_text: &str, ) -> Result { let reply_id = Uuid::now_v7(); let edge_id = Uuid::now_v7(); let metadata = serde_json::json!({}); // Svar-node sqlx::query( "INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by) \ VALUES ($1, 'content', $2, 'hidden'::visibility, $3, $4)", ) .bind(reply_id) .bind(response_text) .bind(&metadata) .bind(agent_node_id) .execute(db) .await .map_err(|e| format!("PG insert svar-node feilet: {e}"))?; tracing::info!(reply_node_id = %reply_id, "Svar-node opprettet"); // belongs_to-edge: svar → kommunikasjonsnode sqlx::query( "INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) \ VALUES ($1, $2, $3, 'belongs_to', '{}', false, $4)", ) .bind(edge_id) .bind(reply_id) .bind(communication_id) .bind(agent_node_id) .execute(db) .await .map_err(|e| format!("PG insert belongs_to-edge feilet: {e}"))?; // ai_usage_log sqlx::query( "INSERT INTO ai_usage_log \ (agent_node_id, communication_id, job_id, model_alias, model_actual, \ prompt_tokens, completion_tokens, total_tokens, job_type) \ VALUES ($1, $2, $3, 'claude-code', 'claude-code-cli', 0, 0, 0, 'agent_respond')", ) .bind(agent_node_id) .bind(communication_id) .bind(job_id) .execute(db) .await .map_err(|e| format!("PG insert ai_usage_log feilet: {e}"))?; // resource_usage_log let collection_id: Option = sqlx::query_scalar( "SELECT e.target_id FROM edges e \ JOIN nodes n ON n.id = e.target_id \ WHERE e.source_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'collection' \ LIMIT 1", ) .bind(communication_id) .fetch_optional(db) .await .ok() .flatten(); if let Err(e) = sqlx::query( "INSERT INTO resource_usage_log \ (target_node_id, triggered_by, collection_id, resource_type, detail) \ VALUES ($1, $2, $3, $4, $5)", ) .bind(communication_id) .bind(Some(sender_node_id)) .bind(collection_id) .bind("ai") .bind(serde_json::json!({ "model_level": "deep", "model_id": "claude-code-cli", "tokens_in": 0, "tokens_out": 0, "job_type": "agent_respond" })) .execute(db) .await { tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk"); } Ok(reply_id) }