Implementer daemon/vaktmester-modus i synops-agent
synops-agent daemon: bakgrunnsprosess som poller PG for oppgaver og meldinger. Hovedfunksjoner: - Vaktmester-chat: finner/oppretter kommunikasjonsnode, poller nye meldinger, svarer via LLM - Prefix-kommandoer: /proposal, /task, /bug, /gjør - Modellvalg: /claude, /grok, /gemini, /lokal, /billig - Task-polling: plukker open tasks, kjører via agent-session - Kill switch: respekterer agent_identities.is_active - Heartbeat-fil + PID-fil for overvåking - SIGTERM/SIGINT-håndtering med graceful shutdown - Stale task-frigjøring ved hver poll-runde
This commit is contained in:
parent
9a1ca08d26
commit
450a07273a
3 changed files with 883 additions and 19 deletions
553
tools/synops-agent/src/daemon.rs
Normal file
553
tools/synops-agent/src/daemon.rs
Normal file
|
|
@ -0,0 +1,553 @@
|
|||
//! Daemon/vaktmester-modus for synops-agent.
|
||||
//!
|
||||
//! Kjører som bakgrunnsprosess:
|
||||
//! - Poller task-noder fra PG med konfigurerbart intervall
|
||||
//! - Mottar meldinger fra vaktmester-chat (kommunikasjonsnode)
|
||||
//! - Prefix-kommandoer: /proposal, /task, /bug, /gjør
|
||||
//! - Modellvalg: /claude, /grok, /gemini, /lokal, /billig (kun admin)
|
||||
//! - Spawn Claude Code for tunge oppgaver
|
||||
//! - Heartbeat-fil for overvåking
|
||||
//! - PID-fil + SIGTERM-håndtering
|
||||
|
||||
use chrono::Utc;
|
||||
use sqlx::PgPool;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::graph;
|
||||
use crate::provider::{ApiKeys, create_provider};
|
||||
|
||||
/// Daemon configuration.
|
||||
pub struct DaemonConfig {
|
||||
/// Poll interval in seconds.
|
||||
pub interval_secs: u64,
|
||||
/// Default model spec (e.g. "openrouter/anthropic/claude-sonnet-4").
|
||||
pub default_model: String,
|
||||
/// Working directory for agent sessions.
|
||||
pub working_dir: PathBuf,
|
||||
/// Path for heartbeat file.
|
||||
pub heartbeat_path: PathBuf,
|
||||
/// Path for PID file.
|
||||
pub pid_path: PathBuf,
|
||||
/// Max cost per task in USD.
|
||||
pub max_cost_per_task: Option<f64>,
|
||||
/// Max iterations per task.
|
||||
pub max_iterations: usize,
|
||||
/// Verbose logging.
|
||||
pub verbose: bool,
|
||||
}
|
||||
|
||||
impl Default for DaemonConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_secs: 30,
|
||||
default_model: "openrouter/anthropic/claude-sonnet-4".into(),
|
||||
working_dir: PathBuf::from("/home/vegard/synops"),
|
||||
heartbeat_path: PathBuf::from("/tmp/synops-agent-daemon.heartbeat"),
|
||||
pid_path: PathBuf::from("/tmp/synops-agent-daemon.pid"),
|
||||
max_cost_per_task: Some(0.50),
|
||||
max_iterations: 50,
|
||||
verbose: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the daemon loop.
|
||||
pub async fn run(pool: PgPool, config: DaemonConfig) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Write PID file
|
||||
let pid = std::process::id();
|
||||
std::fs::write(&config.pid_path, pid.to_string())?;
|
||||
tracing::info!(pid, pid_file = %config.pid_path.display(), "PID-fil skrevet");
|
||||
|
||||
// Set up shutdown signal
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let shutdown_clone = shutdown.clone();
|
||||
|
||||
// SIGTERM via tokio signal
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
.expect("kunne ikke lytte på SIGTERM")
|
||||
.recv()
|
||||
.await;
|
||||
tracing::info!("SIGTERM mottatt — avslutter daemon");
|
||||
shutdown_clone.store(true, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// SIGINT (Ctrl+C)
|
||||
let shutdown_clone2 = shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
tracing::info!("SIGINT mottatt — avslutter daemon");
|
||||
shutdown_clone2.store(true, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Find or create vaktmester chat
|
||||
let vaktmester_chat_id = graph::find_or_create_vaktmester_chat(&pool).await?;
|
||||
tracing::info!(chat_id = %vaktmester_chat_id, "Vaktmester-chat aktiv");
|
||||
|
||||
let mut last_poll = Utc::now();
|
||||
let api_keys = ApiKeys::from_env();
|
||||
|
||||
tracing::info!(
|
||||
interval = config.interval_secs,
|
||||
model = config.default_model.as_str(),
|
||||
"Daemon startet"
|
||||
);
|
||||
|
||||
write_heartbeat(&config.heartbeat_path);
|
||||
|
||||
loop {
|
||||
if shutdown.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Check kill switch
|
||||
match graph::is_agent_active(&pool).await {
|
||||
Ok(false) => {
|
||||
tracing::warn!("Agent deaktivert (kill switch) — venter");
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(config.interval_secs)).await;
|
||||
write_heartbeat(&config.heartbeat_path);
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "Kunne ikke sjekke kill switch");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// 1. Poll vaktmester-chat for new messages
|
||||
match graph::poll_chat_messages(&pool, vaktmester_chat_id, last_poll).await {
|
||||
Ok(messages) if !messages.is_empty() => {
|
||||
tracing::info!(count = messages.len(), "Nye meldinger i vaktmester-chat");
|
||||
for msg in &messages {
|
||||
if let Err(e) = handle_chat_message(
|
||||
&pool,
|
||||
vaktmester_chat_id,
|
||||
msg,
|
||||
&config,
|
||||
&api_keys,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = %e, msg_id = %msg.id, "Feil ved behandling av melding");
|
||||
let _ = graph::write_chat_reply(
|
||||
&pool,
|
||||
vaktmester_chat_id,
|
||||
&format!("Feil: {e}"),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
// Update last_poll to latest message time
|
||||
if let Some(latest) = messages.last() {
|
||||
last_poll = latest.created_at;
|
||||
}
|
||||
}
|
||||
Ok(_) => {} // no new messages
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "Chat-polling feilet");
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Poll for open tasks
|
||||
match graph::pick_task(&pool).await {
|
||||
Ok(Some(task)) => {
|
||||
tracing::info!(
|
||||
task_id = %task.id,
|
||||
title = task.title.as_deref().unwrap_or("(uten tittel)"),
|
||||
"Plukket task"
|
||||
);
|
||||
run_task(&pool, &task, &config, &api_keys).await;
|
||||
}
|
||||
Ok(None) => {} // no tasks
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "Task-polling feilet");
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Release stale tasks
|
||||
match graph::release_stale_tasks(&pool).await {
|
||||
Ok(0) => {}
|
||||
Ok(n) => tracing::warn!(count = n, "Frigjorde stale tasks"),
|
||||
Err(e) => tracing::warn!(error = %e, "Stale-task frigjøring feilet"),
|
||||
}
|
||||
|
||||
write_heartbeat(&config.heartbeat_path);
|
||||
|
||||
// Sleep with shutdown check
|
||||
for _ in 0..config.interval_secs {
|
||||
if shutdown.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
tracing::info!("Daemon avslutter — rydder opp");
|
||||
let _ = std::fs::remove_file(&config.pid_path);
|
||||
let _ = std::fs::remove_file(&config.heartbeat_path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a single chat message: parse prefix commands or respond with LLM.
|
||||
async fn handle_chat_message(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
msg: &graph::ChatMessage,
|
||||
config: &DaemonConfig,
|
||||
api_keys: &ApiKeys,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let text = msg.content.trim();
|
||||
tracing::info!(from = ?msg.created_by, "Melding: {}", truncate(text, 100));
|
||||
|
||||
// Parse prefix commands
|
||||
if let Some(rest) = text.strip_prefix("/proposal ") {
|
||||
return handle_proposal(pool, chat_id, rest.trim()).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/task ") {
|
||||
return handle_task_create(pool, chat_id, rest.trim()).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/bug ") {
|
||||
return handle_bug(pool, chat_id, rest.trim()).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/gjør ") {
|
||||
return handle_do(pool, chat_id, rest.trim(), &config.default_model, config, api_keys).await;
|
||||
}
|
||||
|
||||
// Model-selection commands (admin only — all messages in vaktmester chat are admin)
|
||||
if let Some(rest) = text.strip_prefix("/claude ") {
|
||||
return handle_claude_code(pool, chat_id, rest.trim(), config).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/grok ") {
|
||||
return handle_do(pool, chat_id, rest.trim(), "xai/grok-3", config, api_keys).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/gemini ") {
|
||||
return handle_do(pool, chat_id, rest.trim(), "gemini/gemini-2.5-flash", config, api_keys).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/lokal ") {
|
||||
return handle_do(pool, chat_id, rest.trim(), "ollama/llama3", config, api_keys).await;
|
||||
}
|
||||
if let Some(rest) = text.strip_prefix("/billig ") {
|
||||
return handle_do(pool, chat_id, rest.trim(), "gemini/gemini-2.5-flash", config, api_keys).await;
|
||||
}
|
||||
|
||||
// Free text — respond with default model
|
||||
handle_do(pool, chat_id, text, &config.default_model, config, api_keys).await
|
||||
}
|
||||
|
||||
/// /proposal — create a proposal node.
|
||||
async fn handle_proposal(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
description: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Use first line as title, rest as body
|
||||
let (title, body) = split_title_body(description);
|
||||
let id = graph::create_proposal(pool, &title, &body).await?;
|
||||
let reply = format!("Proposal opprettet: {} ({})", title, short_id(id));
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
tracing::info!(id = %id, title = title.as_str(), "Proposal opprettet");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// /task — create a task node.
|
||||
async fn handle_task_create(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
description: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (title, body) = split_title_body(description);
|
||||
let id = graph::create_task(pool, &title, &body, 5).await?;
|
||||
let reply = format!("Task opprettet: {} ({})", title, short_id(id));
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
tracing::info!(id = %id, title = title.as_str(), "Task opprettet");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// /bug — add to fikseliste (as task with high priority).
|
||||
async fn handle_bug(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
description: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (title, body) = split_title_body(description);
|
||||
let title = format!("[BUG] {}", title);
|
||||
let id = graph::create_task(pool, &title, &body, 2).await?;
|
||||
let reply = format!("Bug registrert: {} ({})", title, short_id(id));
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
tracing::info!(id = %id, "Bug registrert");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// /gjør or free text — run with an LLM.
|
||||
async fn handle_do(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
task: &str,
|
||||
model: &str,
|
||||
config: &DaemonConfig,
|
||||
api_keys: &ApiKeys,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let reply_start = format!("Kjører med {} ...", short_model(model));
|
||||
graph::write_chat_reply(pool, chat_id, &reply_start).await?;
|
||||
|
||||
// Create a one-off agent session for this task
|
||||
let provider = create_provider(model, api_keys, None)?;
|
||||
|
||||
let system_prompt = crate::build_system_prompt(None, &config.working_dir).await;
|
||||
let messages = vec![
|
||||
crate::provider::Message {
|
||||
role: "system".into(),
|
||||
content: Some(system_prompt),
|
||||
tool_calls: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
crate::provider::Message {
|
||||
role: "user".into(),
|
||||
content: Some(task.to_string()),
|
||||
tool_calls: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
];
|
||||
|
||||
let compaction_config = crate::context::CompactionConfig {
|
||||
context_window: provider.context_window(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut session = crate::AgentSession {
|
||||
provider,
|
||||
messages,
|
||||
tool_defs: crate::tools::tool_definitions(),
|
||||
total_usage: std::collections::HashMap::new(),
|
||||
total_cost: 0.0,
|
||||
total_iterations: 0,
|
||||
retry_config: crate::provider::RetryConfig {
|
||||
max_retries: 3,
|
||||
..Default::default()
|
||||
},
|
||||
compaction_config,
|
||||
working_dir: config.working_dir.clone(),
|
||||
max_iterations: config.max_iterations,
|
||||
max_cost: config.max_cost_per_task,
|
||||
verbose: config.verbose,
|
||||
interrupted: Arc::new(AtomicBool::new(false)),
|
||||
pg_pool: Some(pool.clone()),
|
||||
};
|
||||
|
||||
let result = session.run_turn().await;
|
||||
|
||||
// Extract the last assistant text as the reply
|
||||
let response_text = session
|
||||
.messages
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|m| m.role == "assistant" && m.content.is_some())
|
||||
.and_then(|m| m.content.clone())
|
||||
.unwrap_or_else(|| "(ingen respons)".into());
|
||||
|
||||
let cost_info = format!(" (${:.4})", session.total_cost);
|
||||
|
||||
match result {
|
||||
Ok(crate::TurnResult::Done) => {
|
||||
let reply = format!("{}\n\n_Ferdig{}_", response_text, cost_info);
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
}
|
||||
Ok(crate::TurnResult::BudgetExhausted) => {
|
||||
let reply = format!(
|
||||
"{}\n\n_Budsjettgrense nådd{}. Oppgaven er ikke fullført._",
|
||||
response_text, cost_info
|
||||
);
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
}
|
||||
Ok(crate::TurnResult::MaxIterations) => {
|
||||
let reply = format!(
|
||||
"{}\n\n_Maks iterasjoner nådd{}._",
|
||||
response_text, cost_info
|
||||
);
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
}
|
||||
Ok(crate::TurnResult::Interrupted) => {
|
||||
graph::write_chat_reply(pool, chat_id, "Avbrutt.").await?;
|
||||
}
|
||||
Err(e) => {
|
||||
let reply = format!("Feil: {}", e);
|
||||
graph::write_chat_reply(pool, chat_id, &reply).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// /claude — spawn Claude Code for heavy tasks.
|
||||
async fn handle_claude_code(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
task: &str,
|
||||
config: &DaemonConfig,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
graph::write_chat_reply(pool, chat_id, "Spawner Claude Code ...").await?;
|
||||
|
||||
match crate::spawn_claude_code(task, &config.working_dir).await {
|
||||
Ok(()) => {
|
||||
graph::write_chat_reply(pool, chat_id, "Claude Code ferdig.").await?;
|
||||
}
|
||||
Err(e) => {
|
||||
graph::write_chat_reply(pool, chat_id, &format!("Claude Code feilet: {e}")).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run a picked task through an agent session.
|
||||
async fn run_task(
|
||||
pool: &PgPool,
|
||||
task: &graph::TaskInfo,
|
||||
config: &DaemonConfig,
|
||||
api_keys: &ApiKeys,
|
||||
) {
|
||||
let task_desc = task
|
||||
.content
|
||||
.as_deref()
|
||||
.or(task.title.as_deref())
|
||||
.unwrap_or("(ingen beskrivelse)");
|
||||
|
||||
// Report start
|
||||
let _ = graph::write_task_message(pool, task.id, &format!("Starter: {}", truncate(task_desc, 200))).await;
|
||||
|
||||
let provider = match create_provider(&config.default_model, api_keys, None) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "Kunne ikke opprette provider for task");
|
||||
let _ = graph::update_task_status(pool, task.id, "failed", Some(&format!("Provider-feil: {e}"))).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let system_prompt = crate::build_system_prompt(None, &config.working_dir).await;
|
||||
let messages = vec![
|
||||
crate::provider::Message {
|
||||
role: "system".into(),
|
||||
content: Some(system_prompt),
|
||||
tool_calls: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
crate::provider::Message {
|
||||
role: "user".into(),
|
||||
content: Some(task_desc.to_string()),
|
||||
tool_calls: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
];
|
||||
|
||||
let compaction_config = crate::context::CompactionConfig {
|
||||
context_window: provider.context_window(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut session = crate::AgentSession {
|
||||
provider,
|
||||
messages,
|
||||
tool_defs: crate::tools::tool_definitions(),
|
||||
total_usage: std::collections::HashMap::new(),
|
||||
total_cost: 0.0,
|
||||
total_iterations: 0,
|
||||
retry_config: crate::provider::RetryConfig {
|
||||
max_retries: 3,
|
||||
..Default::default()
|
||||
},
|
||||
compaction_config,
|
||||
working_dir: config.working_dir.clone(),
|
||||
max_iterations: config.max_iterations,
|
||||
max_cost: config.max_cost_per_task,
|
||||
verbose: config.verbose,
|
||||
interrupted: Arc::new(AtomicBool::new(false)),
|
||||
pg_pool: Some(pool.clone()),
|
||||
};
|
||||
|
||||
let result = session.run_turn().await;
|
||||
|
||||
let summary = session
|
||||
.messages
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|m| m.role == "assistant" && m.content.is_some())
|
||||
.and_then(|m| m.content.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let cost_str = format!("${:.4}", session.total_cost);
|
||||
|
||||
match result {
|
||||
Ok(crate::TurnResult::Done) => {
|
||||
let result_msg = format!("Ferdig ({}). {}", cost_str, truncate(&summary, 500));
|
||||
let _ = graph::update_task_status(pool, task.id, "done", Some(&result_msg)).await;
|
||||
let _ = graph::write_task_message(pool, task.id, &result_msg).await;
|
||||
tracing::info!(task_id = %task.id, cost = cost_str.as_str(), "Task ferdig");
|
||||
}
|
||||
Ok(crate::TurnResult::BudgetExhausted) => {
|
||||
let result_msg = format!("Budsjett brukt opp ({}). Delvis ferdig.", cost_str);
|
||||
let _ = graph::update_task_status(pool, task.id, "failed", Some(&result_msg)).await;
|
||||
let _ = graph::write_task_message(pool, task.id, &result_msg).await;
|
||||
tracing::warn!(task_id = %task.id, "Task: budsjett oppbrukt");
|
||||
}
|
||||
Ok(crate::TurnResult::MaxIterations) => {
|
||||
let result_msg = format!("Maks iterasjoner nådd ({}). Delvis ferdig.", cost_str);
|
||||
let _ = graph::update_task_status(pool, task.id, "failed", Some(&result_msg)).await;
|
||||
let _ = graph::write_task_message(pool, task.id, &result_msg).await;
|
||||
}
|
||||
Ok(crate::TurnResult::Interrupted) => {
|
||||
let _ = graph::update_task_status(pool, task.id, "failed", Some("Avbrutt")).await;
|
||||
}
|
||||
Err(e) => {
|
||||
let result_msg = format!("Feil: {}", e);
|
||||
let _ = graph::update_task_status(pool, task.id, "failed", Some(&result_msg)).await;
|
||||
let _ = graph::write_task_message(pool, task.id, &result_msg).await;
|
||||
tracing::error!(task_id = %task.id, error = %e, "Task feilet");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Write heartbeat timestamp to file.
|
||||
fn write_heartbeat(path: &Path) {
|
||||
let _ = std::fs::write(path, Utc::now().to_rfc3339());
|
||||
}
|
||||
|
||||
/// Split "first line\nrest" into (title, body).
|
||||
fn split_title_body(text: &str) -> (String, String) {
|
||||
if let Some(newline) = text.find('\n') {
|
||||
let title = text[..newline].trim().to_string();
|
||||
let body = text[newline + 1..].trim().to_string();
|
||||
(title, body)
|
||||
} else {
|
||||
(text.trim().to_string(), String::new())
|
||||
}
|
||||
}
|
||||
|
||||
/// Short model name for display.
|
||||
fn short_model(model: &str) -> &str {
|
||||
model.rsplit('/').next().unwrap_or(model)
|
||||
}
|
||||
|
||||
/// Truncate string for display.
|
||||
fn truncate(s: &str, max: usize) -> &str {
|
||||
if s.len() > max {
|
||||
// Find char boundary
|
||||
let mut end = max;
|
||||
while end > 0 && !s.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
&s[..end]
|
||||
} else {
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
/// Short UUID for display (first 8 chars).
|
||||
fn short_id(id: Uuid) -> String {
|
||||
id.to_string()[..8].to_string()
|
||||
}
|
||||
|
|
@ -375,6 +375,245 @@ async fn find_or_create_discussion(pool: &PgPool, task_id: Uuid) -> Result<Uuid,
|
|||
Ok(disc_id)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Daemon: chat polling and node creation
|
||||
// ============================================================================
|
||||
|
||||
/// Find or create the vaktmester communication node.
|
||||
///
|
||||
/// Looks for a communication node titled "Vaktmester" owned by the agent.
|
||||
/// If none exists, creates one and makes the agent a member.
|
||||
pub async fn find_or_create_vaktmester_chat(pool: &PgPool) -> Result<Uuid, String> {
|
||||
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
|
||||
|
||||
// Look for existing vaktmester chat
|
||||
let existing = sqlx::query_scalar::<_, Uuid>(
|
||||
r#"
|
||||
SELECT n.id FROM nodes n
|
||||
JOIN edges e ON e.target_id = n.id AND e.source_id = $1 AND e.edge_type = 'member_of'
|
||||
WHERE n.node_kind = 'communication'
|
||||
AND n.metadata->>'type' = 'vaktmester'
|
||||
LIMIT 1
|
||||
"#,
|
||||
)
|
||||
.bind(agent_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.map_err(|e| format!("find vaktmester chat: {e}"))?;
|
||||
|
||||
if let Some(id) = existing {
|
||||
return Ok(id);
|
||||
}
|
||||
|
||||
// Create vaktmester communication node
|
||||
let chat_id = Uuid::now_v7();
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by)
|
||||
VALUES ($1, 'communication', 'Vaktmester', 'discoverable',
|
||||
'{"type":"vaktmester"}', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(chat_id)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("create vaktmester node: {e}"))?;
|
||||
|
||||
// Agent is member_of
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
|
||||
VALUES ($1, $2, 'member_of', true, $1)
|
||||
"#,
|
||||
)
|
||||
.bind(agent_id)
|
||||
.bind(chat_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("agent join vaktmester: {e}"))?;
|
||||
|
||||
tracing::info!(chat_id = %chat_id, "Opprettet vaktmester-chat");
|
||||
Ok(chat_id)
|
||||
}
|
||||
|
||||
/// A chat message from the vaktmester chat.
|
||||
#[derive(Debug)]
|
||||
pub struct ChatMessage {
|
||||
pub id: Uuid,
|
||||
pub content: String,
|
||||
pub created_by: Option<Uuid>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct ChatMessageRow {
|
||||
id: Uuid,
|
||||
content: Option<String>,
|
||||
created_by: Option<Uuid>,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Poll for new messages in a communication node since a given timestamp.
|
||||
///
|
||||
/// Returns messages not sent by the agent itself, ordered by time.
|
||||
pub async fn poll_chat_messages(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
since: DateTime<Utc>,
|
||||
) -> Result<Vec<ChatMessage>, String> {
|
||||
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
|
||||
|
||||
let rows = sqlx::query_as::<_, ChatMessageRow>(
|
||||
r#"
|
||||
SELECT n.id, n.content, n.created_by, n.created_at
|
||||
FROM nodes n
|
||||
JOIN edges e ON e.source_id = n.id AND e.target_id = $1 AND e.edge_type = 'belongs_to'
|
||||
WHERE n.node_kind = 'message'
|
||||
AND n.created_at > $2
|
||||
AND (n.created_by IS NULL OR n.created_by != $3)
|
||||
ORDER BY n.created_at ASC
|
||||
"#,
|
||||
)
|
||||
.bind(chat_id)
|
||||
.bind(since)
|
||||
.bind(agent_id)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(|e| format!("poll_chat_messages: {e}"))?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.filter_map(|r| {
|
||||
Some(ChatMessage {
|
||||
id: r.id,
|
||||
content: r.content?,
|
||||
created_by: r.created_by,
|
||||
created_at: r.created_at,
|
||||
})
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Write a reply message in a communication node.
|
||||
pub async fn write_chat_reply(
|
||||
pool: &PgPool,
|
||||
chat_id: Uuid,
|
||||
message: &str,
|
||||
) -> Result<Uuid, String> {
|
||||
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
|
||||
let msg_id = Uuid::now_v7();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'message', $2, 'hidden', '{}', $3)
|
||||
"#,
|
||||
)
|
||||
.bind(msg_id)
|
||||
.bind(message)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("write_chat_reply (node): {e}"))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
|
||||
VALUES ($1, $2, 'belongs_to', true, $3)
|
||||
"#,
|
||||
)
|
||||
.bind(msg_id)
|
||||
.bind(chat_id)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("write_chat_reply (edge): {e}"))?;
|
||||
|
||||
// Notify frontend via PG NOTIFY
|
||||
let _ = sqlx::query("SELECT pg_notify('node_changes', $1)")
|
||||
.bind(serde_json::json!({
|
||||
"type": "message",
|
||||
"node_id": msg_id.to_string(),
|
||||
"communication_id": chat_id.to_string(),
|
||||
}).to_string())
|
||||
.execute(pool)
|
||||
.await;
|
||||
|
||||
Ok(msg_id)
|
||||
}
|
||||
|
||||
/// Create a proposal node.
|
||||
pub async fn create_proposal(
|
||||
pool: &PgPool,
|
||||
title: &str,
|
||||
description: &str,
|
||||
) -> Result<Uuid, String> {
|
||||
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
|
||||
let id = Uuid::now_v7();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'proposal', $2, $3, 'internal',
|
||||
'{"status":"draft"}', $4)
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(title)
|
||||
.bind(description)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("create_proposal: {e}"))?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Create a task node.
|
||||
pub async fn create_task(
|
||||
pool: &PgPool,
|
||||
title: &str,
|
||||
description: &str,
|
||||
priority: i32,
|
||||
) -> Result<Uuid, String> {
|
||||
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
|
||||
let id = Uuid::now_v7();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'task', $2, $3, 'internal',
|
||||
jsonb_build_object('status', 'open', 'priority', $4), $5)
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(title)
|
||||
.bind(description)
|
||||
.bind(priority)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| format!("create_task: {e}"))?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Check if the agent is active (kill switch).
|
||||
pub async fn is_agent_active(pool: &PgPool) -> Result<bool, String> {
|
||||
let active: Option<bool> = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT is_active FROM agent_identities
|
||||
WHERE agent_key = 'claude-main'
|
||||
"#,
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.map_err(|e| format!("is_agent_active: {e}"))?;
|
||||
|
||||
Ok(active.unwrap_or(true))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@
|
|||
//! synops-agent -i --model gemini/gemini-2.5-flash
|
||||
|
||||
mod context;
|
||||
mod daemon;
|
||||
mod git;
|
||||
mod graph;
|
||||
mod provider;
|
||||
|
|
@ -29,6 +30,9 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||
#[derive(Parser)]
|
||||
#[command(name = "synops-agent", about = "Modell-agnostisk agent-runtime for Synops")]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Option<Command>,
|
||||
|
||||
/// Modell: "provider/model" (f.eks. "openrouter/anthropic/claude-sonnet-4")
|
||||
#[arg(short, long, default_value = "openrouter/anthropic/claude-sonnet-4")]
|
||||
model: String,
|
||||
|
|
@ -102,24 +106,62 @@ struct Cli {
|
|||
branch: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
enum Command {
|
||||
/// Kjør som daemon/vaktmester (bakgrunnsprosess)
|
||||
Daemon {
|
||||
/// Poll-intervall i sekunder (default: 30)
|
||||
#[arg(long, default_value = "30")]
|
||||
interval: u64,
|
||||
|
||||
/// Modell for oppgaver (default: openrouter/anthropic/claude-sonnet-4)
|
||||
#[arg(long, default_value = "openrouter/anthropic/claude-sonnet-4")]
|
||||
model: String,
|
||||
|
||||
/// Arbeidsmappe (default: /home/vegard/synops)
|
||||
#[arg(short = 'd', long, default_value = "/home/vegard/synops")]
|
||||
working_dir: PathBuf,
|
||||
|
||||
/// Sti til heartbeat-fil
|
||||
#[arg(long, default_value = "/tmp/synops-agent-daemon.heartbeat")]
|
||||
heartbeat: PathBuf,
|
||||
|
||||
/// Sti til PID-fil
|
||||
#[arg(long, default_value = "/tmp/synops-agent-daemon.pid")]
|
||||
pid_file: PathBuf,
|
||||
|
||||
/// Maks kostnad per oppgave i USD
|
||||
#[arg(long, default_value = "0.50")]
|
||||
max_cost_per_task: f64,
|
||||
|
||||
/// Maks iterasjoner per oppgave
|
||||
#[arg(long, default_value = "50")]
|
||||
max_iterations: usize,
|
||||
|
||||
/// Vis detaljert logging
|
||||
#[arg(long)]
|
||||
verbose: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Shared state for the agent session.
|
||||
struct AgentSession {
|
||||
provider: Box<dyn LlmProvider>,
|
||||
messages: Vec<Message>,
|
||||
tool_defs: Vec<provider::ToolDef>,
|
||||
total_usage: HashMap<String, TokenUsage>,
|
||||
total_cost: f64,
|
||||
total_iterations: usize,
|
||||
retry_config: RetryConfig,
|
||||
compaction_config: CompactionConfig,
|
||||
working_dir: PathBuf,
|
||||
max_iterations: usize,
|
||||
max_cost: Option<f64>,
|
||||
verbose: bool,
|
||||
pub(crate) struct AgentSession {
|
||||
pub(crate) provider: Box<dyn LlmProvider>,
|
||||
pub(crate) messages: Vec<Message>,
|
||||
pub(crate) tool_defs: Vec<provider::ToolDef>,
|
||||
pub(crate) total_usage: HashMap<String, TokenUsage>,
|
||||
pub(crate) total_cost: f64,
|
||||
pub(crate) total_iterations: usize,
|
||||
pub(crate) retry_config: RetryConfig,
|
||||
pub(crate) compaction_config: CompactionConfig,
|
||||
pub(crate) working_dir: PathBuf,
|
||||
pub(crate) max_iterations: usize,
|
||||
pub(crate) max_cost: Option<f64>,
|
||||
pub(crate) verbose: bool,
|
||||
/// Set to true by Ctrl+C handler during a turn; checked by tool execution.
|
||||
interrupted: Arc<AtomicBool>,
|
||||
pub(crate) interrupted: Arc<AtomicBool>,
|
||||
/// Optional PG pool for graph integration (synops_query tool, task picking).
|
||||
pg_pool: Option<PgPool>,
|
||||
pub(crate) pg_pool: Option<PgPool>,
|
||||
}
|
||||
|
||||
/// Whether to use plan mode for a task.
|
||||
|
|
@ -132,7 +174,7 @@ enum PlanDecision {
|
|||
}
|
||||
|
||||
/// Result of running one turn of the agent loop.
|
||||
enum TurnResult {
|
||||
pub(crate) enum TurnResult {
|
||||
/// Agent finished (no more tool calls).
|
||||
Done,
|
||||
/// Budget exhausted.
|
||||
|
|
@ -145,7 +187,7 @@ enum TurnResult {
|
|||
|
||||
impl AgentSession {
|
||||
/// Run the agent loop for the current messages until the model stops calling tools.
|
||||
async fn run_turn(&mut self) -> Result<TurnResult, Box<dyn std::error::Error>> {
|
||||
pub(crate) async fn run_turn(&mut self) -> Result<TurnResult, Box<dyn std::error::Error>> {
|
||||
let mut iteration = 0;
|
||||
self.interrupted.store(false, Ordering::SeqCst);
|
||||
|
||||
|
|
@ -482,6 +524,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
|
||||
let cli = Cli::parse();
|
||||
|
||||
// Handle daemon subcommand
|
||||
if let Some(Command::Daemon {
|
||||
interval,
|
||||
model,
|
||||
working_dir,
|
||||
heartbeat,
|
||||
pid_file,
|
||||
max_cost_per_task,
|
||||
max_iterations,
|
||||
verbose,
|
||||
}) = cli.command
|
||||
{
|
||||
let pool = synops_common::db::connect()
|
||||
.await
|
||||
.map_err(|e| format!("PG-tilkobling påkrevd for daemon: {e}"))?;
|
||||
|
||||
let config = daemon::DaemonConfig {
|
||||
interval_secs: interval,
|
||||
default_model: model,
|
||||
working_dir,
|
||||
heartbeat_path: heartbeat,
|
||||
pid_path: pid_file,
|
||||
max_cost_per_task: Some(max_cost_per_task),
|
||||
max_iterations,
|
||||
verbose,
|
||||
};
|
||||
|
||||
return daemon::run(pool, config).await.map_err(Into::into);
|
||||
}
|
||||
|
||||
// Validate: need either --task or --interactive
|
||||
if cli.task.is_none() && !cli.interactive {
|
||||
eprintln!("Feil: Enten --task eller --interactive er påkrevd.");
|
||||
|
|
@ -877,7 +949,7 @@ fn dirs_history_path() -> PathBuf {
|
|||
}
|
||||
|
||||
/// Build system prompt with context about the project and git state.
|
||||
async fn build_system_prompt(custom: Option<&str>, working_dir: &Path) -> String {
|
||||
pub(crate) async fn build_system_prompt(custom: Option<&str>, working_dir: &Path) -> String {
|
||||
let mut prompt = String::new();
|
||||
|
||||
prompt.push_str("Du er synops-agent, en autonom utviklerassistent for Synops-plattformen.\n");
|
||||
|
|
@ -925,7 +997,7 @@ async fn build_system_prompt(custom: Option<&str>, working_dir: &Path) -> String
|
|||
}
|
||||
|
||||
/// Spawn Claude Code for heavy tasks (uses paid subscription).
|
||||
async fn spawn_claude_code(
|
||||
pub(crate) async fn spawn_claude_code(
|
||||
task: &str,
|
||||
working_dir: &Path,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue