Implementer grafintegrasjon i synops-agent (PG)

Ny modul graph.rs med:
- pick_task: plukk høyest-prioritet open task atomisk (FOR UPDATE SKIP LOCKED)
- update_task_status: oppdater status (open → active → done/failed)
- write_task_message: skriv melding i oppdragets chat-node
- release_stale_tasks: frigjør tasks stuck >60 min (krasj-deteksjon)
- query_nodes/query_edges/get_node: generiske graf-spørringer

Nytt verktøy synops_query for LLM:
- action: nodes (list med kind/status-filter)
- action: edges (for en node, med retning/type-filter)
- action: get (les enkeltnode med metadata)

PG-tilkobling er valgfri — degraderer gracefully uten DATABASE_URL.
Krasj-deteksjon kjøres ved oppstart.
This commit is contained in:
vegard 2026-03-19 18:32:28 +00:00
parent 231bceabbd
commit ec5fc662fe
5 changed files with 792 additions and 2 deletions

View file

@ -144,6 +144,15 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "block2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
dependencies = [
"objc2",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.20.2" version = "3.20.2"
@ -238,6 +247,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]]
name = "clipboard-win"
version = "5.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bde03770d3df201d4fb868f2c9c59e66a3e4e2bd06692a0fe701e7103c7e84d4"
dependencies = [
"error-code",
]
[[package]] [[package]]
name = "colorchoice" name = "colorchoice"
version = "1.0.5" version = "1.0.5"
@ -314,6 +332,17 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "ctrlc"
version = "3.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
dependencies = [
"dispatch2",
"nix 0.31.2",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.7.10" version = "0.7.10"
@ -337,6 +366,18 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "dispatch2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38"
dependencies = [
"bitflags",
"block2",
"libc",
"objc2",
]
[[package]] [[package]]
name = "displaydoc" name = "displaydoc"
version = "0.2.5" version = "0.2.5"
@ -363,6 +404,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "endian-type"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.2" version = "1.0.2"
@ -379,6 +426,12 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "error-code"
version = "3.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59"
[[package]] [[package]]
name = "etcetera" name = "etcetera"
version = "0.8.0" version = "0.8.0"
@ -401,6 +454,17 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "fd-lock"
version = "4.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78"
dependencies = [
"cfg-if",
"rustix",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "find-msvc-tools" name = "find-msvc-tools"
version = "0.1.9" version = "0.1.9"
@ -950,6 +1014,12 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "linux-raw-sys"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]] [[package]]
name = "litemap" name = "litemap"
version = "0.8.1" version = "0.8.1"
@ -1013,6 +1083,39 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "nibble_vec"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43"
dependencies = [
"smallvec",
]
[[package]]
name = "nix"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
"bitflags",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nix"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3"
dependencies = [
"bitflags",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]] [[package]]
name = "nu-ansi-term" name = "nu-ansi-term"
version = "0.50.3" version = "0.50.3"
@ -1068,6 +1171,21 @@ dependencies = [
"libm", "libm",
] ]
[[package]]
name = "objc2"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f"
dependencies = [
"objc2-encode",
]
[[package]]
name = "objc2-encode"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.21.4" version = "1.21.4"
@ -1282,6 +1400,16 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "radix_trie"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd"
dependencies = [
"endian-type",
"nibble_vec",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.5" version = "0.8.5"
@ -1454,6 +1582,19 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustix"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.23.37" version = "0.23.37"
@ -1495,6 +1636,28 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rustyline"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ee1e066dc922e513bda599c6ccb5f3bb2b0ea5870a579448f2622993f0a9a2f"
dependencies = [
"bitflags",
"cfg-if",
"clipboard-win",
"fd-lock",
"home",
"libc",
"log",
"memchr",
"nix 0.29.0",
"radix_trie",
"unicode-segmentation",
"unicode-width",
"utf8parse",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.23" version = "1.0.23"
@ -1923,9 +2086,12 @@ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"clap", "clap",
"ctrlc",
"reqwest", "reqwest",
"rustyline",
"serde", "serde",
"serde_json", "serde_json",
"sqlx",
"synops-common", "synops-common",
"thiserror", "thiserror",
"tokio", "tokio",
@ -2210,6 +2376,18 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d"
[[package]]
name = "unicode-segmentation"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]]
name = "unicode-width"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.6" version = "0.2.6"
@ -2533,6 +2711,15 @@ dependencies = [
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.60.2" version = "0.60.2"

View file

@ -29,6 +29,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Shared Synops lib (PG, CAS, nodes/edges) # Shared Synops lib (PG, CAS, nodes/edges)
synops-common = { path = "../synops-common" } synops-common = { path = "../synops-common" }
# PostgreSQL (via synops-common, but we need PgPool type)
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"], default-features = false }
# Async trait for provider abstraction # Async trait for provider abstraction
async-trait = "0.1" async-trait = "0.1"

View file

@ -0,0 +1,450 @@
//! Grafintegrasjon — les/skriv noder og edges via PG.
//!
//! Kobler synops-agent til Synops-grafen:
//! - Plukk task-noder (node_kind: 'task') fra PG
//! - Oppdater oppgavestatus (open → active → done)
//! - Skriv tilbakemelding i oppdragets chat-node
//! - Krasj-deteksjon: frigjør tasks >60 min
//! - Spørring av noder og edges (synops_query-verktøy)
use chrono::{DateTime, Utc};
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
/// Agent node ID (claude-main).
const AGENT_NODE_ID: &str = "d3eebc99-9c0b-4ef8-bb6d-6bb9bd380a44";
/// Pick the highest-priority open task and claim it atomically.
///
/// Uses `FOR UPDATE SKIP LOCKED` to avoid contention with other agents.
/// Sets status → "active", agent_id, started_at.
pub async fn pick_task(pool: &PgPool) -> Result<Option<TaskInfo>, String> {
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
let now = Utc::now();
// Atomically pick and claim the task
let row = sqlx::query_as::<_, TaskRow>(
r#"
UPDATE nodes SET metadata = metadata
|| jsonb_build_object(
'status', 'active',
'agent_id', $1::text,
'started_at', $2::text
)
WHERE id = (
SELECT id FROM nodes
WHERE node_kind = 'task'
AND metadata->>'status' = 'open'
ORDER BY (metadata->>'priority')::int ASC NULLS LAST, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, title, content, metadata, created_at
"#,
)
.bind(agent_id.to_string())
.bind(now.to_rfc3339())
.fetch_optional(pool)
.await
.map_err(|e| format!("pick_task feilet: {e}"))?;
Ok(row.map(|r| TaskInfo {
id: r.id,
title: r.title,
content: r.content,
metadata: r.metadata.unwrap_or_default(),
created_at: r.created_at,
}))
}
/// Update a task's status (e.g. "done", "failed", "skipped").
pub async fn update_task_status(
pool: &PgPool,
task_id: Uuid,
status: &str,
result: Option<&str>,
) -> Result<(), String> {
let now = Utc::now();
let mut patch = json!({ "status": status });
if matches!(status, "done" | "failed" | "skipped") {
patch["completed_at"] = json!(now.to_rfc3339());
}
if let Some(r) = result {
patch["result"] = json!(r);
}
sqlx::query(
r#"
UPDATE nodes SET metadata = metadata || $1
WHERE id = $2 AND node_kind = 'task'
"#,
)
.bind(&patch)
.bind(task_id)
.execute(pool)
.await
.map_err(|e| format!("update_task_status feilet: {e}"))?;
Ok(())
}
/// Write a message in a task's discussion chat.
///
/// Finds the communication node linked via `has_discussion` edge,
/// then creates a content node as a message and links it.
pub async fn write_task_message(
pool: &PgPool,
task_id: Uuid,
message: &str,
) -> Result<Uuid, String> {
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
// Find or create discussion node for this task
let discussion_id = find_or_create_discussion(pool, task_id).await?;
// Create message node
let msg_id = Uuid::now_v7();
sqlx::query(
r#"
INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by)
VALUES ($1, 'message', $2, 'hidden', '{}', $3)
"#,
)
.bind(msg_id)
.bind(message)
.bind(agent_id)
.execute(pool)
.await
.map_err(|e| format!("write_task_message (node): {e}"))?;
// Link message → discussion via belongs_to
sqlx::query(
r#"
INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
VALUES ($1, $2, 'belongs_to', true, $3)
"#,
)
.bind(msg_id)
.bind(discussion_id)
.bind(agent_id)
.execute(pool)
.await
.map_err(|e| format!("write_task_message (edge): {e}"))?;
Ok(msg_id)
}
/// Release stale tasks: active >60 min → back to open.
///
/// Returns the number of tasks released.
pub async fn release_stale_tasks(pool: &PgPool) -> Result<u64, String> {
let result = sqlx::query(
r#"
UPDATE nodes SET metadata = metadata
|| '{"status": "open"}'::jsonb
- 'agent_id'
- 'started_at'
WHERE node_kind = 'task'
AND metadata->>'status' = 'active'
AND (metadata->>'started_at')::timestamptz < now() - interval '60 minutes'
"#,
)
.execute(pool)
.await
.map_err(|e| format!("release_stale_tasks feilet: {e}"))?;
Ok(result.rows_affected())
}
/// Query nodes by kind, with optional status filter and limit.
pub async fn query_nodes(
pool: &PgPool,
node_kind: Option<&str>,
status: Option<&str>,
limit: i64,
) -> Result<Vec<NodeInfo>, String> {
let rows = sqlx::query_as::<_, NodeInfoRow>(
r#"
SELECT id, node_kind::text, title, visibility::text, metadata, created_at
FROM nodes
WHERE ($1::text IS NULL OR node_kind = $1)
AND ($2::text IS NULL OR metadata->>'status' = $2)
ORDER BY created_at DESC
LIMIT $3
"#,
)
.bind(node_kind)
.bind(status)
.bind(limit)
.fetch_all(pool)
.await
.map_err(|e| format!("query_nodes feilet: {e}"))?;
Ok(rows
.into_iter()
.map(|r| NodeInfo {
id: r.id,
node_kind: r.node_kind,
title: r.title,
visibility: r.visibility,
metadata: r.metadata,
created_at: r.created_at,
})
.collect())
}
/// Query edges for a node (both outgoing and incoming).
pub async fn query_edges(
pool: &PgPool,
node_id: Uuid,
direction: EdgeDirection,
edge_type: Option<&str>,
limit: i64,
) -> Result<Vec<EdgeInfo>, String> {
let rows = match direction {
EdgeDirection::Outgoing => {
sqlx::query_as::<_, EdgeInfoRow>(
r#"
SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at,
n.title as other_title, n.node_kind::text as other_kind
FROM edges e
JOIN nodes n ON n.id = e.target_id
WHERE e.source_id = $1
AND ($2::text IS NULL OR e.edge_type = $2)
ORDER BY e.created_at DESC
LIMIT $3
"#,
)
.bind(node_id)
.bind(edge_type)
.bind(limit)
.fetch_all(pool)
.await
}
EdgeDirection::Incoming => {
sqlx::query_as::<_, EdgeInfoRow>(
r#"
SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at,
n.title as other_title, n.node_kind::text as other_kind
FROM edges e
JOIN nodes n ON n.id = e.source_id
WHERE e.target_id = $1
AND ($2::text IS NULL OR e.edge_type = $2)
ORDER BY e.created_at DESC
LIMIT $3
"#,
)
.bind(node_id)
.bind(edge_type)
.bind(limit)
.fetch_all(pool)
.await
}
EdgeDirection::Both => {
sqlx::query_as::<_, EdgeInfoRow>(
r#"
SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at,
CASE WHEN e.source_id = $1 THEN n2.title ELSE n1.title END as other_title,
CASE WHEN e.source_id = $1 THEN n2.node_kind::text ELSE n1.node_kind::text END as other_kind
FROM edges e
JOIN nodes n1 ON n1.id = e.source_id
JOIN nodes n2 ON n2.id = e.target_id
WHERE (e.source_id = $1 OR e.target_id = $1)
AND ($2::text IS NULL OR e.edge_type = $2)
ORDER BY e.created_at DESC
LIMIT $3
"#,
)
.bind(node_id)
.bind(edge_type)
.bind(limit)
.fetch_all(pool)
.await
}
}
.map_err(|e| format!("query_edges feilet: {e}"))?;
Ok(rows
.into_iter()
.map(|r| EdgeInfo {
id: r.id,
source_id: r.source_id,
target_id: r.target_id,
edge_type: r.edge_type,
metadata: r.metadata,
created_at: r.created_at,
other_title: r.other_title,
other_kind: r.other_kind,
})
.collect())
}
/// Read a single node by ID.
pub async fn get_node(pool: &PgPool, node_id: Uuid) -> Result<Option<NodeInfo>, String> {
let row = sqlx::query_as::<_, NodeInfoRow>(
r#"
SELECT id, node_kind::text, title, visibility::text, metadata, created_at
FROM nodes WHERE id = $1
"#,
)
.bind(node_id)
.fetch_optional(pool)
.await
.map_err(|e| format!("get_node feilet: {e}"))?;
Ok(row.map(|r| NodeInfo {
id: r.id,
node_kind: r.node_kind,
title: r.title,
visibility: r.visibility,
metadata: r.metadata,
created_at: r.created_at,
}))
}
// ============================================================================
// Internal helpers
// ============================================================================
/// Find or create a discussion (communication) node for a task/assignment.
async fn find_or_create_discussion(pool: &PgPool, task_id: Uuid) -> Result<Uuid, String> {
let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap();
// Look for existing has_discussion edge
let existing = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT target_id FROM edges
WHERE source_id = $1 AND edge_type = 'has_discussion'
LIMIT 1
"#,
)
.bind(task_id)
.fetch_optional(pool)
.await
.map_err(|e| format!("find_discussion: {e}"))?;
if let Some(id) = existing {
return Ok(id);
}
// Create a new communication node
let disc_id = Uuid::now_v7();
sqlx::query(
r#"
INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by)
VALUES ($1, 'communication', 'Diskusjon', 'hidden', '{"type":"discussion"}', $2)
"#,
)
.bind(disc_id)
.bind(agent_id)
.execute(pool)
.await
.map_err(|e| format!("create discussion node: {e}"))?;
// Link task → discussion
sqlx::query(
r#"
INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
VALUES ($1, $2, 'has_discussion', true, $3)
"#,
)
.bind(task_id)
.bind(disc_id)
.bind(agent_id)
.execute(pool)
.await
.map_err(|e| format!("create discussion edge: {e}"))?;
// Make agent a member of the discussion
sqlx::query(
r#"
INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
VALUES ($1, $2, 'member_of', true, $3)
ON CONFLICT (source_id, target_id, edge_type) DO NOTHING
"#,
)
.bind(agent_id)
.bind(disc_id)
.bind(agent_id)
.execute(pool)
.await
.map_err(|e| format!("join discussion: {e}"))?;
Ok(disc_id)
}
// ============================================================================
// Types
// ============================================================================
#[derive(Debug)]
pub struct TaskInfo {
pub id: Uuid,
pub title: Option<String>,
pub content: Option<String>,
pub metadata: serde_json::Value,
pub created_at: DateTime<Utc>,
}
#[derive(Debug)]
pub struct NodeInfo {
pub id: Uuid,
pub node_kind: String,
pub title: Option<String>,
pub visibility: String,
pub metadata: Option<serde_json::Value>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Copy)]
pub enum EdgeDirection {
Outgoing,
Incoming,
Both,
}
#[derive(Debug)]
pub struct EdgeInfo {
pub id: Uuid,
pub source_id: Uuid,
pub target_id: Uuid,
pub edge_type: String,
pub metadata: Option<serde_json::Value>,
pub created_at: DateTime<Utc>,
pub other_title: Option<String>,
pub other_kind: Option<String>,
}
// sqlx FromRow types (private)
#[derive(sqlx::FromRow)]
struct TaskRow {
id: Uuid,
title: Option<String>,
content: Option<String>,
metadata: Option<serde_json::Value>,
created_at: DateTime<Utc>,
}
#[derive(sqlx::FromRow)]
struct NodeInfoRow {
id: Uuid,
node_kind: String,
title: Option<String>,
visibility: String,
metadata: Option<serde_json::Value>,
created_at: DateTime<Utc>,
}
#[derive(sqlx::FromRow)]
struct EdgeInfoRow {
id: Uuid,
source_id: Uuid,
target_id: Uuid,
edge_type: String,
metadata: Option<serde_json::Value>,
created_at: DateTime<Utc>,
other_title: Option<String>,
other_kind: Option<String>,
}

