From 26f03ef21d37ad108eff72c02e3c290f6eace1f1 Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 16:53:59 +0000 Subject: [PATCH] Trigger-evaluering i portvokteren (oppgave 24.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer portvokteren nå om noen orchestration-noder matcher triggeren. Implementert som non-blocking async task som ikke blokkerer WebSocket-flyten. Ny modul orchestration_trigger.rs: - Mapper NOTIFY-events til trigger-typer (node.created, edge.created) - Effektiv lookup via funksjonell B-tree-indeks på metadata->trigger->event - Evaluerer observes-edges (eksplisitt) vs conditions (implisitt) - Betingelser: node_kind, edge_type, has_trait, has_tag (AND-logikk) - Legger matchende orkestreringer i jobbkøen som "orchestrate"-jobb Ny migration 021: indeks for trigger-event lookup på orchestration-noder. Jobbkø-dispatcher håndterer "orchestrate" med placeholder (24.3 implementerer utførelse). Verifisert: content-node trigrer matching orchestration, communication-node hoppes over. --- maskinrommet/src/jobs.rs | 17 + maskinrommet/src/main.rs | 1 + maskinrommet/src/orchestration_trigger.rs | 312 ++++++++++++++++++ maskinrommet/src/ws.rs | 45 +++ .../021_orchestration_trigger_index.sql | 13 + tasks.md | 3 +- 6 files changed, 389 insertions(+), 2 deletions(-) create mode 100644 maskinrommet/src/orchestration_trigger.rs create mode 100644 migrations/021_orchestration_trigger_index.sql diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index 16c3d80..a6e0fdd 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -217,6 +217,23 @@ async fn dispatch( "pg_delete_edge" => { pg_writes::handle_delete_edge(job, db, index_cache).await } + // Orchestration: trigger-evaluering har lagt jobben i kø, + // men utførelsen implementeres i oppgave 24.3. + // Foreløpig logger vi og returnerer OK. + "orchestrate" => { + let orch_id = job.payload.get("orchestration_id") + .and_then(|v| v.as_str()) + .unwrap_or("ukjent"); + tracing::info!( + orchestration_id = %orch_id, + "Orchestrate-jobb mottatt (utførelse kommer i oppgave 24.3)" + ); + Ok(serde_json::json!({ + "status": "pending_implementation", + "orchestration_id": orch_id, + "message": "Orchestration execution not yet implemented (task 24.3)" + })) + } other => Err(format!("Ukjent jobbtype: {other}")), } } diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 335b20b..0eaf97d 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -25,6 +25,7 @@ mod serving; pub mod summarize; pub mod ws; pub mod mixer; +pub mod orchestration_trigger; pub mod tiptap; pub mod transcribe; pub mod tts; diff --git a/maskinrommet/src/orchestration_trigger.rs b/maskinrommet/src/orchestration_trigger.rs new file mode 100644 index 0000000..93666cf --- /dev/null +++ b/maskinrommet/src/orchestration_trigger.rs @@ -0,0 +1,312 @@ +//! Trigger-evaluering for orchestration-noder. +//! +//! Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer vi om noen +//! orchestration-noder matcher triggeren. Matchende orkestreringer +//! legges i jobbkøen for utførelse (oppgave 24.3). +//! +//! Designprinsipper: +//! - Effektiv lookup via indeks på `metadata->'trigger'->>'event'` +//! - Ingen LLM-kall — ren deterministisk evaluering +//! - Non-blocking — spawner async task, blokkerer ikke NOTIFY-loopen +//! - `observes`-edge overtrumfer conditions (eksplisitt > implisitt) +//! +//! Ref: docs/concepts/orkestrering.md § 5–6 + +use sqlx::PgPool; +use uuid::Uuid; + +/// Kontekst for en trigger-event — informasjonen som orkestreringen +/// trenger for å evaluere betingelser og bygge payload. +#[derive(Debug, Clone)] +pub struct TriggerContext { + /// Trigger-event-type (f.eks. "node.created", "edge.created") + pub event: String, + /// Primær node-ID som utløste eventet (node selv, eller source/target) + pub node_id: Option, + /// Node-kind (f.eks. "content", "communication") — for node_kind-betingelse + pub node_kind: Option, + /// Edge-type (for edge-events) + pub edge_type: Option, + /// Source-ID (for edge-events) + pub source_id: Option, + /// Target-ID (for edge-events) + pub target_id: Option, + /// Operasjon (INSERT, UPDATE, DELETE) + pub op: String, +} + +/// Matchende orchestration-node med metadata for jobb-oppretting. +#[derive(Debug, sqlx::FromRow)] +struct OrchestrationMatch { + id: Uuid, + metadata: serde_json::Value, +} + +/// Spawner trigger-evaluering som en async task. +/// Blokkerer ikke NOTIFY-loopen — feil logges men propageres ikke. +pub fn spawn_trigger_evaluation(db: PgPool, ctx: TriggerContext) { + tokio::spawn(async move { + if let Err(e) = evaluate_triggers(&db, &ctx).await { + tracing::error!( + event = %ctx.event, + error = %e, + "Trigger-evaluering feilet" + ); + } + }); +} + +/// Evaluerer alle orchestration-noder som matcher et gitt trigger-event. +/// +/// Flyt: +/// 1. Finn orchestration-noder med matchende `metadata.trigger.event` +/// 2. For hver: sjekk `observes`-edge (eksplisitt kobling) +/// 3. For hver: evaluer `conditions` mot trigger-konteksten +/// 4. Legg matchende i jobbkøen som `orchestrate`-jobb +async fn evaluate_triggers(db: &PgPool, ctx: &TriggerContext) -> Result<(), String> { + // Steg 1: Finn alle orchestration-noder med matchende trigger-event. + // Bruker den funksjonelle indeksen idx_nodes_orchestration_trigger_event. + let candidates = sqlx::query_as::<_, OrchestrationMatch>( + r#" + SELECT id, metadata + FROM nodes + WHERE node_kind = 'orchestration' + AND metadata -> 'trigger' ->> 'event' = $1 + "#, + ) + .bind(&ctx.event) + .fetch_all(db) + .await + .map_err(|e| format!("Feil ved henting av orchestration-kandidater: {e}"))?; + + if candidates.is_empty() { + return Ok(()); + } + + tracing::debug!( + event = %ctx.event, + candidates = candidates.len(), + "Fant orchestration-kandidater for trigger" + ); + + for candidate in &candidates { + match should_trigger(db, candidate, ctx).await { + Ok(true) => { + enqueue_orchestration(db, candidate, ctx).await?; + } + Ok(false) => { + tracing::debug!( + orchestration_id = %candidate.id, + "Orchestration matchet ikke betingelser, hoppes over" + ); + } + Err(e) => { + tracing::warn!( + orchestration_id = %candidate.id, + error = %e, + "Feil ved evaluering av orchestration-betingelser" + ); + } + } + } + + Ok(()) +} + +/// Avgjør om en orchestration-node skal trigges, basert på: +/// 1. `observes`-edge — eksplisitt kobling til triggerende node +/// 2. `conditions` — implisitt matching mot trigger-kontekst +/// +/// Logikk (fra docs/concepts/orkestrering.md § 9): +/// - Hvis orchestration har `observes`-edges: trigger KUN hvis en peker til triggerende node +/// - Hvis ingen `observes`-edges: evaluer `conditions` (implisitt matching) +async fn should_trigger( + db: &PgPool, + candidate: &OrchestrationMatch, + ctx: &TriggerContext, +) -> Result { + // Samle alle relevante node-IDer fra konteksten + let relevant_ids: Vec = [ctx.node_id, ctx.source_id, ctx.target_id] + .iter() + .filter_map(|id| *id) + .collect(); + + if relevant_ids.is_empty() { + // Ingen node-kontekst å matche mot — kan ikke evaluere + return Ok(false); + } + + // Sjekk om orchestration har noen `observes`-edges overhodet + let observes_count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'observes'", + ) + .bind(candidate.id) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved sjekk av observes-edges: {e}"))?; + + if observes_count > 0 { + // Eksplisitt modus: sjekk om noen observes-edge peker til en relevant node + let observes_match = sqlx::query_scalar::<_, i64>( + r#" + SELECT COUNT(*) FROM edges + WHERE source_id = $1 + AND edge_type = 'observes' + AND target_id = ANY($2) + "#, + ) + .bind(candidate.id) + .bind(&relevant_ids) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved matching av observes-edges: {e}"))?; + + return Ok(observes_match > 0); + } + + // Implisitt modus: evaluer conditions + evaluate_conditions(db, candidate, ctx).await +} + +/// Evaluerer `metadata.trigger.conditions` mot trigger-konteksten. +/// +/// Kjente betingelser: +/// - `node_kind` — matcher node_kind fra konteksten +/// - `has_trait` — noden tilhører en samling med denne traiten +/// - `has_tag` — noden har en `tagged`-edge med denne verdien +/// - `edge_type` — matcher edge_type fra konteksten (for edge-events) +/// +/// Alle betingelser må matche (AND-logikk). +async fn evaluate_conditions( + db: &PgPool, + candidate: &OrchestrationMatch, + ctx: &TriggerContext, +) -> Result { + let conditions = match candidate + .metadata + .get("trigger") + .and_then(|t| t.get("conditions")) + .and_then(|c| c.as_object()) + { + Some(c) => c, + None => return Ok(true), // Ingen betingelser = alltid match + }; + + for (key, value) in conditions { + let val_str = match value.as_str() { + Some(s) => s, + None => continue, // Ignorer ikke-streng-verdier + }; + + let matched = match key.as_str() { + "node_kind" => ctx.node_kind.as_deref() == Some(val_str), + + "edge_type" => ctx.edge_type.as_deref() == Some(val_str), + + "has_trait" => { + if let Some(node_id) = ctx.node_id { + has_trait(db, node_id, val_str).await? + } else { + false + } + } + + "has_tag" => { + if let Some(node_id) = ctx.node_id { + has_tag(db, node_id, val_str).await? + } else { + false + } + } + + _ => { + tracing::debug!( + condition = key.as_str(), + "Ukjent trigger-betingelse, ignoreres" + ); + true // Ukjente betingelser blokkerer ikke + } + }; + + if !matched { + return Ok(false); + } + } + + Ok(true) +} + +/// Sjekker om en node tilhører en samling som har en bestemt trait. +/// Følger `belongs_to`-edge → samlingsnodiens `metadata.traits`. +async fn has_trait(db: &PgPool, node_id: Uuid, trait_name: &str) -> Result { + // Finn samlinger som noden tilhører, sjekk om noen har traiten + let count = sqlx::query_scalar::<_, i64>( + r#" + SELECT COUNT(*) FROM edges e + JOIN nodes n ON n.id = e.target_id + WHERE e.source_id = $1 + AND e.edge_type = 'belongs_to' + AND n.node_kind = 'collection' + AND n.metadata -> 'traits' ? $2 + "#, + ) + .bind(node_id) + .bind(trait_name) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved has_trait-sjekk: {e}"))?; + + Ok(count > 0) +} + +/// Sjekker om en node har en `tagged`-edge med en bestemt verdi. +async fn has_tag(db: &PgPool, node_id: Uuid, tag: &str) -> Result { + let count = sqlx::query_scalar::<_, i64>( + r#" + SELECT COUNT(*) FROM edges + WHERE source_id = $1 + AND edge_type = 'tagged' + AND metadata ->> 'tag' = $2 + "#, + ) + .bind(node_id) + .bind(tag) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved has_tag-sjekk: {e}"))?; + + Ok(count > 0) +} + +/// Legger en matchende orchestration i jobbkøen. +async fn enqueue_orchestration( + db: &PgPool, + candidate: &OrchestrationMatch, + ctx: &TriggerContext, +) -> Result<(), String> { + let payload = serde_json::json!({ + "orchestration_id": candidate.id.to_string(), + "trigger_event": ctx.event, + "trigger_context": { + "node_id": ctx.node_id.map(|id| id.to_string()), + "node_kind": ctx.node_kind, + "edge_type": ctx.edge_type, + "source_id": ctx.source_id.map(|id| id.to_string()), + "target_id": ctx.target_id.map(|id| id.to_string()), + "op": ctx.op, + } + }); + + // Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler) + crate::jobs::enqueue(db, "orchestrate", payload, None, 5) + .await + .map_err(|e| format!("Feil ved enqueue av orchestration-jobb: {e}"))?; + + tracing::info!( + orchestration_id = %candidate.id, + event = %ctx.event, + "Orchestration trigget — jobb lagt i kø" + ); + + Ok(()) +} diff --git a/maskinrommet/src/ws.rs b/maskinrommet/src/ws.rs index 00b1b30..3aaa309 100644 --- a/maskinrommet/src/ws.rs +++ b/maskinrommet/src/ws.rs @@ -289,10 +289,55 @@ async fn pg_listen_loop(db: &PgPool, ws: &WsBroadcast) -> Result<(), sqlx::Error _ => continue, }; + // Trigger-evaluering: sjekk om orchestration-noder matcher eventet. + // Spawner async — blokkerer ikke NOTIFY-loopen. + if let Some(trigger_ctx) = build_trigger_context(&event) { + crate::orchestration_trigger::spawn_trigger_evaluation(db.clone(), trigger_ctx); + } + let _ = ws.tx.send(event); } } +/// Mapper et BroadcastEvent til en TriggerContext for orchestration-evaluering. +/// Returnerer None for events som ikke er relevante for triggere (access, mixer). +fn build_trigger_context(event: &BroadcastEvent) -> Option { + match event { + BroadcastEvent::NodeChanged { op, id, kind, .. } => { + let trigger_event = match op.as_str() { + "INSERT" => "node.created", + _ => return None, // UPDATE/DELETE er ikke trigger-events foreløpig + }; + Some(crate::orchestration_trigger::TriggerContext { + event: trigger_event.to_string(), + node_id: Some(*id), + node_kind: Some(kind.clone()), + edge_type: None, + source_id: None, + target_id: None, + op: op.clone(), + }) + } + BroadcastEvent::EdgeChanged { op, source_id, target_id, edge_type, .. } => { + let trigger_event = match op.as_str() { + "INSERT" => "edge.created", + _ => return None, + }; + Some(crate::orchestration_trigger::TriggerContext { + event: trigger_event.to_string(), + node_id: Some(*source_id), // source som primær node-kontekst + node_kind: None, + edge_type: Some(edge_type.clone()), + source_id: Some(*source_id), + target_id: Some(*target_id), + op: op.clone(), + }) + } + // Access- og mixer-events er ikke orchestration-triggere + BroadcastEvent::AccessChanged { .. } | BroadcastEvent::MixerChannelChanged { .. } => None, + } +} + // --------------------------------------------------------------------------- // Berikelse: hent full rad fra PG etter NOTIFY // --------------------------------------------------------------------------- diff --git a/migrations/021_orchestration_trigger_index.sql b/migrations/021_orchestration_trigger_index.sql new file mode 100644 index 0000000..ff4b203 --- /dev/null +++ b/migrations/021_orchestration_trigger_index.sql @@ -0,0 +1,13 @@ +-- Migration 021: Indeks for effektiv trigger-lookup på orchestration-noder. +-- +-- Portvokteren trenger å finne alle orchestration-noder som matcher et +-- gitt trigger-event (f.eks. "node.created") raskt ved hver NOTIFY. +-- Denne indeksen gjør det til en B-tree-lookup i stedet for full tabellscan. +-- +-- Ref: docs/concepts/orkestrering.md § 5 "Strukturert trigger" + +-- Funksjonell indeks: trekker ut trigger.event fra metadata-JSONB, +-- begrenset til orchestration-noder. +CREATE INDEX IF NOT EXISTS idx_nodes_orchestration_trigger_event + ON nodes ((metadata -> 'trigger' ->> 'event')) + WHERE node_kind = 'orchestration'; diff --git a/tasks.md b/tasks.md index 7f0e506..4190125 100644 --- a/tasks.md +++ b/tasks.md @@ -317,8 +317,7 @@ med fritekst-instruksjoner som boten utfører via function calling. Strukturerte automatisk eskalering av intelligens ved feil, kompilering av velprøvde mønstre. - [x] 24.1 Orchestration node-type: legg til `orchestration` i maskinrommets node-validering. Metadata-skjema: `trigger` (event + conditions), `executor`, `intelligence`, `effort`, `compiled`, `pipeline`. Valider trigger-events mot kjent liste. -- [~] 24.2 Trigger-evaluering i portvokteren: ved node/edge-events, sjekk om noen `orchestration`-noder matcher triggeren. Effektiv lookup (indeksert på `metadata.trigger.event`). Ingen LLM-kall for trigger-matching. - > Påbegynt: 2026-03-18T16:46 +- [x] 24.2 Trigger-evaluering i portvokteren: ved node/edge-events, sjekk om noen `orchestration`-noder matcher triggeren. Effektiv lookup (indeksert på `metadata.trigger.event`). Ingen LLM-kall for trigger-matching. - [ ] 24.3 Script-kompilator: parser menneskelig scriptspråk ("transkriber lydfilen (stor modell)") og kompilerer til tekniske CLI-kall. Matcher verb mot `cli_tool`-noders `aliases`, argumenter mot `args_hints`, variabler fra trigger-kontekst. Rust-stil kompileringsfeil med forslag. - [ ] 24.4 cli_tool alias-metadata: utvid alle `cli_tool`-noder med `aliases` (norske verb) og `args_hints` (menneskelige argumenter → CLI-flagg). Seed for alle eksisterende verktøy. - [ ] 24.5 Script-executor: vaktmesteren parser kompilert script og eksekverer steg sekvensielt via generisk dispatch. VED_FEIL-håndtering. Logger i `orchestration_log`.