//! Trigger-evaluering for orchestration-noder. //! //! Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer vi om noen //! orchestration-noder matcher triggeren. Matchende orkestreringer //! legges i jobbkøen for utførelse (oppgave 24.3). //! //! Designprinsipper: //! - Effektiv lookup via indeks på `metadata->'trigger'->>'event'` //! - Ingen LLM-kall — ren deterministisk evaluering //! - Non-blocking — spawner async task, blokkerer ikke NOTIFY-loopen //! - `observes`-edge overtrumfer conditions (eksplisitt > implisitt) //! //! Ref: docs/concepts/orkestrering.md § 5–6 use sqlx::PgPool; use uuid::Uuid; /// Kontekst for en trigger-event — informasjonen som orkestreringen /// trenger for å evaluere betingelser og bygge payload. #[derive(Debug, Clone)] pub struct TriggerContext { /// Trigger-event-type (f.eks. "node.created", "edge.created") pub event: String, /// Primær node-ID som utløste eventet (node selv, eller source/target) pub node_id: Option, /// Node-kind (f.eks. "content", "communication") — for node_kind-betingelse pub node_kind: Option, /// Edge-type (for edge-events) pub edge_type: Option, /// Source-ID (for edge-events) pub source_id: Option, /// Target-ID (for edge-events) pub target_id: Option, /// Operasjon (INSERT, UPDATE, DELETE) pub op: String, } /// Matchende orchestration-node med metadata for jobb-oppretting. #[derive(Debug, sqlx::FromRow)] struct OrchestrationMatch { id: Uuid, metadata: serde_json::Value, } /// Spawner trigger-evaluering som en async task. /// Blokkerer ikke NOTIFY-loopen — feil logges men propageres ikke. pub fn spawn_trigger_evaluation(db: PgPool, ctx: TriggerContext) { tokio::spawn(async move { if let Err(e) = evaluate_triggers(&db, &ctx).await { tracing::error!( event = %ctx.event, error = %e, "Trigger-evaluering feilet" ); } }); } /// Evaluerer alle orchestration-noder som matcher et gitt trigger-event. /// /// Flyt: /// 1. Finn orchestration-noder med matchende `metadata.trigger.event` /// 2. For hver: sjekk `observes`-edge (eksplisitt kobling) /// 3. For hver: evaluer `conditions` mot trigger-konteksten /// 4. Legg matchende i jobbkøen som `orchestrate`-jobb async fn evaluate_triggers(db: &PgPool, ctx: &TriggerContext) -> Result<(), String> { // Steg 1: Finn alle orchestration-noder med matchende trigger-event. // Bruker den funksjonelle indeksen idx_nodes_orchestration_trigger_event. let candidates = sqlx::query_as::<_, OrchestrationMatch>( r#" SELECT id, metadata FROM nodes WHERE node_kind = 'orchestration' AND metadata -> 'trigger' ->> 'event' = $1 "#, ) .bind(&ctx.event) .fetch_all(db) .await .map_err(|e| format!("Feil ved henting av orchestration-kandidater: {e}"))?; if candidates.is_empty() { return Ok(()); } tracing::debug!( event = %ctx.event, candidates = candidates.len(), "Fant orchestration-kandidater for trigger" ); for candidate in &candidates { match should_trigger(db, candidate, ctx).await { Ok(true) => { enqueue_orchestration(db, candidate, ctx).await?; } Ok(false) => { tracing::debug!( orchestration_id = %candidate.id, "Orchestration matchet ikke betingelser, hoppes over" ); } Err(e) => { tracing::warn!( orchestration_id = %candidate.id, error = %e, "Feil ved evaluering av orchestration-betingelser" ); } } } Ok(()) } /// Avgjør om en orchestration-node skal trigges, basert på: /// 1. `observes`-edge — eksplisitt kobling til triggerende node /// 2. `conditions` — implisitt matching mot trigger-kontekst /// /// Logikk (fra docs/concepts/orkestrering.md § 9): /// - Hvis orchestration har `observes`-edges: trigger KUN hvis en peker til triggerende node /// - Hvis ingen `observes`-edges: evaluer `conditions` (implisitt matching) async fn should_trigger( db: &PgPool, candidate: &OrchestrationMatch, ctx: &TriggerContext, ) -> Result { // Samle alle relevante node-IDer fra konteksten let relevant_ids: Vec = [ctx.node_id, ctx.source_id, ctx.target_id] .iter() .filter_map(|id| *id) .collect(); if relevant_ids.is_empty() { // Ingen node-kontekst å matche mot — kan ikke evaluere return Ok(false); } // Sjekk om orchestration har noen `observes`-edges overhodet let observes_count = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'observes'", ) .bind(candidate.id) .fetch_one(db) .await .map_err(|e| format!("Feil ved sjekk av observes-edges: {e}"))?; if observes_count > 0 { // Eksplisitt modus: sjekk om noen observes-edge peker til en relevant node let observes_match = sqlx::query_scalar::<_, i64>( r#" SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'observes' AND target_id = ANY($2) "#, ) .bind(candidate.id) .bind(&relevant_ids) .fetch_one(db) .await .map_err(|e| format!("Feil ved matching av observes-edges: {e}"))?; return Ok(observes_match > 0); } // Implisitt modus: evaluer conditions evaluate_conditions(db, candidate, ctx).await } /// Evaluerer `metadata.trigger.conditions` mot trigger-konteksten. /// /// Kjente betingelser: /// - `node_kind` — matcher node_kind fra konteksten /// - `has_trait` — noden tilhører en samling med denne traiten /// - `has_tag` — noden har en `tagged`-edge med denne verdien /// - `edge_type` — matcher edge_type fra konteksten (for edge-events) /// /// Alle betingelser må matche (AND-logikk). async fn evaluate_conditions( db: &PgPool, candidate: &OrchestrationMatch, ctx: &TriggerContext, ) -> Result { let conditions = match candidate .metadata .get("trigger") .and_then(|t| t.get("conditions")) .and_then(|c| c.as_object()) { Some(c) => c, None => return Ok(true), // Ingen betingelser = alltid match }; for (key, value) in conditions { let val_str = match value.as_str() { Some(s) => s, None => continue, // Ignorer ikke-streng-verdier }; let matched = match key.as_str() { "node_kind" => ctx.node_kind.as_deref() == Some(val_str), "edge_type" => ctx.edge_type.as_deref() == Some(val_str), "has_trait" => { if let Some(node_id) = ctx.node_id { has_trait(db, node_id, val_str).await? } else { false } } "has_tag" => { if let Some(node_id) = ctx.node_id { has_tag(db, node_id, val_str).await? } else { false } } _ => { tracing::debug!( condition = key.as_str(), "Ukjent trigger-betingelse, ignoreres" ); true // Ukjente betingelser blokkerer ikke } }; if !matched { return Ok(false); } } Ok(true) } /// Sjekker om en node tilhører en samling som har en bestemt trait. /// Følger `belongs_to`-edge → samlingsnodiens `metadata.traits`. async fn has_trait(db: &PgPool, node_id: Uuid, trait_name: &str) -> Result { // Finn samlinger som noden tilhører, sjekk om noen har traiten let count = sqlx::query_scalar::<_, i64>( r#" SELECT COUNT(*) FROM edges e JOIN nodes n ON n.id = e.target_id WHERE e.source_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'collection' AND n.metadata -> 'traits' ? $2 "#, ) .bind(node_id) .bind(trait_name) .fetch_one(db) .await .map_err(|e| format!("Feil ved has_trait-sjekk: {e}"))?; Ok(count > 0) } /// Sjekker om en node har en `tagged`-edge med en bestemt verdi. async fn has_tag(db: &PgPool, node_id: Uuid, tag: &str) -> Result { let count = sqlx::query_scalar::<_, i64>( r#" SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'tagged' AND metadata ->> 'tag' = $2 "#, ) .bind(node_id) .bind(tag) .fetch_one(db) .await .map_err(|e| format!("Feil ved has_tag-sjekk: {e}"))?; Ok(count > 0) } /// Legger en matchende orchestration i jobbkøen. async fn enqueue_orchestration( db: &PgPool, candidate: &OrchestrationMatch, ctx: &TriggerContext, ) -> Result<(), String> { let payload = serde_json::json!({ "orchestration_id": candidate.id.to_string(), "trigger_event": ctx.event, "trigger_context": { "node_id": ctx.node_id.map(|id| id.to_string()), "node_kind": ctx.node_kind, "edge_type": ctx.edge_type, "source_id": ctx.source_id.map(|id| id.to_string()), "target_id": ctx.target_id.map(|id| id.to_string()), "op": ctx.op, }, "cascade_chain": [] }); // Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler) crate::jobs::enqueue(db, "orchestrate", payload, None, 5) .await .map_err(|e| format!("Feil ved enqueue av orchestration-jobb: {e}"))?; tracing::info!( orchestration_id = %candidate.id, event = %ctx.event, "Orchestration trigget — jobb lagt i kø" ); Ok(()) } // ============================================================================= // Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8) // ============================================================================= /// Maks kaskadedybde — sikring mot dype kjeder selv uten syklus. const MAX_CASCADE_DEPTH: usize = 10; /// Etter vellykket orkestrering: finn `triggers`-edges til andre /// orkestreringer og legg dem i jobbkøen. /// /// Syklusdeteksjon: `cascade_chain` inneholder alle orchestration-IDer /// som allerede er utført i denne kaskaden. Hvis target allerede finnes /// i kjeden, logges det og target hoppes over. /// /// Output fra den fullførte orkestreringen sendes videre som /// `upstream_output` i trigger_context, slik at nedstrøms orkestreringer /// kan bruke `{event.upstream_*}`-variabler. pub async fn enqueue_cascade( db: &PgPool, completed_orchestration_id: Uuid, cascade_chain: &[String], pipeline_result: &serde_json::Value, ) -> Result { // Sjekk dybdegrense if cascade_chain.len() >= MAX_CASCADE_DEPTH { tracing::warn!( orchestration_id = %completed_orchestration_id, depth = cascade_chain.len(), max = MAX_CASCADE_DEPTH, "Kaskade-dybdegrense nådd — stopper videre kaskade" ); return Ok(0); } // Finn alle triggers-edges fra denne orkestreringen let targets = sqlx::query_as::<_, (Uuid,)>( r#" SELECT target_id FROM edges WHERE source_id = $1 AND edge_type = 'triggers' "#, ) .bind(completed_orchestration_id) .fetch_all(db) .await .map_err(|e| format!("Feil ved henting av triggers-edges: {e}"))?; if targets.is_empty() { return Ok(0); } // Bygg oppdatert kaskadekjede let mut new_chain = cascade_chain.to_vec(); new_chain.push(completed_orchestration_id.to_string()); let mut enqueued = 0; for (target_id,) in &targets { let target_str = target_id.to_string(); // Syklusdeteksjon: er target allerede i kjeden? if new_chain.contains(&target_str) { tracing::warn!( source = %completed_orchestration_id, target = %target_id, chain = ?new_chain, "Syklus oppdaget i kaskade — hopper over" ); // Logg syklusen i orchestration_log let _ = sqlx::query( r#" INSERT INTO orchestration_log (orchestration_id, job_id, step_number, tool_binary, args, is_fallback, status, error_msg, duration_ms) VALUES ($1, '00000000-0000-0000-0000-000000000000', 0, 'cascade', '[]'::jsonb, false, 'skipped', $2, 0) "#, ) .bind(completed_orchestration_id) .bind(format!( "Syklus oppdaget: {} allerede i kaskadekjede {:?}", target_id, new_chain )) .execute(db) .await; continue; } // Verifiser at target faktisk er en orchestration-node let is_orch = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM nodes WHERE id = $1 AND node_kind = 'orchestration'" ) .bind(target_id) .fetch_one(db) .await .map_err(|e| format!("Feil ved verifisering av kaskade-mål: {e}"))?; if is_orch == 0 { tracing::warn!( target = %target_id, "triggers-edge peker til ikke-orchestration-node — hopper over" ); continue; } // Bygg payload med kaskade-kontekst let payload = serde_json::json!({ "orchestration_id": target_str, "trigger_event": "cascade", "trigger_context": { "node_id": pipeline_result.get("orchestration_id") .and_then(|v| v.as_str()), "node_kind": "orchestration", "op": "CASCADE", "upstream_orchestration_id": completed_orchestration_id.to_string(), "upstream_steps_ok": pipeline_result.get("steps_ok"), }, "cascade_chain": new_chain, }); crate::jobs::enqueue(db, "orchestrate", payload, None, 5) .await .map_err(|e| format!("Feil ved enqueue av kaskade-jobb: {e}"))?; tracing::info!( source = %completed_orchestration_id, target = %target_id, depth = new_chain.len(), "Kaskade: nedstrøms orkestrering lagt i kø" ); enqueued += 1; } Ok(enqueued) } /// Hjelpefunksjon for testing: sjekk om en kaskade ville blitt blokkert /// av syklusdeteksjon eller dybdegrense. /// /// Returnerer `Ok(true)` hvis kaskaden er trygg, `Err(grunn)` hvis blokkert. pub fn check_cascade_safety( completed_id: &str, target_id: &str, cascade_chain: &[String], ) -> Result { if cascade_chain.len() >= MAX_CASCADE_DEPTH { return Err(format!( "Dybdegrense ({MAX_CASCADE_DEPTH}) nådd ved kjede med {} elementer", cascade_chain.len() )); } let mut chain = cascade_chain.to_vec(); chain.push(completed_id.to_string()); if chain.contains(&target_id.to_string()) { return Err(format!( "Syklus: {target_id} finnes allerede i kjede {chain:?}" )); } Ok(true) } // ============================================================================= // Tester // ============================================================================= #[cfg(test)] mod tests { use super::*; #[test] fn test_cascade_no_cycle() { let chain = vec!["aaa".to_string(), "bbb".to_string()]; assert!(check_cascade_safety("ccc", "ddd", &chain).is_ok()); } #[test] fn test_cascade_direct_cycle() { // A → B → A: syklus! let chain = vec!["aaa".to_string()]; let result = check_cascade_safety("bbb", "aaa", &chain); assert!(result.is_err()); assert!(result.unwrap_err().contains("Syklus")); } #[test] fn test_cascade_self_trigger() { // A → A: syklus via seg selv let chain: Vec = vec![]; let result = check_cascade_safety("aaa", "aaa", &chain); assert!(result.is_err()); assert!(result.unwrap_err().contains("Syklus")); } #[test] fn test_cascade_indirect_cycle() { // A → B → C → A: syklus let chain = vec!["aaa".to_string(), "bbb".to_string()]; let result = check_cascade_safety("ccc", "aaa", &chain); assert!(result.is_err()); } #[test] fn test_cascade_depth_limit() { let chain: Vec = (0..MAX_CASCADE_DEPTH) .map(|i| format!("node-{i}")) .collect(); let result = check_cascade_safety("node-next", "node-target", &chain); assert!(result.is_err()); assert!(result.unwrap_err().contains("Dybdegrense")); } #[test] fn test_cascade_at_depth_limit_minus_one() { // Nøyaktig under grensen — skal være OK let chain: Vec = (0..MAX_CASCADE_DEPTH - 1) .map(|i| format!("node-{i}")) .collect(); let result = check_cascade_safety("node-next", "node-target", &chain); assert!(result.is_ok()); } #[test] fn test_cascade_empty_chain() { // Første orkestrering i kjeden — ingen syklus mulig let chain: Vec = vec![]; assert!(check_cascade_safety("aaa", "bbb", &chain).is_ok()); } #[test] fn test_cascade_completed_equals_target_with_chain() { // completed=B, target=B, chain=[A]: B trigger seg selv let chain = vec!["aaa".to_string()]; let result = check_cascade_safety("bbb", "bbb", &chain); assert!(result.is_err()); } }