282 lines
12 KiB
Rust
282 lines
12 KiB
Rust
// Agent — Claude som chat-deltaker.
|
|
//
|
|
// Håndterer `agent_respond`-jobber fra jobbkøen.
|
|
// Kaller `claude -p` direkte som subprocess — maskinrommet kjører
|
|
// på hosten (ikke i Docker) og har tilgang til claude CLI.
|
|
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
use crate::jobs::JobRow;
|
|
use crate::stdb::StdbClient;
|
|
|
|
#[derive(Debug)]
|
|
struct AgentConfig {
|
|
max_context_messages: i64,
|
|
max_consecutive_agent_messages: i64,
|
|
rate_limit_per_hour: i64,
|
|
}
|
|
|
|
#[derive(sqlx::FromRow, Debug)]
|
|
struct MessageRow {
|
|
#[allow(dead_code)]
|
|
id: Uuid,
|
|
content: Option<String>,
|
|
created_by: Uuid,
|
|
#[allow(dead_code)]
|
|
created_at: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
#[derive(sqlx::FromRow, Debug)]
|
|
struct ParticipantRow {
|
|
id: Uuid,
|
|
title: Option<String>,
|
|
node_kind: String,
|
|
}
|
|
|
|
pub async fn handle_agent_respond(
|
|
job: &JobRow,
|
|
db: &PgPool,
|
|
stdb: &StdbClient,
|
|
) -> Result<serde_json::Value, String> {
|
|
let communication_id: Uuid = job.payload["communication_id"]
|
|
.as_str().and_then(|s| s.parse().ok())
|
|
.ok_or("Mangler communication_id")?;
|
|
let message_id: Uuid = job.payload["message_id"]
|
|
.as_str().and_then(|s| s.parse().ok())
|
|
.ok_or("Mangler message_id")?;
|
|
let agent_node_id: Uuid = job.payload["agent_node_id"]
|
|
.as_str().and_then(|s| s.parse().ok())
|
|
.ok_or("Mangler agent_node_id")?;
|
|
let sender_node_id: Uuid = job.payload["sender_node_id"]
|
|
.as_str().and_then(|s| s.parse().ok())
|
|
.ok_or("Mangler sender_node_id")?;
|
|
|
|
tracing::info!(
|
|
communication_id = %communication_id,
|
|
message_id = %message_id,
|
|
agent_node_id = %agent_node_id,
|
|
sender_node_id = %sender_node_id,
|
|
"Starter agent_respond"
|
|
);
|
|
|
|
let config = load_agent_config(db, agent_node_id).await?;
|
|
|
|
// Kill switch
|
|
let is_active: bool = sqlx::query_scalar(
|
|
"SELECT is_active FROM agent_identities WHERE node_id = $1",
|
|
).bind(agent_node_id).fetch_optional(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(false);
|
|
if !is_active {
|
|
return Ok(serde_json::json!({"status": "skipped", "reason": "agent_inactive"}));
|
|
}
|
|
|
|
// Rate limiting
|
|
let count: i64 = sqlx::query_scalar::<_, Option<i64>>(
|
|
"SELECT COUNT(*) FROM ai_usage_log WHERE agent_node_id = $1 AND created_at > now() - interval '1 hour'",
|
|
).bind(agent_node_id).fetch_one(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(0);
|
|
if count >= config.rate_limit_per_hour {
|
|
return Ok(serde_json::json!({"status": "skipped", "reason": "rate_limit"}));
|
|
}
|
|
|
|
// Loop-prevensjon
|
|
let recent: Vec<Uuid> = sqlx::query_scalar(
|
|
"SELECT n.created_by FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' ORDER BY n.created_at DESC LIMIT $2",
|
|
).bind(communication_id).bind(config.max_consecutive_agent_messages)
|
|
.fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?;
|
|
if !recent.is_empty() && recent.iter().all(|s| *s == agent_node_id) {
|
|
return Ok(serde_json::json!({"status": "skipped", "reason": "loop_prevention"}));
|
|
}
|
|
|
|
// Hent meldingshistorikk
|
|
let mut messages = sqlx::query_as::<_, MessageRow>(
|
|
"SELECT n.id, n.content, n.created_by, n.created_at FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'content' ORDER BY n.created_at DESC LIMIT $2",
|
|
).bind(communication_id).bind(config.max_context_messages)
|
|
.fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?;
|
|
messages.reverse();
|
|
if messages.is_empty() {
|
|
return Ok(serde_json::json!({"status": "skipped", "reason": "no_messages"}));
|
|
}
|
|
|
|
// Kontekst
|
|
let comm_title: String = sqlx::query_scalar::<_, Option<String>>(
|
|
"SELECT title FROM nodes WHERE id = $1",
|
|
).bind(communication_id).fetch_optional(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?.flatten()
|
|
.unwrap_or_else(|| "Samtale".to_string());
|
|
|
|
let participants = sqlx::query_as::<_, ParticipantRow>(
|
|
"SELECT n.id, n.title, n.node_kind FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of')",
|
|
).bind(communication_id).fetch_all(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?;
|
|
|
|
let permission: String = sqlx::query_scalar(
|
|
"SELECT permission FROM agent_permissions WHERE user_node_id = $1 AND agent_node_id = $2",
|
|
).bind(sender_node_id).bind(agent_node_id)
|
|
.fetch_optional(db).await.map_err(|e| format!("DB-feil: {e}"))?
|
|
.unwrap_or_else(|| "none".to_string());
|
|
|
|
// Bygg prompt
|
|
let name_map: std::collections::HashMap<Uuid, String> = participants.iter()
|
|
.map(|p| (p.id, p.title.clone().unwrap_or_else(|| p.node_kind.clone()))).collect();
|
|
|
|
let participant_names: String = participants.iter()
|
|
.filter(|p| p.id != agent_node_id)
|
|
.map(|p| p.title.as_deref().unwrap_or("Ukjent"))
|
|
.collect::<Vec<_>>().join(", ");
|
|
|
|
let mut conversation = String::new();
|
|
for m in &messages {
|
|
let name = name_map.get(&m.created_by).map(|s| s.as_str()).unwrap_or("Ukjent");
|
|
let content = m.content.as_deref().unwrap_or("");
|
|
if m.created_by == agent_node_id {
|
|
conversation.push_str(&format!("Claude: {content}\n"));
|
|
} else {
|
|
conversation.push_str(&format!("{name}: {content}\n"));
|
|
}
|
|
}
|
|
|
|
let perm_desc = match permission.as_str() {
|
|
"direct" => "Brukeren har 'direct'-tilgang.",
|
|
"propose" => "Brukeren har 'propose'-tilgang.",
|
|
_ => "",
|
|
};
|
|
|
|
// Sjekk om denne chatten diskuterer en spec-node (discusses-edge)
|
|
let spec_context: String = match sqlx::query_scalar::<_, Option<String>>(
|
|
"SELECT n.content FROM nodes n JOIN edges e ON e.source_id = $1 AND e.target_id = n.id WHERE e.edge_type = 'discusses' LIMIT 1",
|
|
).bind(communication_id).fetch_optional(db).await {
|
|
Ok(Some(Some(content))) if !content.is_empty() => {
|
|
let truncated = if content.len() > 4000 { &content[..4000] } else { &content };
|
|
format!("\n--- Gjeldende spesifikasjon ---\n{truncated}\n--- Slutt spesifikasjon ---\n\nDu har tilgang til spesifikasjonen over. Gi konkret feedback: hva er implementert, hva er planlagt, hva er teknisk vanskelig. Vær ærlig om begrensninger.\n")
|
|
}
|
|
_ => String::new(),
|
|
};
|
|
|
|
let prompt = format!(
|
|
r#"Du er Claude, en AI-assistent integrert i Synops-plattformen.
|
|
Du deltar i samtalen "{comm_title}" med {participant_names}.
|
|
Svar på norsk med mindre brukeren skriver på engelsk.
|
|
{perm_desc}
|
|
Svar konsist. Bruk vanlig tekst uten markdown-overskrifter.
|
|
Svar KUN med meldingsteksten.
|
|
{spec_context}
|
|
--- Samtalehistorikk ---
|
|
{conversation}--- Svar ---"#
|
|
);
|
|
|
|
// Kall claude CLI med retry ved API-feil (500/529)
|
|
let claude_path = std::env::var("CLAUDE_PATH").unwrap_or_else(|_| "claude".to_string());
|
|
let project_dir = std::env::var("PROJECT_DIR").unwrap_or_else(|_| "/home/vegard/synops".to_string());
|
|
|
|
tracing::info!(prompt_len = prompt.len(), "Kaller claude CLI");
|
|
|
|
let max_retries = 3u32;
|
|
let mut response_text = String::new();
|
|
|
|
for attempt in 0..=max_retries {
|
|
let output = tokio::process::Command::new(&claude_path)
|
|
.arg("-p")
|
|
.arg(&prompt)
|
|
.arg("--output-format")
|
|
.arg("json")
|
|
.arg("--dangerously-skip-permissions")
|
|
.env("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1")
|
|
.current_dir(&project_dir)
|
|
.output()
|
|
.await
|
|
.map_err(|e| format!("Kunne ikke starte claude: {e}"))?;
|
|
|
|
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
|
|
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
|
|
|
// Sjekk om dette er en retrybar API-feil (500/529)
|
|
let is_api_error = !output.status.success()
|
|
&& (stderr.contains("500") || stderr.contains("529")
|
|
|| stderr.contains("overloaded") || stderr.contains("Internal Server Error"));
|
|
|
|
if is_api_error && attempt < max_retries {
|
|
let delay = std::time::Duration::from_secs(2u64.pow(attempt + 1)); // 2, 4, 8 sek
|
|
tracing::warn!(
|
|
attempt = attempt + 1,
|
|
delay_secs = delay.as_secs(),
|
|
"Claude API-feil, prøver igjen"
|
|
);
|
|
tokio::time::sleep(delay).await;
|
|
continue;
|
|
}
|
|
|
|
if !output.status.success() {
|
|
if is_api_error {
|
|
// Alle retries brukt opp — gi vennlig melding i stedet for rå feil
|
|
tracing::error!(attempts = max_retries + 1, "Claude API utilgjengelig etter alle forsøk");
|
|
response_text = "Beklager, jeg er midlertidig utilgjengelig — Anthropic sitt API svarer ikke akkurat nå. Prøv igjen om litt.".to_string();
|
|
break;
|
|
}
|
|
return Err(format!("claude feilet ({}): {}", output.status, &stderr[..stderr.len().min(500)]));
|
|
}
|
|
|
|
response_text = match serde_json::from_str::<serde_json::Value>(&stdout) {
|
|
Ok(json) => json["result"].as_str().unwrap_or("").to_string(),
|
|
Err(_) => stdout.trim().to_string(),
|
|
};
|
|
break;
|
|
}
|
|
|
|
if response_text.is_empty() {
|
|
return Err("Tom respons fra claude".to_string());
|
|
}
|
|
|
|
tracing::info!(response_len = response_text.len(), "Fikk svar fra claude");
|
|
|
|
// Skriv svar
|
|
let reply_id = Uuid::now_v7();
|
|
let edge_id = Uuid::now_v7();
|
|
let agent_str = agent_node_id.to_string();
|
|
let reply_str = reply_id.to_string();
|
|
let edge_str = edge_id.to_string();
|
|
let comm_str = communication_id.to_string();
|
|
let empty = serde_json::json!({}).to_string();
|
|
|
|
stdb.create_node(&reply_str, "content", "", &response_text, "hidden", &empty, &agent_str)
|
|
.await.map_err(|e| format!("STDB: {e}"))?;
|
|
stdb.create_edge(&edge_str, &reply_str, &comm_str, "belongs_to", &empty, false, &agent_str)
|
|
.await.map_err(|e| format!("STDB: {e}"))?;
|
|
|
|
let metadata = serde_json::json!({});
|
|
sqlx::query("INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by) VALUES ($1, 'content', $2, 'hidden'::visibility, $3, $4)")
|
|
.bind(reply_id).bind(&response_text).bind(&metadata).bind(agent_node_id)
|
|
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
|
sqlx::query("INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'belongs_to', '{}', false, $4)")
|
|
.bind(edge_id).bind(reply_id).bind(communication_id).bind(agent_node_id)
|
|
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
|
sqlx::query("INSERT INTO ai_usage_log (agent_node_id, communication_id, job_id, model_alias, model_actual, prompt_tokens, completion_tokens, total_tokens, job_type) VALUES ($1, $2, $3, 'claude-code', 'claude-code-cli', 0, 0, 0, 'agent_respond')")
|
|
.bind(agent_node_id).bind(communication_id).bind(job.id)
|
|
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
|
|
|
tracing::info!(reply_node_id = %reply_id, "Agent-svar persistert");
|
|
|
|
Ok(serde_json::json!({
|
|
"status": "completed",
|
|
"reply_node_id": reply_id.to_string(),
|
|
"response_len": response_text.len()
|
|
}))
|
|
}
|
|
|
|
async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result<AgentConfig, String> {
|
|
let c: serde_json::Value = sqlx::query_scalar("SELECT config FROM agent_identities WHERE node_id = $1")
|
|
.bind(agent_node_id).fetch_optional(db).await
|
|
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(serde_json::json!({}));
|
|
Ok(AgentConfig {
|
|
max_context_messages: c["max_context_messages"].as_i64().unwrap_or(50),
|
|
max_consecutive_agent_messages: c["max_consecutive_agent_messages"].as_i64().unwrap_or(3),
|
|
rate_limit_per_hour: c["rate_limit_per_hour"].as_i64().unwrap_or(60),
|
|
})
|
|
}
|
|
|
|
pub async fn find_agent_participant(db: &PgPool, communication_id: Uuid) -> Result<Option<Uuid>, sqlx::Error> {
|
|
sqlx::query_scalar(
|
|
"SELECT n.id FROM nodes n JOIN edges e ON e.source_id = n.id JOIN agent_identities ai ON ai.node_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of') AND n.node_kind = 'agent' AND ai.is_active = true LIMIT 1",
|
|
).bind(communication_id).fetch_optional(db).await
|
|
}
|