synops/tools/synops-agent/src/checkpoint.rs
vegard 202682e2e0 Implementer checkpoint og recovery i synops-agent
- Ny checkpoint-modul: lagrer sesjonsstatus (meldinger, tokens, oppgave) til JSON
- --resume flagg for å gjenoppta etter krasj (sesjons-ID eller "latest")
- --checkpoint-interval for å styre hvor ofte mellomtilstand lagres
- Kostnadslogging til ai_usage_log i PG ved sesjonsslutt
- Sesjonsrapport: modell, varighet, tokens, kostnad, filer endret
- Integrert i agent-loop (periodisk checkpoint), batch-modus og daemon
- Automatisk opprydding av gamle checkpoints (beholder siste 20)
2026-03-19 18:49:09 +00:00

425 lines
13 KiB
Rust

//! Checkpoint og recovery — lagre og gjenoppta agent-sesjoner.
//!
//! Skriver mellomtilstand til JSON-fil slik at en krasjet sesjon
//! kan gjenopptas med `--resume`. Inkluderer:
//! - Meldingshistorikk
//! - Token-forbruk per modell
//! - Pågående oppgave
//! - Tidsstempel og sesjons-ID
//!
//! Checkpoint skrives periodisk (etter hvert N-te tool-kall) og
//! ved sesjonsslutt. Ved `--resume` lastes siste checkpoint og
//! agenten fortsetter der den slapp.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use uuid::Uuid;
use crate::provider::{Message, TokenUsage};
/// Serializable session state stored in a checkpoint file.
///
/// Derives `Serialize`/`Deserialize`; `save` writes it as pretty-printed JSON
/// and `load` parses it back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
/// Unique session ID; also used as the checkpoint file stem.
pub session_id: String,
/// Model specification used for the session.
pub model_spec: String,
/// Message history.
pub messages: Vec<Message>,
/// Token usage per model, keyed by model name.
pub usage: HashMap<String, TokenUsageData>,
/// Total cost in USD.
pub total_cost: f64,
/// Number of iterations performed.
pub total_iterations: usize,
/// Task in progress (task text), if any.
pub task: Option<String>,
/// Task node ID (if the task was fetched from the graph).
pub task_node_id: Option<String>,
/// Working directory.
pub working_dir: String,
/// Files changed during the session (from git diff).
pub files_changed: Vec<String>,
/// Time at which this checkpoint was taken.
pub timestamp: DateTime<Utc>,
/// Time at which the session started.
pub started_at: DateTime<Utc>,
/// Status: "running", "completed", "failed", "interrupted".
pub status: String,
/// Closing summary (filled in at session end).
pub summary: Option<String>,
}
/// Token usage (serializable counterpart of the provider's `TokenUsage`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TokenUsageData {
// Number of input tokens.
pub input_tokens: u64,
// Number of output tokens.
pub output_tokens: u64,
}
/// Copies the token counters of a provider `TokenUsage` into the
/// serializable `TokenUsageData`.
impl From<&TokenUsage> for TokenUsageData {
    fn from(source: &TokenUsage) -> Self {
        let (input_tokens, output_tokens) = (source.input_tokens, source.output_tokens);
        Self {
            input_tokens,
            output_tokens,
        }
    }
}
/// Rebuilds a provider `TokenUsage` from the counters stored in a
/// `TokenUsageData`.
impl From<&TokenUsageData> for TokenUsage {
    fn from(stored: &TokenUsageData) -> Self {
        let (input_tokens, output_tokens) = (stored.input_tokens, stored.output_tokens);
        Self {
            input_tokens,
            output_tokens,
        }
    }
}
/// Returns the default checkpoint directory, `$HOME/.synops/checkpoints`.
///
/// If `HOME` cannot be read (unset or not valid Unicode) the path is built
/// relative to the current directory instead: `./.synops/checkpoints`.
pub fn checkpoint_dir() -> PathBuf {
    let base = match std::env::var("HOME") {
        Ok(home) => home,
        Err(_) => String::from("."),
    };
    let mut dir = PathBuf::from(base);
    dir.push(".synops");
    dir.push("checkpoints");
    dir
}
/// Returns the checkpoint file path for `session_id`:
/// `<checkpoint_dir>/<session_id>.json`.
pub fn checkpoint_path(session_id: &str) -> PathBuf {
    let file_name = format!("{}.json", session_id);
    let mut path = checkpoint_dir();
    path.push(file_name);
    path
}
/// Returns the path of the `latest` pointer file inside the checkpoint
/// directory. `save` writes the most recently saved session ID into it.
fn latest_path() -> PathBuf {
    let mut pointer = checkpoint_dir();
    pointer.push("latest");
    pointer
}
/// Generates a unique session ID of the form `YYYYMMDD-HHMMSS-xxxxxxxx`.
///
/// The prefix is the current UTC time; the suffix is eight hex digits taken
/// from the END of a freshly generated version-7 UUID.
///
/// A version-7 UUID starts with a 48-bit Unix millisecond timestamp, so its
/// first eight hex digits only change roughly every 65 seconds. Using them as
/// the suffix would give two sessions started within the same second the same
/// ID, and the later session would overwrite the earlier one's checkpoint
/// file. Per RFC 9562 the tail of the UUID belongs to the `rand_b` field,
/// which is what makes the ID unique.
pub fn new_session_id() -> String {
    let stamp = Utc::now().format("%Y%m%d-%H%M%S");
    let uuid = Uuid::now_v7().to_string();
    // The textual UUID is pure ASCII, so slicing by byte offset is safe; the
    // final group is twelve hex digits with no hyphen inside it.
    let suffix = &uuid[uuid.len() - 8..];
    format!("{stamp}-{suffix}")
}
/// Lagre checkpoint til disk.
pub fn save(checkpoint: &Checkpoint) -> Result<(), String> {
let dir = checkpoint_dir();
std::fs::create_dir_all(&dir)
.map_err(|e| format!("Kunne ikke opprette checkpoint-mappe: {e}"))?;
let path = checkpoint_path(&checkpoint.session_id);
let json = serde_json::to_string_pretty(checkpoint)
.map_err(|e| format!("Serialisering feilet: {e}"))?;
// Atomisk skriving via temp-fil
let tmp = path.with_extension("tmp");
std::fs::write(&tmp, &json)
.map_err(|e| format!("Skriving feilet: {e}"))?;
std::fs::rename(&tmp, &path)
.map_err(|e| format!("Rename feilet: {e}"))?;
// Oppdater "latest"-peker
let latest = latest_path();
let _ = std::fs::write(&latest, &checkpoint.session_id);
tracing::debug!(
session_id = checkpoint.session_id.as_str(),
path = %path.display(),
"Checkpoint lagret"
);
Ok(())
}
/// Loads the checkpoint stored for `session_id`.
///
/// # Errors
///
/// Returns a message when the checkpoint file cannot be read or its contents
/// are not valid checkpoint JSON.
pub fn load(session_id: &str) -> Result<Checkpoint, String> {
    let path = checkpoint_path(session_id);
    let raw = match std::fs::read_to_string(&path) {
        Ok(text) => text,
        Err(e) => {
            return Err(format!(
                "Kunne ikke lese checkpoint {}: {e}",
                path.display()
            ))
        }
    };
    match serde_json::from_str::<Checkpoint>(&raw) {
        Ok(checkpoint) => Ok(checkpoint),
        Err(e) => Err(format!("Ugyldig checkpoint JSON: {e}")),
    }
}
/// Loads the most recent checkpoint, as named by the `latest` pointer file.
///
/// The pointer file holds a session ID; surrounding whitespace is trimmed
/// before the ID is handed to `load`.
///
/// # Errors
///
/// Returns a message when the pointer file cannot be read, when it is empty
/// (or whitespace only), or when `load` fails for the session it names.
pub fn load_latest() -> Result<Checkpoint, String> {
    let pointer = std::fs::read_to_string(latest_path())
        .map_err(|e| format!("Ingen siste checkpoint funnet: {e}"))?;
    let session_id = pointer.trim();
    // An empty pointer would otherwise be looked up as the file ".json" and
    // produce a misleading "could not read checkpoint" error.
    if session_id.is_empty() {
        return Err("Ingen siste checkpoint funnet: latest-pekeren er tom".to_string());
    }
    load(session_id)
}
/// Rydd opp gamle checkpoints (behold de siste N).
pub fn cleanup(keep: usize) -> Result<usize, String> {
let dir = checkpoint_dir();
if !dir.exists() {
return Ok(0);
}
let mut entries: Vec<_> = std::fs::read_dir(&dir)
.map_err(|e| format!("Kunne ikke lese checkpoint-mappe: {e}"))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.is_some_and(|ext| ext == "json")
})
.collect();
if entries.len() <= keep {
return Ok(0);
}
// Sorter etter modifiseringstid (nyeste først)
entries.sort_by(|a, b| {
let t_a = a.metadata().and_then(|m| m.modified()).unwrap_or(std::time::SystemTime::UNIX_EPOCH);
let t_b = b.metadata().and_then(|m| m.modified()).unwrap_or(std::time::SystemTime::UNIX_EPOCH);
t_b.cmp(&t_a)
});
let mut removed = 0;
for entry in entries.iter().skip(keep) {
if std::fs::remove_file(entry.path()).is_ok() {
removed += 1;
}
}
Ok(removed)
}
/// Lists the files reported by `git diff --name-only HEAD` in `working_dir`
/// (used for the session report).
///
/// Returns an empty list when `git` cannot be started or exits with a
/// non-zero status. Empty output lines are dropped.
pub async fn changed_files(working_dir: &Path) -> Vec<String> {
    let result = tokio::process::Command::new("git")
        .args(["diff", "--name-only", "HEAD"])
        .current_dir(working_dir)
        .output()
        .await;

    let Ok(out) = result else {
        return Vec::new();
    };
    if !out.status.success() {
        return Vec::new();
    }

    let listing = String::from_utf8_lossy(&out.stdout);
    let mut files = Vec::new();
    for line in listing.lines() {
        if !line.is_empty() {
            files.push(line.to_string());
        }
    }
    files
}
/// Generer en sesjonsrapport-streng.
pub fn session_report(checkpoint: &Checkpoint) -> String {
let mut report = String::new();
report.push_str(&format!("# Sesjonsrapport: {}\n\n", checkpoint.session_id));
report.push_str(&format!("Modell: {}\n", checkpoint.model_spec));
report.push_str(&format!("Status: {}\n", checkpoint.status));
report.push_str(&format!(
"Varighet: {}{}\n",
checkpoint.started_at.format("%Y-%m-%d %H:%M:%S"),
checkpoint.timestamp.format("%Y-%m-%d %H:%M:%S"),
));
report.push_str(&format!("Iterasjoner: {}\n\n", checkpoint.total_iterations));
// Token-forbruk
report.push_str("## Token-forbruk\n\n");
report.push_str("| Modell | Input | Output | Kostnad |\n");
report.push_str("|--------|-------|--------|---------|\n");
let mut total_input = 0u64;
let mut total_output = 0u64;
for (model, usage) in &checkpoint.usage {
let cost = crate::provider::calculate_cost(
model,
&TokenUsage {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
},
);
report.push_str(&format!(
"| {} | {} | {} | ${:.4} |\n",
model, usage.input_tokens, usage.output_tokens, cost
));
total_input += usage.input_tokens;
total_output += usage.output_tokens;
}
report.push_str(&format!(
"| **Totalt** | **{}** | **{}** | **${:.4}** |\n\n",
total_input, total_output, checkpoint.total_cost
));
// Oppgave
if let Some(ref task) = checkpoint.task {
report.push_str("## Oppgave\n\n");
report.push_str(task);
report.push_str("\n\n");
}
// Filer endret
if !checkpoint.files_changed.is_empty() {
report.push_str("## Filer endret\n\n");
for f in &checkpoint.files_changed {
report.push_str(&format!("- {}\n", f));
}
report.push('\n');
}
// Oppsummering
if let Some(ref summary) = checkpoint.summary {
report.push_str("## Oppsummering\n\n");
report.push_str(summary);
report.push('\n');
}
report
}
/// Logg token-forbruk til ai_usage_log i PG.
pub async fn log_usage_to_pg(
pool: &sqlx::PgPool,
checkpoint: &Checkpoint,
) -> Result<(), String> {
let agent_id = Uuid::parse_str(crate::graph::AGENT_NODE_ID)
.map_err(|e| format!("Ugyldig agent UUID: {e}"))?;
let task_node_id = checkpoint
.task_node_id
.as_deref()
.and_then(|s| Uuid::parse_str(s).ok());
for (model, usage) in &checkpoint.usage {
let total = usage.input_tokens + usage.output_tokens;
let cost = crate::provider::calculate_cost(
model,
&TokenUsage {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
},
);
sqlx::query(
r#"
INSERT INTO ai_usage_log
(agent_node_id, model_alias, model_actual,
prompt_tokens, completion_tokens, total_tokens,
estimated_cost, job_type)
VALUES ($1, $2, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(agent_id)
.bind(model)
.bind(usage.input_tokens as i32)
.bind(usage.output_tokens as i32)
.bind(total as i32)
.bind(cost)
.bind(
if task_node_id.is_some() {
"agent_task"
} else {
"agent_session"
},
)
.execute(pool)
.await
.map_err(|e| format!("ai_usage_log insert feilet: {e}"))?;
}
tracing::info!(
session_id = checkpoint.session_id.as_str(),
models = checkpoint.usage.len(),
total_cost = format!("${:.4}", checkpoint.total_cost).as_str(),
"Kostnadslogging til PG fullført"
);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a usage map holding a single model entry.
    fn single_usage(
        model: &str,
        input_tokens: u64,
        output_tokens: u64,
    ) -> HashMap<String, TokenUsageData> {
        HashMap::from([(
            String::from(model),
            TokenUsageData {
                input_tokens,
                output_tokens,
            },
        )])
    }

    /// A generated session ID is longer than 15 characters and contains a hyphen.
    #[test]
    fn test_new_session_id() {
        let id = new_session_id();
        assert!(id.len() > 15);
        assert!(id.contains('-'));
    }

    /// A checkpoint survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_checkpoint_roundtrip() {
        let system_message = Message {
            role: String::from("system"),
            content: Some(String::from("Hello")),
            tool_calls: None,
            tool_call_id: None,
        };
        let now = Utc::now();
        let cp = Checkpoint {
            session_id: String::from("test-123"),
            model_spec: String::from("openrouter/anthropic/claude-sonnet-4"),
            messages: vec![system_message],
            usage: single_usage("claude-sonnet-4", 1000, 500),
            total_cost: 0.0225,
            total_iterations: 5,
            task: Some(String::from("Fiks buggen")),
            task_node_id: None,
            working_dir: String::from("/home/vegard/synops"),
            files_changed: vec![String::from("src/main.rs")],
            timestamp: now,
            started_at: now,
            status: String::from("completed"),
            summary: Some(String::from("Buggen ble fikset.")),
        };

        let json = serde_json::to_string_pretty(&cp).unwrap();
        let loaded: Checkpoint = serde_json::from_str(&json).unwrap();

        assert_eq!(loaded.session_id, "test-123");
        assert_eq!(loaded.messages.len(), 1);
        assert_eq!(loaded.total_cost, 0.0225);
        assert_eq!(loaded.files_changed.len(), 1);
    }

    /// The rendered report mentions the title, model, a changed file and the summary.
    #[test]
    fn test_session_report() {
        let now = Utc::now();
        let cp = Checkpoint {
            session_id: String::from("test-report"),
            model_spec: String::from("test-model"),
            messages: Vec::new(),
            usage: single_usage("test-model", 2000, 1000),
            total_cost: 0.05,
            total_iterations: 10,
            task: Some(String::from("Test-oppgave")),
            task_node_id: None,
            working_dir: String::from("/tmp"),
            files_changed: vec![String::from("a.rs"), String::from("b.rs")],
            timestamp: now,
            started_at: now,
            status: String::from("completed"),
            summary: Some(String::from("Alt gikk bra.")),
        };

        let report = session_report(&cp);

        for needle in ["Sesjonsrapport", "test-model", "a.rs", "Alt gikk bra"] {
            assert!(report.contains(needle));
        }
    }
}