//! Grafintegrasjon — les/skriv noder og edges via PG. //! //! Kobler synops-agent til Synops-grafen: //! - Plukk task-noder (node_kind: 'task') fra PG //! - Oppdater oppgavestatus (open → active → done) //! - Skriv tilbakemelding i oppdragets chat-node //! - Krasj-deteksjon: frigjør tasks >60 min //! - Spørring av noder og edges (synops_query-verktøy) use chrono::{DateTime, Utc}; use serde_json::json; use sqlx::PgPool; use uuid::Uuid; /// Agent node ID (claude-main). pub const AGENT_NODE_ID: &str = "d3eebc99-9c0b-4ef8-bb6d-6bb9bd380a44"; /// Pick the highest-priority open task and claim it atomically. /// /// Uses `FOR UPDATE SKIP LOCKED` to avoid contention with other agents. /// Sets status → "active", agent_id, started_at. pub async fn pick_task(pool: &PgPool) -> Result, String> { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); let now = Utc::now(); // Atomically pick and claim the task let row = sqlx::query_as::<_, TaskRow>( r#" UPDATE nodes SET metadata = metadata || jsonb_build_object( 'status', 'active', 'agent_id', $1::text, 'started_at', $2::text ) WHERE id = ( SELECT id FROM nodes WHERE node_kind = 'task' AND metadata->>'status' = 'open' ORDER BY (metadata->>'priority')::int ASC NULLS LAST, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id, title, content, metadata, created_at "#, ) .bind(agent_id.to_string()) .bind(now.to_rfc3339()) .fetch_optional(pool) .await .map_err(|e| format!("pick_task feilet: {e}"))?; Ok(row.map(|r| TaskInfo { id: r.id, title: r.title, content: r.content, metadata: r.metadata.unwrap_or_default(), created_at: r.created_at, })) } /// Update a task's status (e.g. "done", "failed", "skipped"). 
pub async fn update_task_status(
    pool: &PgPool,
    task_id: Uuid,
    status: &str,
    result: Option<&str>,
) -> Result<(), String> {
    let now = Utc::now();

    // Build the JSON patch that is merged into the existing metadata.
    let mut patch = json!({ "status": status });
    if matches!(status, "done" | "failed" | "skipped") {
        patch["completed_at"] = json!(now.to_rfc3339());
    }
    if let Some(r) = result {
        patch["result"] = json!(r);
    }

    // NOTE(review): the affected-row count is not inspected, so an unknown
    // `task_id` (or a node that is not a task) still yields `Ok(())`.
    sqlx::query(
        r#"
        UPDATE nodes
        SET metadata = metadata || $1
        WHERE id = $2 AND node_kind = 'task'
        "#,
    )
    .bind(&patch)
    .bind(task_id)
    .execute(pool)
    .await
    .map_err(|e| format!("update_task_status feilet: {e}"))?;

    Ok(())
}

/// Write a message in a task's discussion chat.
///
/// Finds (or creates) the communication node linked to the task via a
/// `has_discussion` edge, then inserts a hidden `message` node and links it
/// to the discussion with a `belongs_to` edge.
///
/// Returns the ID of the new message node.
///
/// # Errors
/// Returns the formatted database error if any statement fails.
///
/// # Panics
/// Panics if `AGENT_NODE_ID` is not a valid UUID (programming error).
pub async fn write_task_message(
    pool: &PgPool,
    task_id: Uuid,
    message: &str,
) -> Result<Uuid, String> {
    let agent_id = Uuid::parse_str(AGENT_NODE_ID).expect("AGENT_NODE_ID must be a valid UUID");

    // Find or create discussion node for this task
    let discussion_id = find_or_create_discussion(pool, task_id).await?;

    // Create message node.
    // NOTE(review): the node and edge inserts run as separate statements on
    // the pool (no transaction); if the edge insert fails, the message node
    // is left without a `belongs_to` edge.
    let msg_id = Uuid::now_v7();
    sqlx::query(
        r#"
        INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by)
        VALUES ($1, 'message', $2, 'hidden', '{}', $3)
        "#,
    )
    .bind(msg_id)
    .bind(message)
    .bind(agent_id)
    .execute(pool)
    .await
    .map_err(|e| format!("write_task_message (node): {e}"))?;

    // Link message → discussion via belongs_to
    sqlx::query(
        r#"
        INSERT INTO edges (source_id, target_id, edge_type, system, created_by)
        VALUES ($1, $2, 'belongs_to', true, $3)
        "#,
    )
    .bind(msg_id)
    .bind(discussion_id)
    .bind(agent_id)
    .execute(pool)
    .await
    .map_err(|e| format!("write_task_message (edge): {e}"))?;

    Ok(msg_id)
}

