Implementer synops-respond CLI-verktøy (oppgave 21.8)
Bryter ut prosesseringslogikken fra maskinrommet/src/agent.rs til et selvstendig CLI-verktøy: synops-respond. Følger unix-filosofien der maskinrommet orkestrerer og CLI-verktøy gjør jobben. Ansvarsdeling: - maskinrommet beholder: kill switch, rate limiting, loop-prevensjon, STDB-skriving (sanntidsvisning for frontend) - synops-respond håndterer: kontekst-henting fra PG, prompt-bygging, claude CLI-kall med retry, PG-skriving (node, edges, logging) agent.rs er nå en tynn dispatcher (~140 linjer, ned fra ~305) som validerer sikkerhet og spawner synops-respond, likt mønsteret fra summarize.rs sin delegering til synops-summarize. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
595ac1a4d9
commit
b6cd2b4571
6 changed files with 568 additions and 211 deletions
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
Claude deltar i samtaler som en agent-node i grafen. Når en bruker sender
|
||||
en melding i en kommunikasjonsnode der Claude er deltaker, trigger maskinrommet
|
||||
en `agent_respond`-jobb som kaller `claude -p` CLI og skriver svaret tilbake.
|
||||
en `agent_respond`-jobb som delegerer til `synops-respond` CLI-verktøyet.
|
||||
|
||||
## Arkitektur
|
||||
|
||||
|
|
@ -14,12 +14,21 @@ Bruker sender melding (via frontend)
|
|||
→ sjekker: har kommunikasjonsnoden en agent-deltaker?
|
||||
→ ja: enqueue agent_respond-jobb (prioritet 8)
|
||||
→ jobbkø-worker plukker opp (poll hvert 2s)
|
||||
→ kaller: claude -p "<prompt>" --output-format json --dangerously-skip-permissions
|
||||
→ parser JSON-respons, henter result-feltet
|
||||
→ skriver svar som content-node (STDB instant + PG async)
|
||||
→ maskinrommet: kill switch, rate limit, loop-prevensjon
|
||||
→ spawner: synops-respond --communication-id ... --write
|
||||
→ henter meldingshistorikk + kontekst fra PG
|
||||
→ bygger prompt
|
||||
→ kaller: claude -p "<prompt>" --output-format json
|
||||
→ oppretter svar-node i PG, logger ai_usage + resource_usage
|
||||
→ returnerer JSON med reply_node_id + response_text
|
||||
→ maskinrommet: skriver til STDB (sanntidsvisning)
|
||||
→ frontend viser melding i sanntid via STDB WebSocket
|
||||
```
|
||||
|
||||
Ansvarsdeling (unix-filosofi):
|
||||
- **Maskinrommet:** Auth, kill switch, rate limiting, loop-prevensjon, STDB-skriving
|
||||
- **synops-respond:** Kontekst-henting, prompt-bygging, claude-kall, PG-skriving
|
||||
|
||||
Latens: ~3-5 sekunder fra melding til svar.
|
||||
|
||||
## Noder og tabeller
|
||||
|
|
|
|||
|
|
@ -1,38 +1,32 @@
|
|||
// Agent — Claude som chat-deltaker.
|
||||
// Agent-dispatcher — delegerer prosessering til synops-respond CLI.
|
||||
//
|
||||
// Håndterer `agent_respond`-jobber fra jobbkøen.
|
||||
// Kaller `claude -p` direkte som subprocess — maskinrommet kjører
|
||||
// på hosten (ikke i Docker) og har tilgang til claude CLI.
|
||||
// Maskinrommet beholder: kill switch, rate limiting, loop-prevensjon,
|
||||
// og STDB-skriving (sanntidsvisning). Alt annet (kontekst-henting,
|
||||
// prompt-bygging, claude-kall, PG-skriving) gjøres av synops-respond.
|
||||
//
|
||||
// Jobbtype: "agent_respond"
|
||||
// Payload: { "communication_id", "message_id", "agent_node_id", "sender_node_id" }
|
||||
//
|
||||
// Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md
|
||||
|
||||
use std::process::Stdio;
|
||||
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::jobs::JobRow;
|
||||
use crate::resource_usage;
|
||||
use crate::stdb::StdbClient;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AgentConfig {
|
||||
max_context_messages: i64,
|
||||
max_consecutive_agent_messages: i64,
|
||||
rate_limit_per_hour: i64,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct MessageRow {
|
||||
#[allow(dead_code)]
|
||||
id: Uuid,
|
||||
content: Option<String>,
|
||||
created_by: Uuid,
|
||||
#[allow(dead_code)]
|
||||
created_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct ParticipantRow {
|
||||
id: Uuid,
|
||||
title: Option<String>,
|
||||
node_kind: String,
|
||||
/// Synops-respond binary path.
|
||||
fn respond_bin() -> String {
|
||||
std::env::var("SYNOPS_RESPOND_BIN")
|
||||
.unwrap_or_else(|_| "synops-respond".to_string())
|
||||
}
|
||||
|
||||
pub async fn handle_agent_respond(
|
||||
|
|
@ -63,6 +57,8 @@ pub async fn handle_agent_respond(
|
|||
|
||||
let config = load_agent_config(db, agent_node_id).await?;
|
||||
|
||||
// --- Sikkerhetskontroller (forblir i maskinrommet) ---
|
||||
|
||||
// Kill switch
|
||||
let is_active: bool = sqlx::query_scalar(
|
||||
"SELECT is_active FROM agent_identities WHERE node_id = $1",
|
||||
|
|
@ -90,200 +86,87 @@ pub async fn handle_agent_respond(
|
|||
return Ok(serde_json::json!({"status": "skipped", "reason": "loop_prevention"}));
|
||||
}
|
||||
|
||||
// Hent meldingshistorikk
|
||||
let mut messages = sqlx::query_as::<_, MessageRow>(
|
||||
"SELECT n.id, n.content, n.created_by, n.created_at FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'content' ORDER BY n.created_at DESC LIMIT $2",
|
||||
).bind(communication_id).bind(config.max_context_messages)
|
||||
.fetch_all(db).await.map_err(|e| format!("DB-feil: {e}"))?;
|
||||
messages.reverse();
|
||||
if messages.is_empty() {
|
||||
return Ok(serde_json::json!({"status": "skipped", "reason": "no_messages"}));
|
||||
// --- Deleger prosessering til synops-respond ---
|
||||
|
||||
let bin = respond_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
cmd.arg("--communication-id").arg(communication_id.to_string())
|
||||
.arg("--message-id").arg(message_id.to_string())
|
||||
.arg("--agent-node-id").arg(agent_node_id.to_string())
|
||||
.arg("--sender-node-id").arg(sender_node_id.to_string())
|
||||
.arg("--job-id").arg(job.id.to_string())
|
||||
.arg("--write");
|
||||
|
||||
// Sett miljøvariabler CLI-verktøyet trenger
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.map_err(|_| "DATABASE_URL ikke satt".to_string())?;
|
||||
cmd.env("DATABASE_URL", &db_url);
|
||||
|
||||
// Videresend claude-relaterte env-variabler
|
||||
if let Ok(v) = std::env::var("CLAUDE_PATH") {
|
||||
cmd.env("CLAUDE_PATH", v);
|
||||
}
|
||||
if let Ok(v) = std::env::var("PROJECT_DIR") {
|
||||
cmd.env("PROJECT_DIR", v);
|
||||
}
|
||||
|
||||
// Kontekst
|
||||
let comm_title: String = sqlx::query_scalar::<_, Option<String>>(
|
||||
"SELECT title FROM nodes WHERE id = $1",
|
||||
).bind(communication_id).fetch_optional(db).await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?.flatten()
|
||||
.unwrap_or_else(|| "Samtale".to_string());
|
||||
cmd.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
let participants = sqlx::query_as::<_, ParticipantRow>(
|
||||
"SELECT n.id, n.title, n.node_kind FROM nodes n JOIN edges e ON e.source_id = n.id WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of')",
|
||||
).bind(communication_id).fetch_all(db).await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?;
|
||||
tracing::info!(bin = %bin, "Starter synops-respond");
|
||||
|
||||
let permission: String = sqlx::query_scalar(
|
||||
"SELECT permission FROM agent_permissions WHERE user_node_id = $1 AND agent_node_id = $2",
|
||||
).bind(sender_node_id).bind(agent_node_id)
|
||||
.fetch_optional(db).await.map_err(|e| format!("DB-feil: {e}"))?
|
||||
.unwrap_or_else(|| "none".to_string());
|
||||
let child = cmd.spawn()
|
||||
.map_err(|e| format!("Kunne ikke starte {bin}: {e}"))?;
|
||||
let output = child.wait_with_output().await
|
||||
.map_err(|e| format!("Feil ved kjøring av {bin}: {e}"))?;
|
||||
|
||||
// Bygg prompt
|
||||
let name_map: std::collections::HashMap<Uuid, String> = participants.iter()
|
||||
.map(|p| (p.id, p.title.clone().unwrap_or_else(|| p.node_kind.clone()))).collect();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
if !stderr.is_empty() {
|
||||
tracing::info!(stderr = %stderr, "synops-respond stderr");
|
||||
}
|
||||
|
||||
let participant_names: String = participants.iter()
|
||||
.filter(|p| p.id != agent_node_id)
|
||||
.map(|p| p.title.as_deref().unwrap_or("Ukjent"))
|
||||
.collect::<Vec<_>>().join(", ");
|
||||
if !output.status.success() {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
return Err(format!("synops-respond feilet (exit {code}): {stderr}"));
|
||||
}
|
||||
|
||||
let mut conversation = String::new();
|
||||
for m in &messages {
|
||||
let name = name_map.get(&m.created_by).map(|s| s.as_str()).unwrap_or("Ukjent");
|
||||
let content = m.content.as_deref().unwrap_or("");
|
||||
if m.created_by == agent_node_id {
|
||||
conversation.push_str(&format!("Claude: {content}\n"));
|
||||
} else {
|
||||
conversation.push_str(&format!("{name}: {content}\n"));
|
||||
// Parse stdout som JSON
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let result: serde_json::Value = serde_json::from_str(&stdout)
|
||||
.map_err(|e| format!("Kunne ikke parse synops-respond output: {e}"))?;
|
||||
|
||||
// --- STDB-skriving for sanntidsvisning (forblir i maskinrommet) ---
|
||||
|
||||
if result["status"].as_str() == Some("completed") {
|
||||
if let Some(reply_node_id) = result["reply_node_id"].as_str() {
|
||||
let response_text = result["response_text"].as_str().unwrap_or("");
|
||||
let agent_str = agent_node_id.to_string();
|
||||
let comm_str = communication_id.to_string();
|
||||
let edge_id = Uuid::now_v7().to_string();
|
||||
let empty = serde_json::json!({}).to_string();
|
||||
|
||||
if let Err(e) = stdb.create_node(
|
||||
reply_node_id, "content", "", response_text, "hidden", &empty, &agent_str,
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_node feilet (PG er allerede skrevet)");
|
||||
}
|
||||
|
||||
if let Err(e) = stdb.create_edge(
|
||||
&edge_id, reply_node_id, &comm_str, "belongs_to", &empty, false, &agent_str,
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_edge feilet (PG er allerede skrevet)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let perm_desc = match permission.as_str() {
|
||||
"direct" => "Brukeren har 'direct'-tilgang.",
|
||||
"propose" => "Brukeren har 'propose'-tilgang.",
|
||||
_ => "",
|
||||
};
|
||||
|
||||
// Sjekk om denne chatten diskuterer en spec-node (discusses-edge)
|
||||
let spec_context: String = match sqlx::query_scalar::<_, Option<String>>(
|
||||
"SELECT n.content FROM nodes n JOIN edges e ON e.source_id = $1 AND e.target_id = n.id WHERE e.edge_type = 'discusses' LIMIT 1",
|
||||
).bind(communication_id).fetch_optional(db).await {
|
||||
Ok(Some(Some(content))) if !content.is_empty() => {
|
||||
let truncated = if content.len() > 4000 { &content[..4000] } else { &content };
|
||||
format!("\n--- Gjeldende spesifikasjon ---\n{truncated}\n--- Slutt spesifikasjon ---\n\nDu har tilgang til spesifikasjonen over. Gi konkret feedback: hva er implementert, hva er planlagt, hva er teknisk vanskelig. Vær ærlig om begrensninger.\n")
|
||||
}
|
||||
_ => String::new(),
|
||||
};
|
||||
|
||||
let prompt = format!(
|
||||
r#"Du er Claude, en AI-assistent integrert i Synops-plattformen.
|
||||
Du deltar i samtalen "{comm_title}" med {participant_names}.
|
||||
Svar på norsk med mindre brukeren skriver på engelsk.
|
||||
{perm_desc}
|
||||
Svar konsist. Bruk vanlig tekst uten markdown-overskrifter.
|
||||
Svar KUN med meldingsteksten.
|
||||
{spec_context}
|
||||
--- Samtalehistorikk ---
|
||||
{conversation}--- Svar ---"#
|
||||
tracing::info!(
|
||||
status = result["status"].as_str().unwrap_or("unknown"),
|
||||
reply_node_id = result["reply_node_id"].as_str().unwrap_or("n/a"),
|
||||
"synops-respond fullført"
|
||||
);
|
||||
|
||||
// Kall claude CLI med retry ved API-feil (500/529)
|
||||
let claude_path = std::env::var("CLAUDE_PATH").unwrap_or_else(|_| "claude".to_string());
|
||||
let project_dir = std::env::var("PROJECT_DIR").unwrap_or_else(|_| "/home/vegard/synops".to_string());
|
||||
|
||||
tracing::info!(prompt_len = prompt.len(), "Kaller claude CLI");
|
||||
|
||||
let max_retries = 3u32;
|
||||
let mut response_text = String::new();
|
||||
|
||||
for attempt in 0..=max_retries {
|
||||
let output = tokio::process::Command::new(&claude_path)
|
||||
.arg("-p")
|
||||
.arg(&prompt)
|
||||
.arg("--output-format")
|
||||
.arg("json")
|
||||
.arg("--dangerously-skip-permissions")
|
||||
.env("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1")
|
||||
.current_dir(&project_dir)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke starte claude: {e}"))?;
|
||||
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
|
||||
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
|
||||
// Sjekk om dette er en retrybar API-feil (500/529)
|
||||
let is_api_error = !output.status.success()
|
||||
&& (stderr.contains("500") || stderr.contains("529")
|
||||
|| stderr.contains("overloaded") || stderr.contains("Internal Server Error"));
|
||||
|
||||
if is_api_error && attempt < max_retries {
|
||||
let delay = std::time::Duration::from_secs(2u64.pow(attempt + 1)); // 2, 4, 8 sek
|
||||
tracing::warn!(
|
||||
attempt = attempt + 1,
|
||||
delay_secs = delay.as_secs(),
|
||||
"Claude API-feil, prøver igjen"
|
||||
);
|
||||
tokio::time::sleep(delay).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
if !output.status.success() {
|
||||
if is_api_error {
|
||||
// Alle retries brukt opp — gi vennlig melding i stedet for rå feil
|
||||
tracing::error!(attempts = max_retries + 1, "Claude API utilgjengelig etter alle forsøk");
|
||||
response_text = "Beklager, jeg er midlertidig utilgjengelig — Anthropic sitt API svarer ikke akkurat nå. Prøv igjen om litt.".to_string();
|
||||
break;
|
||||
}
|
||||
return Err(format!("claude feilet ({}): {}", output.status, &stderr[..stderr.len().min(500)]));
|
||||
}
|
||||
|
||||
response_text = match serde_json::from_str::<serde_json::Value>(&stdout) {
|
||||
Ok(json) => json["result"].as_str().unwrap_or("").to_string(),
|
||||
Err(_) => stdout.trim().to_string(),
|
||||
};
|
||||
break;
|
||||
}
|
||||
|
||||
if response_text.is_empty() {
|
||||
return Err("Tom respons fra claude".to_string());
|
||||
}
|
||||
|
||||
tracing::info!(response_len = response_text.len(), "Fikk svar fra claude");
|
||||
|
||||
// Skriv svar
|
||||
let reply_id = Uuid::now_v7();
|
||||
let edge_id = Uuid::now_v7();
|
||||
let agent_str = agent_node_id.to_string();
|
||||
let reply_str = reply_id.to_string();
|
||||
let edge_str = edge_id.to_string();
|
||||
let comm_str = communication_id.to_string();
|
||||
let empty = serde_json::json!({}).to_string();
|
||||
|
||||
stdb.create_node(&reply_str, "content", "", &response_text, "hidden", &empty, &agent_str)
|
||||
.await.map_err(|e| format!("STDB: {e}"))?;
|
||||
stdb.create_edge(&edge_str, &reply_str, &comm_str, "belongs_to", &empty, false, &agent_str)
|
||||
.await.map_err(|e| format!("STDB: {e}"))?;
|
||||
|
||||
let metadata = serde_json::json!({});
|
||||
sqlx::query("INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by) VALUES ($1, 'content', $2, 'hidden'::visibility, $3, $4)")
|
||||
.bind(reply_id).bind(&response_text).bind(&metadata).bind(agent_node_id)
|
||||
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
||||
sqlx::query("INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) VALUES ($1, $2, $3, 'belongs_to', '{}', false, $4)")
|
||||
.bind(edge_id).bind(reply_id).bind(communication_id).bind(agent_node_id)
|
||||
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
||||
sqlx::query("INSERT INTO ai_usage_log (agent_node_id, communication_id, job_id, model_alias, model_actual, prompt_tokens, completion_tokens, total_tokens, job_type) VALUES ($1, $2, $3, 'claude-code', 'claude-code-cli', 0, 0, 0, 'agent_respond')")
|
||||
.bind(agent_node_id).bind(communication_id).bind(job.id)
|
||||
.execute(db).await.map_err(|e| format!("PG: {e}"))?;
|
||||
|
||||
// Logg til resource_usage_log (samlet ressurssporing)
|
||||
let collection_id = resource_usage::find_collection_for_node(db, communication_id).await;
|
||||
if let Err(e) = resource_usage::log(
|
||||
db,
|
||||
communication_id,
|
||||
Some(sender_node_id),
|
||||
collection_id,
|
||||
"ai",
|
||||
serde_json::json!({
|
||||
"model_level": "deep",
|
||||
"model_id": "claude-code-cli",
|
||||
"tokens_in": 0,
|
||||
"tokens_out": 0,
|
||||
"job_type": "agent_respond"
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk for agent_respond");
|
||||
}
|
||||
|
||||
tracing::info!(reply_node_id = %reply_id, "Agent-svar persistert");
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"status": "completed",
|
||||
"reply_node_id": reply_id.to_string(),
|
||||
"response_len": response_text.len()
|
||||
}))
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result<AgentConfig, String> {
|
||||
|
|
@ -291,7 +174,6 @@ async fn load_agent_config(db: &PgPool, agent_node_id: Uuid) -> Result<AgentConf
|
|||
.bind(agent_node_id).fetch_optional(db).await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?.unwrap_or(serde_json::json!({}));
|
||||
Ok(AgentConfig {
|
||||
max_context_messages: c["max_context_messages"].as_i64().unwrap_or(50),
|
||||
max_consecutive_agent_messages: c["max_consecutive_agent_messages"].as_i64().unwrap_or(3),
|
||||
rate_limit_per_hour: c["rate_limit_per_hour"].as_i64().unwrap_or(60),
|
||||
})
|
||||
|
|
|
|||
3
tasks.md
3
tasks.md
|
|
@ -248,8 +248,7 @@ kaller dem direkte. Samme verktøy, to brukere.
|
|||
- [x] 21.5 `synops-tts`: Tekst-til-tale. Input: `--text <tekst> --voice <stemme>`. Output: CAS-hash for lydfil. Erstatter `tts.rs`.
|
||||
- [x] 21.6 `synops-summarize`: AI-oppsummering. Input: `--communication-id <uuid>`. Output: sammendrag som tekst. Erstatter `summarize.rs`.
|
||||
- [x] 21.7 `synops-suggest-edges`: AI-foreslåtte edges. Input: `--node-id <uuid>`. Output: JSON med forslag (target, edge_type, confidence). Erstatter `ai_edges.rs`.
|
||||
- [~] 21.8 `synops-respond`: Claude chat-svar. Input: `--communication-id <uuid> --message-id <uuid>`. Output: svartekst. Erstatter `agent.rs` sin prosessering (auth/ratelimit forblir i maskinrommet).
|
||||
> Påbegynt: 2026-03-18T09:44
|
||||
- [x] 21.8 `synops-respond`: Claude chat-svar. Input: `--communication-id <uuid> --message-id <uuid>`. Output: svartekst. Erstatter `agent.rs` sin prosessering (auth/ratelimit forblir i maskinrommet).
|
||||
- [ ] 21.9 `synops-prune`: Opprydding av gamle noder. Input: `--dry-run` for forhåndsvisning. Erstatter `pruning.rs`.
|
||||
|
||||
### Oppslag (Claude-verktøy)
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ eller maskinrommet-API. Ligger i PATH via symlink eller direkte kall.
|
|||
| `synops-tts` | Tekst-til-tale via ElevenLabs, lagrer lyd i CAS | Ferdig |
|
||||
| `synops-summarize` | AI-oppsummering av kommunikasjonsnode via LiteLLM | Ferdig |
|
||||
| `synops-suggest-edges` | AI-foreslåtte edges (topics/mentions) for en node via LiteLLM | Ferdig |
|
||||
| `synops-respond` | Claude chat-svar i kommunikasjonsnoder | Ferdig |
|
||||
|
||||
## Konvensjoner
|
||||
- Navnekonvensjon: `synops-<verb>` (f.eks. `synops-context`)
|
||||
|
|
@ -29,5 +30,5 @@ Ref: `docs/infra/agent_api.md`
|
|||
- `synops-search <query>` — søk i grafen (noder + edges)
|
||||
- `synops-tasks [--phase N] [--status S]` — oppgavestatus fra tasks.md
|
||||
- `synops-feature-status <key>` — implementeringsstatus for en feature
|
||||
- `synops-respond <comm_id> <tekst>` — send svar i en chat
|
||||
- ~~`synops-respond`~~ — implementert (se tabell over)
|
||||
- `synops-update-spec <node_id>` — oppdater spec-node (stdin)
|
||||
|
|
|
|||
19
tools/synops-respond/Cargo.toml
Normal file
19
tools/synops-respond/Cargo.toml
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
[package]
|
||||
name = "synops-respond"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
name = "synops-respond"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
uuid = { version = "1", features = ["v7", "serde"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
447
tools/synops-respond/src/main.rs
Normal file
447
tools/synops-respond/src/main.rs
Normal file
|
|
@ -0,0 +1,447 @@
|
|||
// synops-respond — Claude chat-svar for kommunikasjonsnoder.
|
||||
//
|
||||
// Henter meldingshistorikk og kontekst fra PG, bygger prompt,
|
||||
// kaller claude CLI, og returnerer svartekst. Med --write:
|
||||
// oppretter svar-node og edges i PG + logger ressursforbruk.
|
||||
//
|
||||
// STDB-skriving (sanntidsvisning) gjøres av maskinrommet etter
|
||||
// at dette verktøyet returnerer — maskinrommet eier STDB-skriving.
|
||||
//
|
||||
// Auth, ratelimit, kill switch og loop-prevensjon forblir i maskinrommet.
|
||||
//
|
||||
// Miljøvariabler:
|
||||
// DATABASE_URL — PostgreSQL-tilkobling (påkrevd)
|
||||
// CLAUDE_PATH — Sti til claude CLI (default: "claude")
|
||||
// PROJECT_DIR — Arbeidskatalog for claude (default: "/home/vegard/synops")
|
||||
//
|
||||
// Erstatter: prosesseringslogikken i maskinrommet/src/agent.rs
|
||||
// Ref: docs/retninger/unix_filosofi.md, docs/infra/claude_agent.md
|
||||
|
||||
use clap::Parser;
|
||||
use std::process;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Claude chat-svar for kommunikasjonsnoder.
|
||||
#[derive(Parser)]
|
||||
#[command(name = "synops-respond", about = "Generer Claude-svar i en kommunikasjonsnode")]
|
||||
struct Cli {
|
||||
/// Kommunikasjonsnode-ID der samtalen foregår
|
||||
#[arg(long)]
|
||||
communication_id: Uuid,
|
||||
|
||||
/// Meldings-ID som trigget svaret
|
||||
#[arg(long)]
|
||||
message_id: Uuid,
|
||||
|
||||
/// Agent-node-ID (Claude sin node)
|
||||
#[arg(long)]
|
||||
agent_node_id: Uuid,
|
||||
|
||||
/// Avsender-node-ID (brukeren som sendte meldingen)
|
||||
#[arg(long)]
|
||||
sender_node_id: Uuid,
|
||||
|
||||
/// Jobb-ID fra maskinrommet (for ai_usage_log)
|
||||
#[arg(long)]
|
||||
job_id: Option<Uuid>,
|
||||
|
||||
/// Skriv svar-node og edges til database (uten: kun generering + stdout)
|
||||
#[arg(long)]
|
||||
write: bool,
|
||||
}
|
||||
|
||||
// --- Database-rader ---
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct MessageRow {
|
||||
#[allow(dead_code)]
|
||||
id: Uuid,
|
||||
content: Option<String>,
|
||||
created_by: Uuid,
|
||||
#[allow(dead_code)]
|
||||
created_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct ParticipantRow {
|
||||
id: Uuid,
|
||||
title: Option<String>,
|
||||
node_kind: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "synops_respond=info".parse().unwrap()),
|
||||
)
|
||||
.with_target(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
if let Err(e) = run(cli).await {
|
||||
eprintln!("Feil: {e}");
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(cli: Cli) -> Result<(), String> {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.map_err(|_| "DATABASE_URL er ikke satt".to_string())?;
|
||||
|
||||
let db = sqlx::postgres::PgPoolOptions::new()
|
||||
.max_connections(2)
|
||||
.connect(&db_url)
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke koble til database: {e}"))?;
|
||||
|
||||
let communication_id = cli.communication_id;
|
||||
let agent_node_id = cli.agent_node_id;
|
||||
let sender_node_id = cli.sender_node_id;
|
||||
|
||||
// 1. Hent agent-config for max_context_messages
|
||||
let config: serde_json::Value = sqlx::query_scalar(
|
||||
"SELECT config FROM agent_identities WHERE node_id = $1",
|
||||
)
|
||||
.bind(agent_node_id)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?
|
||||
.unwrap_or(serde_json::json!({}));
|
||||
|
||||
let max_context_messages = config["max_context_messages"].as_i64().unwrap_or(50);
|
||||
|
||||
// 2. Hent meldingshistorikk
|
||||
let mut messages = sqlx::query_as::<_, MessageRow>(
|
||||
"SELECT n.id, n.content, n.created_by, n.created_at \
|
||||
FROM nodes n JOIN edges e ON e.source_id = n.id \
|
||||
WHERE e.target_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'content' \
|
||||
ORDER BY n.created_at DESC LIMIT $2",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.bind(max_context_messages)
|
||||
.fetch_all(&db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?;
|
||||
|
||||
messages.reverse();
|
||||
|
||||
if messages.is_empty() {
|
||||
let result = serde_json::json!({
|
||||
"status": "skipped",
|
||||
"reason": "no_messages"
|
||||
});
|
||||
println!("{}", serde_json::to_string_pretty(&result).unwrap());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 3. Hent kontekst: tittel, deltakere, tillatelser
|
||||
let comm_title: String = sqlx::query_scalar::<_, Option<String>>(
|
||||
"SELECT title FROM nodes WHERE id = $1",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?
|
||||
.flatten()
|
||||
.unwrap_or_else(|| "Samtale".to_string());
|
||||
|
||||
let participants = sqlx::query_as::<_, ParticipantRow>(
|
||||
"SELECT n.id, n.title, n.node_kind FROM nodes n \
|
||||
JOIN edges e ON e.source_id = n.id \
|
||||
WHERE e.target_id = $1 AND e.edge_type IN ('owner', 'member_of')",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.fetch_all(&db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?;
|
||||
|
||||
let permission: String = sqlx::query_scalar(
|
||||
"SELECT permission FROM agent_permissions WHERE user_node_id = $1 AND agent_node_id = $2",
|
||||
)
|
||||
.bind(sender_node_id)
|
||||
.bind(agent_node_id)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?
|
||||
.unwrap_or_else(|| "none".to_string());
|
||||
|
||||
// 4. Bygg prompt
|
||||
let name_map: std::collections::HashMap<Uuid, String> = participants
|
||||
.iter()
|
||||
.map(|p| (p.id, p.title.clone().unwrap_or_else(|| p.node_kind.clone())))
|
||||
.collect();
|
||||
|
||||
let participant_names: String = participants
|
||||
.iter()
|
||||
.filter(|p| p.id != agent_node_id)
|
||||
.map(|p| p.title.as_deref().unwrap_or("Ukjent"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let mut conversation = String::new();
|
||||
for m in &messages {
|
||||
let name = name_map.get(&m.created_by).map(|s| s.as_str()).unwrap_or("Ukjent");
|
||||
let content = m.content.as_deref().unwrap_or("");
|
||||
if m.created_by == agent_node_id {
|
||||
conversation.push_str(&format!("Claude: {content}\n"));
|
||||
} else {
|
||||
conversation.push_str(&format!("{name}: {content}\n"));
|
||||
}
|
||||
}
|
||||
|
||||
let perm_desc = match permission.as_str() {
|
||||
"direct" => "Brukeren har 'direct'-tilgang.",
|
||||
"propose" => "Brukeren har 'propose'-tilgang.",
|
||||
_ => "",
|
||||
};
|
||||
|
||||
// Sjekk om chatten diskuterer en spec-node
|
||||
let spec_context: String = match sqlx::query_scalar::<_, Option<String>>(
|
||||
"SELECT n.content FROM nodes n \
|
||||
JOIN edges e ON e.source_id = $1 AND e.target_id = n.id \
|
||||
WHERE e.edge_type = 'discusses' LIMIT 1",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
{
|
||||
Ok(Some(Some(content))) if !content.is_empty() => {
|
||||
let truncated = if content.len() > 4000 {
|
||||
&content[..4000]
|
||||
} else {
|
||||
&content
|
||||
};
|
||||
format!(
|
||||
"\n--- Gjeldende spesifikasjon ---\n{truncated}\n--- Slutt spesifikasjon ---\n\n\
|
||||
Du har tilgang til spesifikasjonen over. Gi konkret feedback: hva er implementert, \
|
||||
hva er planlagt, hva er teknisk vanskelig. Vær ærlig om begrensninger.\n"
|
||||
)
|
||||
}
|
||||
_ => String::new(),
|
||||
};
|
||||
|
||||
let prompt = format!(
|
||||
r#"Du er Claude, en AI-assistent integrert i Synops-plattformen.
|
||||
Du deltar i samtalen "{comm_title}" med {participant_names}.
|
||||
Svar på norsk med mindre brukeren skriver på engelsk.
|
||||
{perm_desc}
|
||||
Svar konsist. Bruk vanlig tekst uten markdown-overskrifter.
|
||||
Svar KUN med meldingsteksten.
|
||||
{spec_context}
|
||||
--- Samtalehistorikk ---
|
||||
{conversation}--- Svar ---"#
|
||||
);
|
||||
|
||||
// 5. Kall claude CLI med retry
|
||||
let claude_path = std::env::var("CLAUDE_PATH").unwrap_or_else(|_| "claude".to_string());
|
||||
let project_dir =
|
||||
std::env::var("PROJECT_DIR").unwrap_or_else(|_| "/home/vegard/synops".to_string());
|
||||
|
||||
tracing::info!(prompt_len = prompt.len(), "Kaller claude CLI");
|
||||
|
||||
let response_text = call_claude(&claude_path, &project_dir, &prompt).await?;
|
||||
|
||||
if response_text.is_empty() {
|
||||
return Err("Tom respons fra claude".to_string());
|
||||
}
|
||||
|
||||
tracing::info!(response_len = response_text.len(), "Fikk svar fra claude");
|
||||
|
||||
// 6. Skriv til database hvis --write
|
||||
let mut result = serde_json::json!({
|
||||
"status": "completed",
|
||||
"response_text": response_text,
|
||||
"response_len": response_text.len(),
|
||||
});
|
||||
|
||||
if cli.write {
|
||||
let reply_node_id = write_to_db(
|
||||
&db,
|
||||
communication_id,
|
||||
agent_node_id,
|
||||
sender_node_id,
|
||||
cli.job_id,
|
||||
&response_text,
|
||||
)
|
||||
.await?;
|
||||
|
||||
result["reply_node_id"] = serde_json::Value::String(reply_node_id.to_string());
|
||||
}
|
||||
|
||||
// 7. Output JSON til stdout
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string_pretty(&result)
|
||||
.map_err(|e| format!("JSON-serialisering feilet: {e}"))?
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Kall claude CLI med retry ved API-feil (500/529).
|
||||
async fn call_claude(
|
||||
claude_path: &str,
|
||||
project_dir: &str,
|
||||
prompt: &str,
|
||||
) -> Result<String, String> {
|
||||
let max_retries = 3u32;
|
||||
|
||||
for attempt in 0..=max_retries {
|
||||
let output = tokio::process::Command::new(claude_path)
|
||||
.arg("-p")
|
||||
.arg(prompt)
|
||||
.arg("--output-format")
|
||||
.arg("json")
|
||||
.arg("--dangerously-skip-permissions")
|
||||
.env("CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC", "1")
|
||||
.current_dir(project_dir)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke starte claude: {e}"))?;
|
||||
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
|
||||
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
|
||||
let is_api_error = !output.status.success()
|
||||
&& (stderr.contains("500")
|
||||
|| stderr.contains("529")
|
||||
|| stderr.contains("overloaded")
|
||||
|| stderr.contains("Internal Server Error"));
|
||||
|
||||
if is_api_error && attempt < max_retries {
|
||||
let delay = std::time::Duration::from_secs(2u64.pow(attempt + 1));
|
||||
tracing::warn!(
|
||||
attempt = attempt + 1,
|
||||
delay_secs = delay.as_secs(),
|
||||
"Claude API-feil, prøver igjen"
|
||||
);
|
||||
tokio::time::sleep(delay).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
if !output.status.success() {
|
||||
if is_api_error {
|
||||
tracing::error!(
|
||||
attempts = max_retries + 1,
|
||||
"Claude API utilgjengelig etter alle forsøk"
|
||||
);
|
||||
return Ok(
|
||||
"Beklager, jeg er midlertidig utilgjengelig — Anthropic sitt API svarer \
|
||||
ikke akkurat nå. Prøv igjen om litt."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
return Err(format!(
|
||||
"claude feilet ({}): {}",
|
||||
output.status,
|
||||
&stderr[..stderr.len().min(500)]
|
||||
));
|
||||
}
|
||||
|
||||
let text = match serde_json::from_str::<serde_json::Value>(&stdout) {
|
||||
Ok(json) => json["result"].as_str().unwrap_or("").to_string(),
|
||||
Err(_) => stdout.trim().to_string(),
|
||||
};
|
||||
|
||||
return Ok(text);
|
||||
}
|
||||
|
||||
Err("Alle forsøk brukt opp".to_string())
|
||||
}
|
||||
|
||||
/// Opprett svar-node, belongs_to-edge, ai_usage_log og resource_usage_log i PG.
|
||||
async fn write_to_db(
|
||||
db: &sqlx::PgPool,
|
||||
communication_id: Uuid,
|
||||
agent_node_id: Uuid,
|
||||
sender_node_id: Uuid,
|
||||
job_id: Option<Uuid>,
|
||||
response_text: &str,
|
||||
) -> Result<Uuid, String> {
|
||||
let reply_id = Uuid::now_v7();
|
||||
let edge_id = Uuid::now_v7();
|
||||
let metadata = serde_json::json!({});
|
||||
|
||||
// Svar-node
|
||||
sqlx::query(
|
||||
"INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by) \
|
||||
VALUES ($1, 'content', $2, 'hidden'::visibility, $3, $4)",
|
||||
)
|
||||
.bind(reply_id)
|
||||
.bind(response_text)
|
||||
.bind(&metadata)
|
||||
.bind(agent_node_id)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert svar-node feilet: {e}"))?;
|
||||
|
||||
tracing::info!(reply_node_id = %reply_id, "Svar-node opprettet");
|
||||
|
||||
// belongs_to-edge: svar → kommunikasjonsnode
|
||||
sqlx::query(
|
||||
"INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) \
|
||||
VALUES ($1, $2, $3, 'belongs_to', '{}', false, $4)",
|
||||
)
|
||||
.bind(edge_id)
|
||||
.bind(reply_id)
|
||||
.bind(communication_id)
|
||||
.bind(agent_node_id)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert belongs_to-edge feilet: {e}"))?;
|
||||
|
||||
// ai_usage_log
|
||||
sqlx::query(
|
||||
"INSERT INTO ai_usage_log \
|
||||
(agent_node_id, communication_id, job_id, model_alias, model_actual, \
|
||||
prompt_tokens, completion_tokens, total_tokens, job_type) \
|
||||
VALUES ($1, $2, $3, 'claude-code', 'claude-code-cli', 0, 0, 0, 'agent_respond')",
|
||||
)
|
||||
.bind(agent_node_id)
|
||||
.bind(communication_id)
|
||||
.bind(job_id)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert ai_usage_log feilet: {e}"))?;
|
||||
|
||||
// resource_usage_log
|
||||
let collection_id: Option<Uuid> = sqlx::query_scalar(
|
||||
"SELECT e.target_id FROM edges e \
|
||||
JOIN nodes n ON n.id = e.target_id \
|
||||
WHERE e.source_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'collection' \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
if let Err(e) = sqlx::query(
|
||||
"INSERT INTO resource_usage_log \
|
||||
(target_node_id, triggered_by, collection_id, resource_type, detail) \
|
||||
VALUES ($1, $2, $3, $4, $5)",
|
||||
)
|
||||
.bind(communication_id)
|
||||
.bind(Some(sender_node_id))
|
||||
.bind(collection_id)
|
||||
.bind("ai")
|
||||
.bind(serde_json::json!({
|
||||
"model_level": "deep",
|
||||
"model_id": "claude-code-cli",
|
||||
"tokens_in": 0,
|
||||
"tokens_out": 0,
|
||||
"job_type": "agent_respond"
|
||||
}))
|
||||
.execute(db)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk");
|
||||
}
|
||||
|
||||
Ok(reply_id)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue