synops/maskinrommet/src/agent.rs
vegard f22465d72b @bot URL-klipping i chat: synops-clip-integrasjon (oppgave 25.3)
Når en bruker limer inn en URL i chatten, gjenkjenner synops-respond
URL-en automatisk, kaller synops-clip --write for å hente, parse og
oppsummere artikkelen, og inkluderer resultatet i prompten slik at
Claude kan presentere oppsummeringen naturlig.

Ved betalingsmur: Claude informerer brukeren og ber om innlimt innhold.
Maks 3 URL-er per melding, 60s timeout per klipp.

Endringer:
- synops-respond: URL-deteksjon (regex), synops-clip-kall, prompt-kontekst
- maskinrommet/agent.rs: videresend env-variabler for synops-clip
- maskinrommet-env.sh: SYNOPS_CLIP_SCRIPTS env-variabel
- docs/infra/claude_agent.md: dokumentert URL-klipping-flyten
2026-03-18 18:41:33 +00:00

170 lines
6.5 KiB
Rust

// Agent-dispatcher — delegerer prosessering til synops-respond CLI.
//
// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon.
// Alt annet (kontekst-henting, prompt-bygging, claude-kall, PG-skriving)
// gjøres av synops-respond. PG NOTIFY-triggere sender sanntidsoppdateringer.
//
// Jobbtype: "agent_respond"
// Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" }
//
// Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md
use std::process::Stdio;
use sqlx::PgPool;
use uuid::Uuid;
use crate::jobs::JobRow;
#[derive(Debug)]
struct AgentConfig {
max_consecutive_agent_messages: i64,
rate_limit_per_hour: i64,
}
/// Synops-respond binary path.
fn respond_bin() -> String {
std::env::var("SYNOPS_RESPOND_BIN")
.unwrap_or_else(|_| "synops-respond".to_string())
}
pub async fn handle_agent_respond(
job: &JobRow,
db: &PgPool,
) -> Result<serde_json::Value, String> {
let communication_id: Uuid = job.payload["communication_id"]
.as_str().and_then(|s| s.parse().ok())
.ok_or("Mangler communication_id")?;
let message_id: Uuid = job.payload["message_id"]
.as_str().and_then(|s| s.parse().ok())
.ok_or("Mangler message_id")?;
let agent_node_id: Uuid = job.payload["agent_node_id"]
.as_str().and_then(|s| s.parse().ok())
.ok_or("Mangler agent_node_id")?;
let sender_node_id: Uuid = job.payload["sender_node_id"]
.as_str().and_then(|s| s.parse().ok())
.ok_or("Mangler sender_node_id")?;
tracing::info!(
communication_id = %communication_id,
message_id = %message_id,
agent_node_id = %agent_node_id,
sender_node_id = %sender_node_id,
"Starter agent_respond"
);
let config = load_agent_config(db, agent_node_id).await?;
// --- Sikkerhetskontroller (forblir i maskinrommet) ---
// Kill switch
let is_active: bool = sqlx::query_scalar(
"SELECT is_active FROM agent_identities WHERE node_id = $1",
).bind(agent_node_id).fetch_optional(db).await
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(false);
if !is_active {
return Ok(serde_json::json!({"status": "skipped", "reason": "agent_inactive"}));
}
// Rate limiting
let count: i64 = sqlx::query_scalar::<_, Option<i64>>(
"SELECT COUNT(*) FROM ai_usage_log WHERE agent_node_id = $1 AND created_at > now() - interval '1 hour'",
).bind(agent_node_id).fetch_one(db).await
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(0);
if count >= config.rate_limit_per_hour {
return Ok(serde_json::json!({"status": "skipped", "reason": "rate_limit"}));
}
// Loop-prevensjon
let recent: Vec<Uuid> = sqlx::query_scalar(
"SELECT n.created_by FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' ORDER BY n.created_at DESC LIMIT $2",
).bind(communication_id).bind(config.max_consecutive_agent_messages)
.fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?;
if !recent.is_empty() && recent.iter().all(|s| *s == agent_node_id) {
return Ok(serde_json::json!({"status": "skipped", "reason": "loop_prevention"}));
}
// --- Deleger prosessering til synops-respond ---
let bin = respond_bin();
let mut cmd = tokio::process::Command::new(&bin);
cmd.arg("--communication-id").arg(communication_id.to_string())
.arg("--message-id").arg(message_id.to_string())
.arg("--agent-node-id").arg(agent_node_id.to_string())
.arg("--sender-node-id").arg(sender_node_id.to_string())
.arg("--job-id").arg(job.id.to_string())
.arg("--write");
// Sett miljøvariabler CLI-verktøyet trenger
let db_url = std::env::var("DATABASE_URL")
.map_err(|_| "DATABASE_URL ikke satt".to_string())?;
cmd.env("DATABASE_URL", &db_url);
// Videresend claude-relaterte env-variabler
if let Ok(v) = std::env::var("CLAUDE_PATH") {
cmd.env("CLAUDE_PATH", v);
}
if let Ok(v) = std::env::var("PROJECT_DIR") {
cmd.env("PROJECT_DIR", v);
}
// Videresend env-variabler for synops-clip (URL-klipping i chat)
for key in &["AI_GATEWAY_URL", "LITELLM_MASTER_KEY", "SYNOPS_CLIP_SCRIPTS"] {
if let Ok(v) = std::env::var(key) {
cmd.env(key, v);
}
}
cmd.stdout(Stdio::piped())
.stderr(Stdio::piped());
tracing::info!(bin = %bin, "Starter synops-respond");
let child = cmd.spawn()
.map_err(|e| format!("Kunne ikke starte {bin}: {e}"))?;
let output = child.wait_with_output().await
.map_err(|e| format!("Feil ved kjøring av {bin}: {e}"))?;
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.is_empty() {
tracing::info!(stderr = %stderr, "synops-respond stderr");
}
if !output.status.success() {
let code = output.status.code().unwrap_or(-1);
return Err(format!("synops-respond feilet (exit {code}): {stderr}"));
}
// Parse stdout som JSON
let stdout = String::from_utf8_lossy(&output.stdout);
let result: serde_json::Value = serde_json::from_str(&stdout)
.map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?;
// PG-skriving gjøres av synops-respond med --write.
// PG NOTIFY-triggere sender sanntidsoppdateringer til WebSocket-klienter.
tracing::info!(
status = result["status"].as_str().unwrap_or("unknown"),
reply_node_id = result["reply_node_id"].as_str().unwrap_or("n/a"),
"synops-respond fullført"
);
Ok(result)
}
async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result<AgentConfig, String> {
let c: serde_json::Value = sqlx::query_scalar("SELECT config FROM agent_identities WHERE node_id = $1")
.bind(agent_node_id).fetch_optional(db).await
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(serde_json::json!({}));
Ok(AgentConfig {
max_consecutive_agent_messages: c["max_consecutive_agent_messages"].as_i64().unwrap_or(3),
rate_limit_per_hour: c["rate_limit_per_hour"].as_i64().unwrap_or(60),
})
}
pub async fn find_agent_participant(db: &PgPool, communication_id: Uuid) -> Result<Option<Uuid>, sqlx::Error> {
sqlx::query_scalar(
"SELECT n.id FROM nodes n JOIN edges e ON e.source_id = n.id JOIN agent_identities ai ON ai.node_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of') AND n.node_kind = 'agent' AND ai.is_active = true LIMIT 1",
).bind(communication_id).fetch_optional(db).await
}