View file

@ -10,6 +10,7 @@
mod context; mod context;
mod git; mod git;
mod graph;
mod provider; mod provider;
mod tools; mod tools;
@ -19,6 +20,7 @@ use provider::{
ApiKeys, LlmProvider, Message, RetryConfig, TokenUsage, ApiKeys, LlmProvider, Message, RetryConfig, TokenUsage,
calculate_cost, complete_with_retry, create_provider, calculate_cost, complete_with_retry, create_provider,
}; };
use sqlx::PgPool;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
@ -116,6 +118,8 @@ struct AgentSession {
verbose: bool, verbose: bool,
/// Set to true by Ctrl+C handler during a turn; checked by tool execution. /// Set to true by Ctrl+C handler during a turn; checked by tool execution.
interrupted: Arc<AtomicBool>, interrupted: Arc<AtomicBool>,
/// Optional PG pool for graph integration (synops_query tool, task picking).
pg_pool: Option<PgPool>,
} }
/// Whether to use plan mode for a task. /// Whether to use plan mode for a task.
@ -260,7 +264,7 @@ impl AgentSession {
tracing::info!(tool = tc.function.name, "Kjører verktøy"); tracing::info!(tool = tc.function.name, "Kjører verktøy");
let result = let result =
tools::execute_tool(&tc.function.name, &args, &self.working_dir).await; tools::execute_tool(&tc.function.name, &args, &self.working_dir, self.pg_pool.as_ref()).await;
if self.verbose { if self.verbose {
let preview = if result.len() > 200 { let preview = if result.len() > 200 {
@ -507,6 +511,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
..Default::default() ..Default::default()
}; };
// Connect to PG if DATABASE_URL is set (optional — tools degrade gracefully)
let pg_pool = match synops_common::db::connect().await {
Ok(pool) => {
tracing::info!("PG tilkoblet (grafintegrasjon aktiv)");
// Release stale tasks on startup
match graph::release_stale_tasks(&pool).await {
Ok(0) => {}
Ok(n) => tracing::warn!(count = n, "Frigjorde stale tasks (>60 min)"),
Err(e) => tracing::warn!(error = %e, "Krasj-deteksjon feilet"),
}
Some(pool)
}
Err(e) => {
tracing::info!(reason = %e, "PG ikke tilgjengelig — synops_query deaktivert");
None
}
};
tracing::info!( tracing::info!(
context_window = compaction_config.context_window, context_window = compaction_config.context_window,
max_cost = cli max_cost = cli
@ -514,6 +538,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.map(|c| format!("${:.2}", c)) .map(|c| format!("${:.2}", c))
.as_deref() .as_deref()
.unwrap_or("unlimited"), .unwrap_or("unlimited"),
pg = pg_pool.is_some(),
"Agent konfigurert" "Agent konfigurert"
); );
@ -544,6 +569,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
max_cost: cli.max_cost, max_cost: cli.max_cost,
verbose: cli.verbose, verbose: cli.verbose,
interrupted: interrupted.clone(), interrupted: interrupted.clone(),
pg_pool,
}; };
// Git: branch-per-task // Git: branch-per-task

View file

@ -1,9 +1,11 @@
//! Agent tools — file operations, shell, search. //! Agent tools — file operations, shell, search, graph queries.
//! //!
//! Each tool returns a string result that gets sent back to the LLM. //! Each tool returns a string result that gets sent back to the LLM.
use crate::git; use crate::git;
use crate::graph;
use crate::provider::{ToolDef, FunctionDef}; use crate::provider::{ToolDef, FunctionDef};
use sqlx::PgPool;
use std::path::Path; use std::path::Path;
use tokio::process::Command; use tokio::process::Command;
@ -12,6 +14,7 @@ pub async fn execute_tool(
name: &str, name: &str,
args: &serde_json::Value, args: &serde_json::Value,
working_dir: &Path, working_dir: &Path,
pg_pool: Option<&PgPool>,
) -> String { ) -> String {
let result = match name { let result = match name {
"read_file" => read_file(args, working_dir).await, "read_file" => read_file(args, working_dir).await,
@ -22,6 +25,7 @@ pub async fn execute_tool(
"glob" => glob_search(args, working_dir).await, "glob" => glob_search(args, working_dir).await,
"list_files" => list_files(args, working_dir).await, "list_files" => list_files(args, working_dir).await,
"git" => git_tool(args, working_dir).await, "git" => git_tool(args, working_dir).await,
"synops_query" => synops_query(args, pg_pool).await,
_ => Err(format!("Unknown tool: {}", name)), _ => Err(format!("Unknown tool: {}", name)),
}; };
@ -165,6 +169,30 @@ pub fn tool_definitions() -> Vec<ToolDef> {
}), }),
}, },
}, },
ToolDef {
r#type: "function".into(),
function: FunctionDef {
name: "synops_query".into(),
description: "Query Synops graph (nodes and edges). Use action 'nodes' to list nodes, 'edges' to list edges for a node, 'get' to read a single node.".into(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"description": "Query action",
"enum": ["nodes", "edges", "get"]
},
"node_id": { "type": "string", "description": "Node UUID (for 'edges' and 'get')" },
"node_kind": { "type": "string", "description": "Filter by node_kind (for 'nodes')" },
"status": { "type": "string", "description": "Filter by metadata.status (for 'nodes')" },
"edge_type": { "type": "string", "description": "Filter by edge_type (for 'edges')" },
"direction": { "type": "string", "description": "Edge direction: outgoing, incoming, both (default: both)", "enum": ["outgoing", "incoming", "both"] },
"limit": { "type": "integer", "description": "Max results (default: 20)" }
},
"required": ["action"]
}),
},
},
] ]
} }
@ -440,3 +468,99 @@ fn resolve_path(file_path: &str, working_dir: &Path) -> std::path::PathBuf {
working_dir.join(p) working_dir.join(p)
} }
} }
async fn synops_query(
args: &serde_json::Value,
pg_pool: Option<&PgPool>,
) -> Result<String, String> {
let pool = pg_pool.ok_or("PG not connected (DATABASE_URL not set)")?;
let action = args["action"].as_str().ok_or("action is required")?;
let limit = args["limit"].as_i64().unwrap_or(20);
match action {
"nodes" => {
let node_kind = args["node_kind"].as_str();
let status = args["status"].as_str();
let nodes = graph::query_nodes(pool, node_kind, status, limit).await?;
if nodes.is_empty() {
return Ok("No nodes found.".into());
}
let mut out = String::new();
for n in &nodes {
let status = n
.metadata
.as_ref()
.and_then(|m| m.get("status"))
.and_then(|s| s.as_str())
.unwrap_or("-");
out.push_str(&format!(
"{} | {} | {} | {} | {}\n",
n.id,
n.node_kind,
n.title.as_deref().unwrap_or("(untitled)"),
status,
n.created_at.format("%Y-%m-%d %H:%M"),
));
}
Ok(out)
}
"edges" => {
let node_id_str = args["node_id"].as_str().ok_or("node_id is required for 'edges'")?;
let node_id = uuid::Uuid::parse_str(node_id_str)
.map_err(|e| format!("Invalid UUID: {e}"))?;
let edge_type = args["edge_type"].as_str();
let direction = match args["direction"].as_str().unwrap_or("both") {
"outgoing" => graph::EdgeDirection::Outgoing,
"incoming" => graph::EdgeDirection::Incoming,
_ => graph::EdgeDirection::Both,
};
let edges = graph::query_edges(pool, node_id, direction, edge_type, limit).await?;
if edges.is_empty() {
return Ok("No edges found.".into());
}
let mut out = String::new();
for e in &edges {
out.push_str(&format!(
"{} --[{}]--> {} | {} ({})\n",
e.source_id,
e.edge_type,
e.target_id,
e.other_title.as_deref().unwrap_or("(untitled)"),
e.other_kind.as_deref().unwrap_or("?"),
));
}
Ok(out)
}
"get" => {
let node_id_str = args["node_id"].as_str().ok_or("node_id is required for 'get'")?;
let node_id = uuid::Uuid::parse_str(node_id_str)
.map_err(|e| format!("Invalid UUID: {e}"))?;
let node = graph::get_node(pool, node_id)
.await?
.ok_or_else(|| format!("Node not found: {node_id}"))?;
let meta_str = node
.metadata
.as_ref()
.map(|m| serde_json::to_string_pretty(m).unwrap_or_default())
.unwrap_or_default();
Ok(format!(
"ID: {}\nKind: {}\nTitle: {}\nVisibility: {}\nCreated: {}\nMetadata: {}",
node.id,
node.node_kind,
node.title.as_deref().unwrap_or("(untitled)"),
node.visibility,
node.created_at.format("%Y-%m-%d %H:%M"),
meta_str,
))
}
_ => Err(format!("Unknown action: {action}. Use 'nodes', 'edges', or 'get'.")),
}
}