/// Release stale tasks: active >60 min → back to open.
///
/// Returns the number of tasks released.
pub async fn release_stale_tasks(pool: &PgPool) -> Result<u64, String> {
    // The parentheses matter: in PostgreSQL `-` binds tighter than `||`, so
    // without them the key removals would apply to the `{"status": "open"}`
    // literal instead of to `metadata`, leaving `agent_id` / `started_at`
    // behind on the released task.
    let result = sqlx::query(
        r#"
        UPDATE nodes
        SET metadata = (metadata - 'agent_id' - 'started_at') || '{"status": "open"}'::jsonb
        WHERE node_kind = 'task'
          AND metadata->>'status' = 'active'
          AND (metadata->>'started_at')::timestamptz < now() - interval '60 minutes'
        "#,
    )
    .execute(pool)
    .await
    .map_err(|e| format!("release_stale_tasks feilet: {e}"))?;

    Ok(result.rows_affected())
}

/// Query nodes by kind, with optional status filter and limit.
///
/// A `None` filter matches every node. Results are ordered newest first
/// (`created_at DESC`) and capped at `limit` rows.
///
/// # Errors
/// Returns the formatted database error if the query fails.
pub async fn query_nodes(
    pool: &PgPool,
    node_kind: Option<&str>,
    status: Option<&str>,
    limit: i64,
) -> Result<Vec<NodeInfo>, String> {
    let rows = sqlx::query_as::<_, NodeInfoRow>(
        r#"
        SELECT id, node_kind::text, title, visibility::text, metadata, created_at
        FROM nodes
        WHERE ($1::text IS NULL OR node_kind = $1)
          AND ($2::text IS NULL OR metadata->>'status' = $2)
        ORDER BY created_at DESC
        LIMIT $3
        "#,
    )
    .bind(node_kind)
    .bind(status)
    .bind(limit)
    .fetch_all(pool)
    .await
    .map_err(|e| format!("query_nodes feilet: {e}"))?;

    Ok(rows
        .into_iter()
        .map(|r| NodeInfo {
            id: r.id,
            node_kind: r.node_kind,
            title: r.title,
            visibility: r.visibility,
            metadata: r.metadata,
            created_at: r.created_at,
        })
        .collect())
}

