Når en bruker limer inn en URL i chatten, gjenkjenner synops-respond URL-en automatisk, kaller synops-clip --write for å hente, parse og oppsummere artikkelen, og inkluderer resultatet i prompten slik at Claude kan presentere oppsummeringen naturlig. Ved betalingsmur: Claude informerer brukeren og ber om innlimt innhold. Maks 3 URL-er per melding, 60s timeout per klipp. Endringer: - synops-respond: URL-deteksjon (regex), synops-clip-kall, prompt-kontekst - maskinrommet/agent.rs: videresend env-variabler for synops-clip - maskinrommet-env.sh: SYNOPS_CLIP_SCRIPTS env-variabel - docs/infra/claude_agent.md: dokumentert URL-klipping-flyten
170 lines
6.5 KiB
Rust
// Agent-dispatcher — delegerer prosessering til synops-respond CLI.
//
// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon.
// Alt annet (kontekst-henting, prompt-bygging, claude-kall, PG-skriving)
// gjøres av synops-respond. PG NOTIFY-triggere sender sanntidsoppdateringer.
//
// Jobbtype: "agent_respond"
// Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" }
//
// Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md
|
use std::process::Stdio;
|
|
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
use crate::jobs::JobRow;
|
|
|
|
/// Per-agent safety limits, read from the `agent_identities.config` JSONB
/// column by `load_agent_config` (which supplies defaults when keys are absent).
#[derive(Debug)]
struct AgentConfig {
    // Max number of most-recent messages in a communication that may all be
    // authored by the agent before loop prevention skips the job (default 3).
    max_consecutive_agent_messages: i64,
    // Max AI calls logged per agent in a rolling one-hour window (default 60).
    rate_limit_per_hour: i64,
}
/// Resolve the synops-respond binary path.
///
/// Honors the `SYNOPS_RESPOND_BIN` environment variable; when it is unset
/// (or not valid Unicode), falls back to the bare name `synops-respond`,
/// which is then resolved via `$PATH` at spawn time.
fn respond_bin() -> String {
    match std::env::var("SYNOPS_RESPOND_BIN") {
        Ok(path) => path,
        Err(_) => String::from("synops-respond"),
    }
}
/// Job handler for `agent_respond` jobs.
///
/// Runs the safety checks that stay in maskinrommet (kill switch, rate
/// limiting, loop prevention), then delegates all actual processing to the
/// external `synops-respond` CLI, invoked with `--write` so the CLI itself
/// performs the PG writes.
///
/// Returns the CLI's stdout parsed as JSON on success, a `"skipped"` JSON
/// object when a safety check declines the job, or `Err(String)` on any
/// payload/DB/spawn/parse failure.
pub async fn handle_agent_respond(
    job: &JobRow,
    db: &PgPool,
) -> Result<serde_json::Value, String> {
    // Extract the four required UUIDs from the job payload; a missing or
    // unparsable field aborts the job with an error.
    let communication_id: Uuid = job.payload["communication_id"]
        .as_str().and_then(|s| s.parse().ok())
        .ok_or("Mangler communication_id")?;
    let message_id: Uuid = job.payload["message_id"]
        .as_str().and_then(|s| s.parse().ok())
        .ok_or("Mangler message_id")?;
    let agent_node_id: Uuid = job.payload["agent_node_id"]
        .as_str().and_then(|s| s.parse().ok())
        .ok_or("Mangler agent_node_id")?;
    let sender_node_id: Uuid = job.payload["sender_node_id"]
        .as_str().and_then(|s| s.parse().ok())
        .ok_or("Mangler sender_node_id")?;

    tracing::info!(
        communication_id = %communication_id,
        message_id = %message_id,
        agent_node_id = %agent_node_id,
        sender_node_id = %sender_node_id,
        "Starter agent_respond"
    );

    let config = load_agent_config(db, agent_node_id).await?;

    // --- Safety checks (remain in maskinrommet) ---

    // Kill switch: a missing agent_identities row is treated the same as an
    // explicitly inactive agent (`unwrap_or(false)`).
    let is_active: bool = sqlx::query_scalar(
        "SELECT is_active FROM agent_identities WHERE node_id = $1",
    ).bind(agent_node_id).fetch_optional(db).await
        .map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(false);
    if !is_active {
        return Ok(serde_json::json!({"status": "skipped", "reason": "agent_inactive"}));
    }

    // Rate limiting: count this agent's usage-log rows in the last hour and
    // skip the job once the configured hourly budget is reached.
    let count: i64 = sqlx::query_scalar::<_, Option<i64>>(
        "SELECT COUNT(*) FROM ai_usage_log WHERE agent_node_id = $1 AND created_at > now() - interval '1 hour'",
    ).bind(agent_node_id).fetch_one(db).await
        .map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(0);
    if count >= config.rate_limit_per_hour {
        return Ok(serde_json::json!({"status": "skipped", "reason": "rate_limit"}));
    }

    // Loop prevention: if the N most recent messages in this communication
    // were all created by the agent itself, do not respond again.
    let recent: Vec<Uuid> = sqlx::query_scalar(
        "SELECT n.created_by FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' ORDER BY n.created_at DESC LIMIT $2",
    ).bind(communication_id).bind(config.max_consecutive_agent_messages)
        .fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?;
    if !recent.is_empty() && recent.iter().all(|s| *s == agent_node_id) {
        return Ok(serde_json::json!({"status": "skipped", "reason": "loop_prevention"}));
    }

    // --- Delegate processing to synops-respond ---

    let bin = respond_bin();
    let mut cmd = tokio::process::Command::new(&bin);

    cmd.arg("--communication-id").arg(communication_id.to_string())
        .arg("--message-id").arg(message_id.to_string())
        .arg("--agent-node-id").arg(agent_node_id.to_string())
        .arg("--sender-node-id").arg(sender_node_id.to_string())
        .arg("--job-id").arg(job.id.to_string())
        .arg("--write");

    // Set the environment variables the CLI tool needs. DATABASE_URL is
    // mandatory — without it the CLI cannot perform its PG writes.
    let db_url = std::env::var("DATABASE_URL")
        .map_err(|_| "DATABASE_URL ikke satt".to_string())?;
    cmd.env("DATABASE_URL", &db_url);

    // Forward claude-related env variables (only when set in our own env).
    if let Ok(v) = std::env::var("CLAUDE_PATH") {
        cmd.env("CLAUDE_PATH", v);
    }
    if let Ok(v) = std::env::var("PROJECT_DIR") {
        cmd.env("PROJECT_DIR", v);
    }

    // Forward env variables for synops-clip (URL clipping in chat).
    for key in &["AI_GATEWAY_URL", "LITELLM_MASTER_KEY", "SYNOPS_CLIP_SCRIPTS"] {
        if let Ok(v) = std::env::var(key) {
            cmd.env(key, v);
        }
    }

    // Capture both streams: stdout carries the JSON result, stderr the logs.
    cmd.stdout(Stdio::piped())
        .stderr(Stdio::piped());

    tracing::info!(bin = %bin, "Starter synops-respond");

    // NOTE(review): no timeout around wait_with_output() — a hung CLI blocks
    // this job indefinitely; presumably bounded elsewhere — confirm.
    let child = cmd.spawn()
        .map_err(|e| format!("Kunne ikke starte {bin}: {e}"))?;
    let output = child.wait_with_output().await
        .map_err(|e| format!("Feil ved kjøring av {bin}: {e}"))?;

    // stderr is logged at info even on success — presumably the CLI uses it
    // for progress/diagnostic output; verify against synops-respond.
    let stderr = String::from_utf8_lossy(&output.stderr);
    if !stderr.is_empty() {
        tracing::info!(stderr = %stderr, "synops-respond stderr");
    }

    if !output.status.success() {
        // code() is None when the process was killed by a signal; report -1.
        let code = output.status.code().unwrap_or(-1);
        return Err(format!("synops-respond feilet (exit {code}): {stderr}"));
    }

    // Parse stdout as JSON.
    let stdout = String::from_utf8_lossy(&output.stdout);
    let result: serde_json::Value = serde_json::from_str(&stdout)
        .map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?;

    // PG writes are done by synops-respond via --write.
    // PG NOTIFY triggers push realtime updates to WebSocket clients.

    tracing::info!(
        status = result["status"].as_str().unwrap_or("unknown"),
        reply_node_id = result["reply_node_id"].as_str().unwrap_or("n/a"),
        "synops-respond fullført"
    );

    Ok(result)
}
async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result<AgentConfig, String> {
|
|
let c: serde_json::Value = sqlx::query_scalar("SELECT config FROM agent_identities WHERE node_id = $1")
|
|
.bind(agent_node_id).fetch_optional(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(serde_json::json!({}));
|
|
Ok(AgentConfig {
|
|
max_consecutive_agent_messages: c["max_consecutive_agent_messages"].as_i64().unwrap_or(3),
|
|
rate_limit_per_hour: c["rate_limit_per_hour"].as_i64().unwrap_or(60),
|
|
})
|
|
}
|
|
|
|
pub async fn find_agent_participant(db: &PgPool, communication_id: Uuid) -> Result<Option<Uuid>, sqlx::Error> {
|
|
sqlx::query_scalar(
|
|
"SELECT n.id FROM nodes n JOIN edges e ON e.source_id = n.id JOIN agent_identities ai ON ai.node_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of') AND n.node_kind = 'agent' AND ai.is_active = true LIMIT 1",
|
|
).bind(communication_id).fetch_optional(db).await
|
|
}
|