// Agent-dispatcher — delegerer prosessering til synops-respond CLI. // // Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon, // og STDB-skriving (sanntidsvisning). Alt annet (kontekst-henting, // prompt-bygging, claude-kall, PG-skriving) gjøres av synops-respond. // // Jobbtype: "agent_respond" // Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" } // // Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md use std::process::Stdio; use sqlx::PgPool; use uuid::Uuid; use crate::jobs::JobRow; use crate::stdb::StdbClient; #[derive(Debug)] struct AgentConfig { max_consecutive_agent_messages: i64, rate_limit_per_hour: i64, } /// Synops-respond binary path. fn respond_bin() -> String { std::env::var("SYNOPS_RESPOND_BIN") .unwrap_or_else(|_| "synops-respond".to_string()) } pub async fn handle_agent_respond( job: &JobRow, db: &PgPool, stdb: &StdbClient, ) -> Result { let communication_id: Uuid = job.payload["communication_id"] .as_str().and_then(|s| s.parse().ok()) .ok_or("Mangler communication_id")?; let message_id: Uuid = job.payload["message_id"] .as_str().and_then(|s| s.parse().ok()) .ok_or("Mangler message_id")?; let agent_node_id: Uuid = job.payload["agent_node_id"] .as_str().and_then(|s| s.parse().ok()) .ok_or("Mangler agent_node_id")?; let sender_node_id: Uuid = job.payload["sender_node_id"] .as_str().and_then(|s| s.parse().ok()) .ok_or("Mangler sender_node_id")?; tracing::info!( communication_id = %communication_id, message_id = %message_id, agent_node_id = %agent_node_id, sender_node_id = %sender_node_id, "Starter agent_respond" ); let config = load_agent_config(db, agent_node_id).await?; // --- Sikkerhetskontroller (forblir i maskinrommet) --- // Kill switch let is_active: bool = sqlx::query_scalar( "SELECT is_active FROM agent_identities WHERE node_id = $1", ).bind(agent_node_id).fetch_optional(db).await .map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(false); if !is_active { return Ok(serde_json::json!({"status": "skipped", "reason": "agent_inactive"})); } // Rate limiting let count: i64 = sqlx::query_scalar::<_, Option>( "SELECT COUNT(*) FROM ai_usage_log WHERE agent_node_id = $1 AND created_at > now() - interval '1 hour'", ).bind(agent_node_id).fetch_one(db).await .map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(0); if count >= config.rate_limit_per_hour { return Ok(serde_json::json!({"status": "skipped", "reason": "rate_limit"})); } // Loop-prevensjon let recent: Vec = sqlx::query_scalar( "SELECT n.created_by FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' ORDER BY n.created_at DESC LIMIT $2", ).bind(communication_id).bind(config.max_consecutive_agent_messages) .fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?; if !recent.is_empty() && recent.iter().all(|s| *s == agent_node_id) { return Ok(serde_json::json!({"status": "skipped", "reason": "loop_prevention"})); } // --- Deleger prosessering til synops-respond --- let bin = respond_bin(); let mut cmd = tokio::process::Command::new(&bin); cmd.arg("--communication-id").arg(communication_id.to_string()) .arg("--message-id").arg(message_id.to_string()) .arg("--agent-node-id").arg(agent_node_id.to_string()) .arg("--sender-node-id").arg(sender_node_id.to_string()) .arg("--job-id").arg(job.id.to_string()) .arg("--write"); // Sett miljøvariabler CLI-verktøyet trenger let db_url = std::env::var("DATABASE_URL") .map_err(|_| "DATABASE_URL ikke satt".to_string())?; cmd.env("DATABASE_URL", &db_url); // Videresend claude-relaterte env-variabler if let Ok(v) = std::env::var("CLAUDE_PATH") { cmd.env("CLAUDE_PATH", v); } if let Ok(v) = std::env::var("PROJECT_DIR") { cmd.env("PROJECT_DIR", v); } cmd.stdout(Stdio::piped()) .stderr(Stdio::piped()); tracing::info!(bin = %bin, "Starter synops-respond"); let child = cmd.spawn() .map_err(|e| format!("Kunne ikke starte {bin}: {e}"))?; let output = child.wait_with_output().await .map_err(|e| format!("Feil ved kjøring av {bin}: {e}"))?; let stderr = String::from_utf8_lossy(&output.stderr); if !stderr.is_empty() { tracing::info!(stderr = %stderr, "synops-respond stderr"); } if !output.status.success() { let code = output.status.code().unwrap_or(-1); return Err(format!("synops-respond feilet (exit {code}): {stderr}")); } // Parse stdout som JSON let stdout = String::from_utf8_lossy(&output.stdout); let result: serde_json::Value = serde_json::from_str(&stdout) .map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?; // --- STDB-skriving for sanntidsvisning (forblir i maskinrommet) --- if result["status"].as_str() == Some("completed") { if let Some(reply_node_id) = result["reply_node_id"].as_str() { let response_text = result["response_text"].as_str().unwrap_or(""); let agent_str = agent_node_id.to_string(); let comm_str = communication_id.to_string(); let edge_id = Uuid::now_v7().to_string(); let empty = serde_json::json!({}).to_string(); if let Err(e) = stdb.create_node( reply_node_id, "content", "", response_text, "hidden", &empty, &agent_str, ).await { tracing::warn!(error = %e, "STDB create_node feilet (PG er allerede skrevet)"); } if let Err(e) = stdb.create_edge( &edge_id, reply_node_id, &comm_str, "belongs_to", &empty, false, &agent_str, ).await { tracing::warn!(error = %e, "STDB create_edge feilet (PG er allerede skrevet)"); } } } tracing::info!( status = result["status"].as_str().unwrap_or("unknown"), reply_node_id = result["reply_node_id"].as_str().unwrap_or("n/a"), "synops-respond fullført" ); Ok(result) } async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result { let c: serde_json::Value = sqlx::query_scalar("SELECT config FROM agent_identities WHERE node_id = $1") .bind(agent_node_id).fetch_optional(db).await .map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(serde_json::json!({})); Ok(AgentConfig { max_consecutive_agent_messages: c["max_consecutive_agent_messages"].as_i64().unwrap_or(3), rate_limit_per_hour: c["rate_limit_per_hour"].as_i64().unwrap_or(60), }) } pub async fn find_agent_participant(db: &PgPool, communication_id: Uuid) -> Result, sqlx::Error> { sqlx::query_scalar( "SELECT n.id FROM nodes n JOIN edges e ON e.source_id = n.id JOIN agent_identities ai ON ai.node_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of') AND n.node_kind = 'agent' AND ai.is_active = true LIMIT 1", ).bind(communication_id).fetch_optional(db).await }