/// Query edges for a node in the requested direction (outgoing, incoming,
/// or both), optionally filtered by edge type.
///
/// Each result also carries the title and kind of the node at the other end
/// of the edge. Results are ordered newest first and capped at `limit` rows.
pub async fn query_edges( pool: &PgPool, node_id: Uuid, direction: EdgeDirection, edge_type: Option<&str>, limit: i64, ) -> Result, String> { let rows = match direction { EdgeDirection::Outgoing => { sqlx::query_as::<_, EdgeInfoRow>( r#" SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at, n.title as other_title, n.node_kind::text as other_kind FROM edges e JOIN nodes n ON n.id = e.target_id WHERE e.source_id = $1 AND ($2::text IS NULL OR e.edge_type = $2) ORDER BY e.created_at DESC LIMIT $3 "#, ) .bind(node_id) .bind(edge_type) .bind(limit) .fetch_all(pool) .await } EdgeDirection::Incoming => { sqlx::query_as::<_, EdgeInfoRow>( r#" SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at, n.title as other_title, n.node_kind::text as other_kind FROM edges e JOIN nodes n ON n.id = e.source_id WHERE e.target_id = $1 AND ($2::text IS NULL OR e.edge_type = $2) ORDER BY e.created_at DESC LIMIT $3 "#, ) .bind(node_id) .bind(edge_type) .bind(limit) .fetch_all(pool) .await } EdgeDirection::Both => { sqlx::query_as::<_, EdgeInfoRow>( r#" SELECT e.id, e.source_id, e.target_id, e.edge_type, e.metadata, e.created_at, CASE WHEN e.source_id = $1 THEN n2.title ELSE n1.title END as other_title, CASE WHEN e.source_id = $1 THEN n2.node_kind::text ELSE n1.node_kind::text END as other_kind FROM edges e JOIN nodes n1 ON n1.id = e.source_id JOIN nodes n2 ON n2.id = e.target_id WHERE (e.source_id = $1 OR e.target_id = $1) AND ($2::text IS NULL OR e.edge_type = $2) ORDER BY e.created_at DESC LIMIT $3 "#, ) .bind(node_id) .bind(edge_type) .bind(limit) .fetch_all(pool) .await } } .map_err(|e| format!("query_edges feilet: {e}"))?; Ok(rows .into_iter() .map(|r| EdgeInfo { id: r.id, source_id: r.source_id, target_id: r.target_id, edge_type: r.edge_type, metadata: r.metadata, created_at: r.created_at, other_title: r.other_title, other_kind: r.other_kind, }) .collect()) } /// Read a single node by ID. 
pub async fn get_node(pool: &PgPool, node_id: Uuid) -> Result, String> { let row = sqlx::query_as::<_, NodeInfoRow>( r#" SELECT id, node_kind::text, title, visibility::text, metadata, created_at FROM nodes WHERE id = $1 "#, ) .bind(node_id) .fetch_optional(pool) .await .map_err(|e| format!("get_node feilet: {e}"))?; Ok(row.map(|r| NodeInfo { id: r.id, node_kind: r.node_kind, title: r.title, visibility: r.visibility, metadata: r.metadata, created_at: r.created_at, })) } // ============================================================================ // Internal helpers // ============================================================================ /// Find or create a discussion (communication) node for a task/assignment. async fn find_or_create_discussion(pool: &PgPool, task_id: Uuid) -> Result { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); // Look for existing has_discussion edge let existing = sqlx::query_scalar::<_, Uuid>( r#" SELECT target_id FROM edges WHERE source_id = $1 AND edge_type = 'has_discussion' LIMIT 1 "#, ) .bind(task_id) .fetch_optional(pool) .await .map_err(|e| format!("find_discussion: {e}"))?; if let Some(id) = existing { return Ok(id); } // Create a new communication node let disc_id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) VALUES ($1, 'communication', 'Diskusjon', 'hidden', '{"type":"discussion"}', $2) "#, ) .bind(disc_id) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("create discussion node: {e}"))?; // Link task → discussion sqlx::query( r#" INSERT INTO edges (source_id, target_id, edge_type, system, created_by) VALUES ($1, $2, 'has_discussion', true, $3) "#, ) .bind(task_id) .bind(disc_id) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("create discussion edge: {e}"))?; // Make agent a member of the discussion sqlx::query( r#" INSERT INTO edges (source_id, target_id, edge_type, system, created_by) VALUES ($1, $2, 'member_of', true, $3) ON 
CONFLICT (source_id, target_id, edge_type) DO NOTHING "#, ) .bind(agent_id) .bind(disc_id) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("join discussion: {e}"))?; Ok(disc_id) } // ============================================================================ // Daemon: chat polling and node creation // ============================================================================ /// Find or create the vaktmester communication node. /// /// Looks for a communication node titled "Vaktmester" owned by the agent. /// If none exists, creates one and makes the agent a member. pub async fn find_or_create_vaktmester_chat(pool: &PgPool) -> Result { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); // Look for existing vaktmester chat let existing = sqlx::query_scalar::<_, Uuid>( r#" SELECT n.id FROM nodes n JOIN edges e ON e.target_id = n.id AND e.source_id = $1 AND e.edge_type = 'member_of' WHERE n.node_kind = 'communication' AND n.metadata->>'type' = 'vaktmester' LIMIT 1 "#, ) .bind(agent_id) .fetch_optional(pool) .await .map_err(|e| format!("find vaktmester chat: {e}"))?; if let Some(id) = existing { return Ok(id); } // Create vaktmester communication node let chat_id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) VALUES ($1, 'communication', 'Vaktmester', 'discoverable', '{"type":"vaktmester"}', $2) "#, ) .bind(chat_id) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("create vaktmester node: {e}"))?; // Agent is member_of sqlx::query( r#" INSERT INTO edges (source_id, target_id, edge_type, system, created_by) VALUES ($1, $2, 'member_of', true, $1) "#, ) .bind(agent_id) .bind(chat_id) .execute(pool) .await .map_err(|e| format!("agent join vaktmester: {e}"))?; tracing::info!(chat_id = %chat_id, "Opprettet vaktmester-chat"); Ok(chat_id) } /// A chat message from the vaktmester chat. 
#[derive(Debug)]
pub struct ChatMessage {
    pub id: Uuid,
    pub content: String,
    pub created_by: Option<Uuid>,
    pub created_at: DateTime<Utc>,
}

/// Raw database row for a chat message; `content` may be NULL here.
#[derive(sqlx::FromRow)]
struct ChatMessageRow {
    id: Uuid,
    content: Option<String>,
    created_by: Option<Uuid>,
    created_at: DateTime<Utc>,
}

/// Poll for new messages in a communication node since a given timestamp.
///
/// Returns `message` nodes linked to `chat_id` via `belongs_to` that were
/// created strictly after `since` and not by the agent itself, ordered by
/// creation time (oldest first). Messages with NULL content are skipped.
///
/// # Errors
/// Returns the formatted database error if the query fails.
///
/// # Panics
/// Panics if `AGENT_NODE_ID` is not a valid UUID (programming error).
pub async fn poll_chat_messages(
    pool: &PgPool,
    chat_id: Uuid,
    since: DateTime<Utc>,
) -> Result<Vec<ChatMessage>, String> {
    let agent_id = Uuid::parse_str(AGENT_NODE_ID).expect("AGENT_NODE_ID must be a valid UUID");

    let rows = sqlx::query_as::<_, ChatMessageRow>(
        r#"
        SELECT n.id, n.content, n.created_by, n.created_at
        FROM nodes n
        JOIN edges e ON e.source_id = n.id
                    AND e.target_id = $1
                    AND e.edge_type = 'belongs_to'
        WHERE n.node_kind = 'message'
          AND n.created_at > $2
          AND (n.created_by IS NULL OR n.created_by != $3)
        ORDER BY n.created_at ASC
        "#,
    )
    .bind(chat_id)
    .bind(since)
    .bind(agent_id)
    .fetch_all(pool)
    .await
    .map_err(|e| format!("poll_chat_messages: {e}"))?;

    Ok(rows
        .into_iter()
        .filter_map(|r| {
            // Rows without content are dropped rather than surfaced as empty.
            let content = r.content?;
            Some(ChatMessage {
                id: r.id,
                content,
                created_by: r.created_by,
                created_at: r.created_at,
            })
        })
        .collect())
}

/// Write a reply message in a communication node.
///
/// Returns the ID of the new message node.
pub async fn write_chat_reply( pool: &PgPool, chat_id: Uuid, message: &str, ) -> Result { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); let msg_id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO nodes (id, node_kind, content, visibility, metadata, created_by) VALUES ($1, 'message', $2, 'hidden', '{}', $3) "#, ) .bind(msg_id) .bind(message) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("write_chat_reply (node): {e}"))?; sqlx::query( r#" INSERT INTO edges (source_id, target_id, edge_type, system, created_by) VALUES ($1, $2, 'belongs_to', true, $3) "#, ) .bind(msg_id) .bind(chat_id) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("write_chat_reply (edge): {e}"))?; // Notify frontend via PG NOTIFY let _ = sqlx::query("SELECT pg_notify('node_changes', $1)") .bind(serde_json::json!({ "type": "message", "node_id": msg_id.to_string(), "communication_id": chat_id.to_string(), }).to_string()) .execute(pool) .await; Ok(msg_id) } /// Create a proposal node. pub async fn create_proposal( pool: &PgPool, title: &str, description: &str, ) -> Result { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); let id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'proposal', $2, $3, 'internal', '{"status":"draft"}', $4) "#, ) .bind(id) .bind(title) .bind(description) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("create_proposal: {e}"))?; Ok(id) } /// Create a task node. 
pub async fn create_task( pool: &PgPool, title: &str, description: &str, priority: i32, ) -> Result { let agent_id = Uuid::parse_str(AGENT_NODE_ID).unwrap(); let id = Uuid::now_v7(); sqlx::query( r#" INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) VALUES ($1, 'task', $2, $3, 'internal', jsonb_build_object('status', 'open', 'priority', $4), $5) "#, ) .bind(id) .bind(title) .bind(description) .bind(priority) .bind(agent_id) .execute(pool) .await .map_err(|e| format!("create_task: {e}"))?; Ok(id) } /// Check if the agent is active (kill switch). pub async fn is_agent_active(pool: &PgPool) -> Result { let active: Option = sqlx::query_scalar( r#" SELECT is_active FROM agent_identities WHERE agent_key = 'claude-main' "#, ) .fetch_optional(pool) .await .map_err(|e| format!("is_agent_active: {e}"))?; Ok(active.unwrap_or(true)) } // ============================================================================ // Types // ============================================================================ #[derive(Debug)] pub struct TaskInfo { pub id: Uuid, pub title: Option, pub content: Option, pub metadata: serde_json::Value, pub created_at: DateTime, } #[derive(Debug)] pub struct NodeInfo { pub id: Uuid, pub node_kind: String, pub title: Option, pub visibility: String, pub metadata: Option, pub created_at: DateTime, } #[derive(Debug, Clone, Copy)] pub enum EdgeDirection { Outgoing, Incoming, Both, } #[derive(Debug)] pub struct EdgeInfo { pub id: Uuid, pub source_id: Uuid, pub target_id: Uuid, pub edge_type: String, pub metadata: Option, pub created_at: DateTime, pub other_title: Option, pub other_kind: Option, } // sqlx FromRow types (private) #[derive(sqlx::FromRow)] struct TaskRow { id: Uuid, title: Option, content: Option, metadata: Option, created_at: DateTime, } #[derive(sqlx::FromRow)] struct NodeInfoRow { id: Uuid, node_kind: String, title: Option, visibility: String, metadata: Option, created_at: DateTime, } #[derive(sqlx::FromRow)] 
struct EdgeInfoRow { id: Uuid, source_id: Uuid, target_id: Uuid, edge_type: String, metadata: Option, created_at: DateTime, other_title: Option, other_kind: Option, }