Implementer checkpoint og recovery i synops-agent
- Ny checkpoint-modul: lagrer sesjonsstatus (meldinger, tokens, oppgave) til JSON
- --resume flagg for å gjenoppta etter krasj (sesjons-ID eller "latest")
- --checkpoint-interval for å styre hvor ofte mellomtilstand lagres
- Kostnadslogging til ai_usage_log i PG ved sesjonsslutt
- Sesjonsrapport: modell, varighet, tokens, kostnad, filer endret
- Integrert i agent-loop (periodisk checkpoint), batch-modus og daemon
- Automatisk opprydding av gamle checkpoints (beholder siste 20)
This commit is contained in:
parent
dae4e0f3e2
commit
202682e2e0
4 changed files with 699 additions and 4 deletions
425
tools/synops-agent/src/checkpoint.rs
Normal file
425
tools/synops-agent/src/checkpoint.rs
Normal file
|
|
@ -0,0 +1,425 @@
|
||||||
|
//! Checkpoint og recovery — lagre og gjenoppta agent-sesjoner.
|
||||||
|
//!
|
||||||
|
//! Skriver mellomtilstand til JSON-fil slik at en krasjet sesjon
|
||||||
|
//! kan gjenopptas med `--resume`. Inkluderer:
|
||||||
|
//! - Meldingshistorikk
|
||||||
|
//! - Token-forbruk per modell
|
||||||
|
//! - Pågående oppgave
|
||||||
|
//! - Tidsstempel og sesjons-ID
|
||||||
|
//!
|
||||||
|
//! Checkpoint skrives periodisk (etter hvert N-te tool-kall) og
|
||||||
|
//! ved sesjonsslutt. Ved `--resume` lastes siste checkpoint og
|
||||||
|
//! agenten fortsetter der den slapp.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::provider::{Message, TokenUsage};
|
||||||
|
|
||||||
|
/// Serializable session state for a checkpoint.
///
/// This is the unit that `save`/`load` round-trip through JSON on disk;
/// everything needed to resume a crashed session lives here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Unique session ID (see `new_session_id`).
    pub session_id: String,
    /// Model specification used for the session.
    pub model_spec: String,
    /// Full message history, restored verbatim on resume.
    pub messages: Vec<Message>,
    /// Token usage per model name.
    pub usage: HashMap<String, TokenUsageData>,
    /// Total cost in USD.
    pub total_cost: f64,
    /// Number of iterations performed so far.
    pub total_iterations: usize,
    /// Current task (task text), if any.
    pub task: Option<String>,
    /// Task node ID (when the task was picked from the graph).
    pub task_node_id: Option<String>,
    /// Working directory of the session.
    pub working_dir: String,
    /// Files changed during the session (from git diff; filled at session end).
    pub files_changed: Vec<String>,
    /// Timestamp of this checkpoint write.
    pub timestamp: DateTime<Utc>,
    /// Timestamp of session start.
    pub started_at: DateTime<Utc>,
    /// Status: "running", "completed", "failed", "interrupted".
    pub status: String,
    /// Final summary (filled in at session end).
    pub summary: Option<String>,
}
|
||||||
|
|
||||||
|
/// Token usage (serializable version of `provider::TokenUsage`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TokenUsageData {
    // Tokens sent to the model (logged as prompt_tokens in PG).
    pub input_tokens: u64,
    // Tokens produced by the model (logged as completion_tokens in PG).
    pub output_tokens: u64,
}
|
||||||
|
|
||||||
|
impl From<&TokenUsage> for TokenUsageData {
|
||||||
|
fn from(u: &TokenUsage) -> Self {
|
||||||
|
Self {
|
||||||
|
input_tokens: u.input_tokens,
|
||||||
|
output_tokens: u.output_tokens,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&TokenUsageData> for TokenUsage {
|
||||||
|
fn from(d: &TokenUsageData) -> Self {
|
||||||
|
Self {
|
||||||
|
input_tokens: d.input_tokens,
|
||||||
|
output_tokens: d.output_tokens,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Default checkpoint directory: `$HOME/.synops/checkpoints`
/// (falls back to the current directory when `HOME` is unset).
pub fn checkpoint_dir() -> PathBuf {
    let home = std::env::var("HOME").unwrap_or_else(|_| String::from("."));
    let mut dir = PathBuf::from(home);
    dir.push(".synops");
    dir.push("checkpoints");
    dir
}
|
||||||
|
|
||||||
|
/// Filsti for en gitt sesjons-ID.
|
||||||
|
pub fn checkpoint_path(session_id: &str) -> PathBuf {
|
||||||
|
checkpoint_dir().join(format!("{}.json", session_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sti til "latest" symlink/fil som peker på siste checkpoint.
|
||||||
|
fn latest_path() -> PathBuf {
|
||||||
|
checkpoint_dir().join("latest")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generer en unik sesjons-ID.
|
||||||
|
pub fn new_session_id() -> String {
|
||||||
|
let now = Utc::now();
|
||||||
|
format!(
|
||||||
|
"{}-{}",
|
||||||
|
now.format("%Y%m%d-%H%M%S"),
|
||||||
|
&Uuid::now_v7().to_string()[..8]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lagre checkpoint til disk.
|
||||||
|
pub fn save(checkpoint: &Checkpoint) -> Result<(), String> {
|
||||||
|
let dir = checkpoint_dir();
|
||||||
|
std::fs::create_dir_all(&dir)
|
||||||
|
.map_err(|e| format!("Kunne ikke opprette checkpoint-mappe: {e}"))?;
|
||||||
|
|
||||||
|
let path = checkpoint_path(&checkpoint.session_id);
|
||||||
|
let json = serde_json::to_string_pretty(checkpoint)
|
||||||
|
.map_err(|e| format!("Serialisering feilet: {e}"))?;
|
||||||
|
|
||||||
|
// Atomisk skriving via temp-fil
|
||||||
|
let tmp = path.with_extension("tmp");
|
||||||
|
std::fs::write(&tmp, &json)
|
||||||
|
.map_err(|e| format!("Skriving feilet: {e}"))?;
|
||||||
|
std::fs::rename(&tmp, &path)
|
||||||
|
.map_err(|e| format!("Rename feilet: {e}"))?;
|
||||||
|
|
||||||
|
// Oppdater "latest"-peker
|
||||||
|
let latest = latest_path();
|
||||||
|
let _ = std::fs::write(&latest, &checkpoint.session_id);
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
session_id = checkpoint.session_id.as_str(),
|
||||||
|
path = %path.display(),
|
||||||
|
"Checkpoint lagret"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Last inn checkpoint fra disk.
|
||||||
|
pub fn load(session_id: &str) -> Result<Checkpoint, String> {
|
||||||
|
let path = checkpoint_path(session_id);
|
||||||
|
let json = std::fs::read_to_string(&path)
|
||||||
|
.map_err(|e| format!("Kunne ikke lese checkpoint {}: {e}", path.display()))?;
|
||||||
|
let checkpoint: Checkpoint = serde_json::from_str(&json)
|
||||||
|
.map_err(|e| format!("Ugyldig checkpoint JSON: {e}"))?;
|
||||||
|
Ok(checkpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Last inn siste checkpoint (fra "latest"-peker).
|
||||||
|
pub fn load_latest() -> Result<Checkpoint, String> {
|
||||||
|
let latest = latest_path();
|
||||||
|
let session_id = std::fs::read_to_string(&latest)
|
||||||
|
.map_err(|e| format!("Ingen siste checkpoint funnet: {e}"))?;
|
||||||
|
let session_id = session_id.trim();
|
||||||
|
load(session_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rydd opp gamle checkpoints (behold de siste N).
|
||||||
|
pub fn cleanup(keep: usize) -> Result<usize, String> {
|
||||||
|
let dir = checkpoint_dir();
|
||||||
|
if !dir.exists() {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut entries: Vec<_> = std::fs::read_dir(&dir)
|
||||||
|
.map_err(|e| format!("Kunne ikke lese checkpoint-mappe: {e}"))?
|
||||||
|
.filter_map(|e| e.ok())
|
||||||
|
.filter(|e| {
|
||||||
|
e.path()
|
||||||
|
.extension()
|
||||||
|
.is_some_and(|ext| ext == "json")
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if entries.len() <= keep {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sorter etter modifiseringstid (nyeste først)
|
||||||
|
entries.sort_by(|a, b| {
|
||||||
|
let t_a = a.metadata().and_then(|m| m.modified()).unwrap_or(std::time::SystemTime::UNIX_EPOCH);
|
||||||
|
let t_b = b.metadata().and_then(|m| m.modified()).unwrap_or(std::time::SystemTime::UNIX_EPOCH);
|
||||||
|
t_b.cmp(&t_a)
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut removed = 0;
|
||||||
|
for entry in entries.iter().skip(keep) {
|
||||||
|
if std::fs::remove_file(entry.path()).is_ok() {
|
||||||
|
removed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(removed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Hent liste over endrede filer fra git diff (for sesjonsrapport).
|
||||||
|
pub async fn changed_files(working_dir: &Path) -> Vec<String> {
|
||||||
|
let output = tokio::process::Command::new("git")
|
||||||
|
.args(["diff", "--name-only", "HEAD"])
|
||||||
|
.current_dir(working_dir)
|
||||||
|
.output()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match output {
|
||||||
|
Ok(o) if o.status.success() => {
|
||||||
|
String::from_utf8_lossy(&o.stdout)
|
||||||
|
.lines()
|
||||||
|
.filter(|l| !l.is_empty())
|
||||||
|
.map(|l| l.to_string())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
_ => vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a Markdown session report string: header metadata, a per-model
/// token/cost table, and optional task / changed-files / summary sections.
///
/// Per-model cost is recomputed via `calculate_cost`; the table's total
/// cost column, however, shows the checkpoint's accumulated `total_cost`
/// (the two may differ slightly if pricing changed mid-session).
pub fn session_report(checkpoint: &Checkpoint) -> String {
    let mut report = String::new();

    report.push_str(&format!("# Sesjonsrapport: {}\n\n", checkpoint.session_id));
    report.push_str(&format!("Modell: {}\n", checkpoint.model_spec));
    report.push_str(&format!("Status: {}\n", checkpoint.status));
    report.push_str(&format!(
        "Varighet: {} → {}\n",
        checkpoint.started_at.format("%Y-%m-%d %H:%M:%S"),
        checkpoint.timestamp.format("%Y-%m-%d %H:%M:%S"),
    ));
    report.push_str(&format!("Iterasjoner: {}\n\n", checkpoint.total_iterations));

    // Token usage table (one row per model, plus a totals row).
    report.push_str("## Token-forbruk\n\n");
    report.push_str("| Modell | Input | Output | Kostnad |\n");
    report.push_str("|--------|-------|--------|---------|\n");

    let mut total_input = 0u64;
    let mut total_output = 0u64;

    for (model, usage) in &checkpoint.usage {
        // Recompute this model's cost from its token counts.
        let cost = crate::provider::calculate_cost(
            model,
            &TokenUsage {
                input_tokens: usage.input_tokens,
                output_tokens: usage.output_tokens,
            },
        );
        report.push_str(&format!(
            "| {} | {} | {} | ${:.4} |\n",
            model, usage.input_tokens, usage.output_tokens, cost
        ));
        total_input += usage.input_tokens;
        total_output += usage.output_tokens;
    }

    report.push_str(&format!(
        "| **Totalt** | **{}** | **{}** | **${:.4}** |\n\n",
        total_input, total_output, checkpoint.total_cost
    ));

    // Task section (only when a task text is recorded).
    if let Some(ref task) = checkpoint.task {
        report.push_str("## Oppgave\n\n");
        report.push_str(task);
        report.push_str("\n\n");
    }

    // Changed-files section (only when non-empty).
    if !checkpoint.files_changed.is_empty() {
        report.push_str("## Filer endret\n\n");
        for f in &checkpoint.files_changed {
            report.push_str(&format!("- {}\n", f));
        }
        report.push('\n');
    }

    // Final summary section (filled at session end).
    if let Some(ref summary) = checkpoint.summary {
        report.push_str("## Oppsummering\n\n");
        report.push_str(summary);
        report.push('\n');
    }

    report
}
|
||||||
|
|
||||||
|
/// Logg token-forbruk til ai_usage_log i PG.
|
||||||
|
pub async fn log_usage_to_pg(
|
||||||
|
pool: &sqlx::PgPool,
|
||||||
|
checkpoint: &Checkpoint,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let agent_id = Uuid::parse_str(crate::graph::AGENT_NODE_ID)
|
||||||
|
.map_err(|e| format!("Ugyldig agent UUID: {e}"))?;
|
||||||
|
|
||||||
|
let task_node_id = checkpoint
|
||||||
|
.task_node_id
|
||||||
|
.as_deref()
|
||||||
|
.and_then(|s| Uuid::parse_str(s).ok());
|
||||||
|
|
||||||
|
for (model, usage) in &checkpoint.usage {
|
||||||
|
let total = usage.input_tokens + usage.output_tokens;
|
||||||
|
let cost = crate::provider::calculate_cost(
|
||||||
|
model,
|
||||||
|
&TokenUsage {
|
||||||
|
input_tokens: usage.input_tokens,
|
||||||
|
output_tokens: usage.output_tokens,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
INSERT INTO ai_usage_log
|
||||||
|
(agent_node_id, model_alias, model_actual,
|
||||||
|
prompt_tokens, completion_tokens, total_tokens,
|
||||||
|
estimated_cost, job_type)
|
||||||
|
VALUES ($1, $2, $2, $3, $4, $5, $6, $7)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(agent_id)
|
||||||
|
.bind(model)
|
||||||
|
.bind(usage.input_tokens as i32)
|
||||||
|
.bind(usage.output_tokens as i32)
|
||||||
|
.bind(total as i32)
|
||||||
|
.bind(cost)
|
||||||
|
.bind(
|
||||||
|
if task_node_id.is_some() {
|
||||||
|
"agent_task"
|
||||||
|
} else {
|
||||||
|
"agent_session"
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("ai_usage_log insert feilet: {e}"))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
session_id = checkpoint.session_id.as_str(),
|
||||||
|
models = checkpoint.usage.len(),
|
||||||
|
total_cost = format!("${:.4}", checkpoint.total_cost).as_str(),
|
||||||
|
"Kostnadslogging til PG fullført"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh session ID must include the timestamp part, the separator,
    // and the UUID suffix.
    #[test]
    fn test_new_session_id() {
        let id = new_session_id();
        assert!(id.len() > 15);
        assert!(id.contains('-'));
    }

    // A checkpoint must survive a JSON serialize/deserialize round-trip
    // with all fields intact.
    #[test]
    fn test_checkpoint_roundtrip() {
        let cp = Checkpoint {
            session_id: "test-123".into(),
            model_spec: "openrouter/anthropic/claude-sonnet-4".into(),
            messages: vec![Message {
                role: "system".into(),
                content: Some("Hello".into()),
                tool_calls: None,
                tool_call_id: None,
            }],
            usage: {
                let mut m = HashMap::new();
                m.insert("claude-sonnet-4".into(), TokenUsageData {
                    input_tokens: 1000,
                    output_tokens: 500,
                });
                m
            },
            total_cost: 0.0225,
            total_iterations: 5,
            task: Some("Fiks buggen".into()),
            task_node_id: None,
            working_dir: "/home/vegard/synops".into(),
            files_changed: vec!["src/main.rs".into()],
            timestamp: Utc::now(),
            started_at: Utc::now(),
            status: "completed".into(),
            summary: Some("Buggen ble fikset.".into()),
        };

        let json = serde_json::to_string_pretty(&cp).unwrap();
        let loaded: Checkpoint = serde_json::from_str(&json).unwrap();

        assert_eq!(loaded.session_id, "test-123");
        assert_eq!(loaded.messages.len(), 1);
        // Exact f64 equality is fine here: serde_json round-trips f64
        // via its shortest decimal representation.
        assert_eq!(loaded.total_cost, 0.0225);
        assert_eq!(loaded.files_changed.len(), 1);
    }

    // The Markdown report must mention the session header, the model row,
    // changed files, and the final summary.
    #[test]
    fn test_session_report() {
        let cp = Checkpoint {
            session_id: "test-report".into(),
            model_spec: "test-model".into(),
            messages: vec![],
            usage: {
                let mut m = HashMap::new();
                m.insert("test-model".into(), TokenUsageData {
                    input_tokens: 2000,
                    output_tokens: 1000,
                });
                m
            },
            total_cost: 0.05,
            total_iterations: 10,
            task: Some("Test-oppgave".into()),
            task_node_id: None,
            working_dir: "/tmp".into(),
            files_changed: vec!["a.rs".into(), "b.rs".into()],
            timestamp: Utc::now(),
            started_at: Utc::now(),
            status: "completed".into(),
            summary: Some("Alt gikk bra.".into()),
        };

        let report = session_report(&cp);
        assert!(report.contains("Sesjonsrapport"));
        assert!(report.contains("test-model"));
        assert!(report.contains("a.rs"));
        assert!(report.contains("Alt gikk bra"));
    }
}
|
||||||
|
|
@ -337,10 +337,27 @@ async fn handle_do(
|
||||||
verbose: config.verbose,
|
verbose: config.verbose,
|
||||||
interrupted: Arc::new(AtomicBool::new(false)),
|
interrupted: Arc::new(AtomicBool::new(false)),
|
||||||
pg_pool: Some(pool.clone()),
|
pg_pool: Some(pool.clone()),
|
||||||
|
session_id: crate::checkpoint::new_session_id(),
|
||||||
|
model_spec: model.to_string(),
|
||||||
|
task_text: Some(task.to_string()),
|
||||||
|
task_node_id: None,
|
||||||
|
checkpoint_interval: 10,
|
||||||
|
tool_calls_since_checkpoint: 0,
|
||||||
|
started_at: Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = session.run_turn().await;
|
let result = session.run_turn().await;
|
||||||
|
|
||||||
|
// Finalize: log usage to PG
|
||||||
|
let status = match &result {
|
||||||
|
Ok(crate::TurnResult::Done) => "completed",
|
||||||
|
Ok(crate::TurnResult::BudgetExhausted) => "budget_exhausted",
|
||||||
|
Ok(crate::TurnResult::MaxIterations) => "max_iterations",
|
||||||
|
Ok(crate::TurnResult::Interrupted) => "interrupted",
|
||||||
|
Err(_) => "failed",
|
||||||
|
};
|
||||||
|
session.finalize(status).await;
|
||||||
|
|
||||||
// Extract the last assistant text as the reply
|
// Extract the last assistant text as the reply
|
||||||
let response_text = session
|
let response_text = session
|
||||||
.messages
|
.messages
|
||||||
|
|
@ -468,10 +485,27 @@ async fn run_task(
|
||||||
verbose: config.verbose,
|
verbose: config.verbose,
|
||||||
interrupted: Arc::new(AtomicBool::new(false)),
|
interrupted: Arc::new(AtomicBool::new(false)),
|
||||||
pg_pool: Some(pool.clone()),
|
pg_pool: Some(pool.clone()),
|
||||||
|
session_id: crate::checkpoint::new_session_id(),
|
||||||
|
model_spec: config.default_model.clone(),
|
||||||
|
task_text: Some(task_desc.to_string()),
|
||||||
|
task_node_id: Some(task.id.to_string()),
|
||||||
|
checkpoint_interval: 10,
|
||||||
|
tool_calls_since_checkpoint: 0,
|
||||||
|
started_at: Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = session.run_turn().await;
|
let result = session.run_turn().await;
|
||||||
|
|
||||||
|
// Finalize: checkpoint + PG logging
|
||||||
|
let status = match &result {
|
||||||
|
Ok(crate::TurnResult::Done) => "completed",
|
||||||
|
Ok(crate::TurnResult::BudgetExhausted) => "budget_exhausted",
|
||||||
|
Ok(crate::TurnResult::MaxIterations) => "max_iterations",
|
||||||
|
Ok(crate::TurnResult::Interrupted) => "interrupted",
|
||||||
|
Err(_) => "failed",
|
||||||
|
};
|
||||||
|
session.finalize(status).await;
|
||||||
|
|
||||||
let summary = session
|
let summary = session
|
||||||
.messages
|
.messages
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ use sqlx::PgPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Agent node ID (claude-main).
|
/// Agent node ID (claude-main).
|
||||||
const AGENT_NODE_ID: &str = "d3eebc99-9c0b-4ef8-bb6d-6bb9bd380a44";
|
pub const AGENT_NODE_ID: &str = "d3eebc99-9c0b-4ef8-bb6d-6bb9bd380a44";
|
||||||
|
|
||||||
/// Pick the highest-priority open task and claim it atomically.
|
/// Pick the highest-priority open task and claim it atomically.
|
||||||
///
|
///
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
//! synops-agent --interactive
|
//! synops-agent --interactive
|
||||||
//! synops-agent -i --model gemini/gemini-2.5-flash
|
//! synops-agent -i --model gemini/gemini-2.5-flash
|
||||||
|
|
||||||
|
mod checkpoint;
|
||||||
mod context;
|
mod context;
|
||||||
mod daemon;
|
mod daemon;
|
||||||
mod git;
|
mod git;
|
||||||
|
|
@ -104,6 +105,14 @@ struct Cli {
|
||||||
/// Opprett og bytt til en egen branch for oppgaven
|
/// Opprett og bytt til en egen branch for oppgaven
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
branch: Option<String>,
|
branch: Option<String>,
|
||||||
|
|
||||||
|
/// Gjenoppta en krasjet sesjon (sesjons-ID eller "latest")
|
||||||
|
#[arg(long)]
|
||||||
|
resume: Option<String>,
|
||||||
|
|
||||||
|
/// Checkpoint-intervall: lagre mellomtilstand etter N tool-kall (default: 10)
|
||||||
|
#[arg(long, default_value = "10")]
|
||||||
|
checkpoint_interval: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(clap::Subcommand)]
|
#[derive(clap::Subcommand)]
|
||||||
|
|
@ -162,6 +171,20 @@ pub(crate) struct AgentSession {
|
||||||
pub(crate) interrupted: Arc<AtomicBool>,
|
pub(crate) interrupted: Arc<AtomicBool>,
|
||||||
/// Optional PG pool for graph integration (synops_query tool, task picking).
|
/// Optional PG pool for graph integration (synops_query tool, task picking).
|
||||||
pub(crate) pg_pool: Option<PgPool>,
|
pub(crate) pg_pool: Option<PgPool>,
|
||||||
|
/// Session ID for checkpoint/recovery.
|
||||||
|
pub(crate) session_id: String,
|
||||||
|
/// Model spec string (for checkpoint metadata).
|
||||||
|
pub(crate) model_spec: String,
|
||||||
|
/// Current task text (for checkpoint/report).
|
||||||
|
pub(crate) task_text: Option<String>,
|
||||||
|
/// Task node ID if from graph.
|
||||||
|
pub(crate) task_node_id: Option<String>,
|
||||||
|
/// How often to save checkpoint (every N tool calls).
|
||||||
|
pub(crate) checkpoint_interval: usize,
|
||||||
|
/// Counter for tool calls since last checkpoint.
|
||||||
|
pub(crate) tool_calls_since_checkpoint: usize,
|
||||||
|
/// When the session started.
|
||||||
|
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Whether to use plan mode for a task.
|
/// Whether to use plan mode for a task.
|
||||||
|
|
@ -324,6 +347,9 @@ impl AgentSession {
|
||||||
tool_calls: None,
|
tool_calls: None,
|
||||||
tool_call_id: Some(tc.id.clone()),
|
tool_call_id: Some(tc.id.clone()),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Periodic checkpoint
|
||||||
|
self.maybe_checkpoint();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -511,6 +537,82 @@ impl AgentSession {
|
||||||
}
|
}
|
||||||
eprintln!(" Iterasjoner: {}", self.total_iterations);
|
eprintln!(" Iterasjoner: {}", self.total_iterations);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a checkpoint from the current session state.
|
||||||
|
fn build_checkpoint(&self, status: &str, summary: Option<String>) -> checkpoint::Checkpoint {
|
||||||
|
checkpoint::Checkpoint {
|
||||||
|
session_id: self.session_id.clone(),
|
||||||
|
model_spec: self.model_spec.clone(),
|
||||||
|
messages: self.messages.clone(),
|
||||||
|
usage: self
|
||||||
|
.total_usage
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (k.clone(), checkpoint::TokenUsageData::from(v)))
|
||||||
|
.collect(),
|
||||||
|
total_cost: self.total_cost,
|
||||||
|
total_iterations: self.total_iterations,
|
||||||
|
task: self.task_text.clone(),
|
||||||
|
task_node_id: self.task_node_id.clone(),
|
||||||
|
working_dir: self.working_dir.display().to_string(),
|
||||||
|
files_changed: vec![], // Filled at session end
|
||||||
|
timestamp: chrono::Utc::now(),
|
||||||
|
started_at: self.started_at,
|
||||||
|
status: status.into(),
|
||||||
|
summary,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Save a checkpoint if enough tool calls have elapsed.
|
||||||
|
fn maybe_checkpoint(&mut self) {
|
||||||
|
if self.checkpoint_interval == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.tool_calls_since_checkpoint += 1;
|
||||||
|
if self.tool_calls_since_checkpoint >= self.checkpoint_interval {
|
||||||
|
self.tool_calls_since_checkpoint = 0;
|
||||||
|
let cp = self.build_checkpoint("running", None);
|
||||||
|
if let Err(e) = checkpoint::save(&cp) {
|
||||||
|
tracing::warn!(error = %e, "Checkpoint-lagring feilet");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Save final checkpoint and log usage to PG.
|
||||||
|
async fn finalize(&mut self, status: &str) {
|
||||||
|
let files = checkpoint::changed_files(&self.working_dir).await;
|
||||||
|
let summary = self
|
||||||
|
.messages
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|m| m.role == "assistant" && m.content.is_some())
|
||||||
|
.and_then(|m| m.content.clone());
|
||||||
|
|
||||||
|
let mut cp = self.build_checkpoint(status, summary);
|
||||||
|
cp.files_changed = files;
|
||||||
|
|
||||||
|
// Save checkpoint
|
||||||
|
if let Err(e) = checkpoint::save(&cp) {
|
||||||
|
tracing::warn!(error = %e, "Endelig checkpoint-lagring feilet");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log to PG
|
||||||
|
if let Some(ref pool) = self.pg_pool {
|
||||||
|
if let Err(e) = checkpoint::log_usage_to_pg(pool, &cp).await {
|
||||||
|
tracing::warn!(error = %e, "Kostnadslogging til PG feilet");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print session report
|
||||||
|
let report = checkpoint::session_report(&cp);
|
||||||
|
eprintln!("\n{}", report);
|
||||||
|
|
||||||
|
// Cleanup old checkpoints (keep last 20)
|
||||||
|
match checkpoint::cleanup(20) {
|
||||||
|
Ok(0) => {}
|
||||||
|
Ok(n) => tracing::debug!(removed = n, "Ryddet gamle checkpoints"),
|
||||||
|
Err(e) => tracing::warn!(error = %e, "Checkpoint-opprydding feilet"),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
@ -554,6 +656,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
return daemon::run(pool, config).await.map_err(Into::into);
|
return daemon::run(pool, config).await.map_err(Into::into);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle --resume: restore from checkpoint
|
||||||
|
if let Some(ref resume_id) = cli.resume {
|
||||||
|
let cp = if resume_id == "latest" {
|
||||||
|
checkpoint::load_latest()
|
||||||
|
} else {
|
||||||
|
checkpoint::load(resume_id)
|
||||||
|
};
|
||||||
|
|
||||||
|
match cp {
|
||||||
|
Ok(cp) => {
|
||||||
|
return resume_session(cp, &cli).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Feil ved gjenoppretting: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Validate: need either --task or --interactive
|
// Validate: need either --task or --interactive
|
||||||
if cli.task.is_none() && !cli.interactive {
|
if cli.task.is_none() && !cli.interactive {
|
||||||
eprintln!("Feil: Enten --task eller --interactive er påkrevd.");
|
eprintln!("Feil: Enten --task eller --interactive er påkrevd.");
|
||||||
|
|
@ -642,6 +763,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
verbose: cli.verbose,
|
verbose: cli.verbose,
|
||||||
interrupted: interrupted.clone(),
|
interrupted: interrupted.clone(),
|
||||||
pg_pool,
|
pg_pool,
|
||||||
|
session_id: checkpoint::new_session_id(),
|
||||||
|
model_spec: cli.model.clone(),
|
||||||
|
task_text: cli.task.clone(),
|
||||||
|
task_node_id: None,
|
||||||
|
checkpoint_interval: cli.checkpoint_interval,
|
||||||
|
tool_calls_since_checkpoint: 0,
|
||||||
|
started_at: chrono::Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Git: branch-per-task
|
// Git: branch-per-task
|
||||||
|
|
@ -691,7 +819,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
session.run_turn().await?
|
session.run_turn().await?
|
||||||
};
|
};
|
||||||
|
|
||||||
session.print_summary();
|
// Determine final status
|
||||||
|
let status = match &result {
|
||||||
|
TurnResult::Done => "completed",
|
||||||
|
TurnResult::BudgetExhausted => "budget_exhausted",
|
||||||
|
TurnResult::MaxIterations => "max_iterations",
|
||||||
|
TurnResult::Interrupted => "interrupted",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Finalize session: checkpoint, PG logging, report
|
||||||
|
session.finalize(status).await;
|
||||||
|
|
||||||
// Show diff after task
|
// Show diff after task
|
||||||
if let Ok(diff_output) = git::diff(&cli.working_dir).await {
|
if let Ok(diff_output) = git::diff(&cli.working_dir).await {
|
||||||
|
|
@ -741,7 +878,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
if matches!(result, TurnResult::BudgetExhausted) {
|
if matches!(result, TurnResult::BudgetExhausted) {
|
||||||
eprintln!("\n⚠ Budsjettgrense nådd. Oppgaven er ikke fullført.");
|
eprintln!("\n⚠ Budsjettgrense nådd. Oppgaven er ikke fullført.");
|
||||||
eprintln!(" Gjenstående arbeid bør fortsettes med høyere --max-cost");
|
eprintln!(" Gjenoppta med: synops-agent --resume {}", session.session_id);
|
||||||
std::process::exit(2);
|
std::process::exit(2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -915,7 +1052,8 @@ async fn run_interactive(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session.print_summary();
|
// Finalize session
|
||||||
|
session.finalize("completed").await;
|
||||||
|
|
||||||
// Save history
|
// Save history
|
||||||
let _ = rl.save_history(&history_path);
|
let _ = rl.save_history(&history_path);
|
||||||
|
|
@ -923,6 +1061,104 @@ async fn run_interactive(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resume a session from a checkpoint.
|
||||||
|
async fn resume_session(
|
||||||
|
cp: checkpoint::Checkpoint,
|
||||||
|
cli: &Cli,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
eprintln!("Gjenopptar sesjon: {}", cp.session_id);
|
||||||
|
eprintln!(
|
||||||
|
" Oppgave: {}",
|
||||||
|
cp.task.as_deref().unwrap_or("(ukjent)")
|
||||||
|
);
|
||||||
|
eprintln!(
|
||||||
|
" Forrige status: {} ({} iterasjoner, ${:.4})",
|
||||||
|
cp.status, cp.total_iterations, cp.total_cost
|
||||||
|
);
|
||||||
|
eprintln!(" Meldinger i historikk: {}", cp.messages.len());
|
||||||
|
|
||||||
|
let api_keys = ApiKeys::from_env();
|
||||||
|
let model_spec = &cli.model;
|
||||||
|
let provider = create_provider(model_spec, &api_keys, cli.max_tokens)?;
|
||||||
|
|
||||||
|
let compaction_config = CompactionConfig {
|
||||||
|
context_window: provider.context_window(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Restore token usage
|
||||||
|
let total_usage: HashMap<String, TokenUsage> = cp
|
||||||
|
.usage
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (k.clone(), TokenUsage::from(v)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let pg_pool = match synops_common::db::connect().await {
|
||||||
|
Ok(pool) => Some(pool),
|
||||||
|
Err(_) => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let interrupted = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let mut session = AgentSession {
|
||||||
|
provider,
|
||||||
|
messages: cp.messages,
|
||||||
|
tool_defs: tools::tool_definitions(),
|
||||||
|
total_usage,
|
||||||
|
total_cost: cp.total_cost,
|
||||||
|
total_iterations: cp.total_iterations,
|
||||||
|
retry_config: RetryConfig {
|
||||||
|
max_retries: cli.max_retries,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
compaction_config,
|
||||||
|
working_dir: PathBuf::from(&cp.working_dir),
|
||||||
|
max_iterations: cli.max_iterations,
|
||||||
|
max_cost: cli.max_cost,
|
||||||
|
verbose: cli.verbose,
|
||||||
|
interrupted,
|
||||||
|
pg_pool,
|
||||||
|
session_id: cp.session_id,
|
||||||
|
model_spec: model_spec.clone(),
|
||||||
|
task_text: cp.task.clone(),
|
||||||
|
task_node_id: cp.task_node_id,
|
||||||
|
checkpoint_interval: cli.checkpoint_interval,
|
||||||
|
tool_calls_since_checkpoint: 0,
|
||||||
|
started_at: cp.started_at,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add a resume message so the LLM knows it's continuing
|
||||||
|
session.messages.push(Message {
|
||||||
|
role: "user".into(),
|
||||||
|
content: Some(
|
||||||
|
"Sesjonen ble gjenopptatt etter avbrudd. Fortsett der du slapp. \
|
||||||
|
Hva har du gjort så langt, og hva gjenstår?"
|
||||||
|
.into(),
|
||||||
|
),
|
||||||
|
tool_calls: None,
|
||||||
|
tool_call_id: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = session.run_turn().await?;
|
||||||
|
|
||||||
|
let status = match &result {
|
||||||
|
TurnResult::Done => "completed",
|
||||||
|
TurnResult::BudgetExhausted => "budget_exhausted",
|
||||||
|
TurnResult::MaxIterations => "max_iterations",
|
||||||
|
TurnResult::Interrupted => "interrupted",
|
||||||
|
};
|
||||||
|
|
||||||
|
session.finalize(status).await;
|
||||||
|
|
||||||
|
if matches!(result, TurnResult::BudgetExhausted) {
|
||||||
|
eprintln!("\n⚠ Budsjettgrense nådd.");
|
||||||
|
eprintln!(" Gjenoppta med: synops-agent --resume {}", session.session_id);
|
||||||
|
std::process::exit(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Format token count for display (e.g. 1234 → "1.2k", 1234567 → "1.2M").
|
/// Format token count for display (e.g. 1234 → "1.2k", 1234567 → "1.2M").
|
||||||
fn format_tokens(tokens: u64) -> String {
|
fn format_tokens(tokens: u64) -> String {
|
||||||
if tokens >= 1_000_000 {
|
if tokens >= 1_000_000 {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue