synops/maskinrommet/src/orchestration_trigger.rs
vegard 44327df370 Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8)
Etter vellykket pipeline-utførelse sjekker handle_orchestrate() om
orkestreringen har utgående triggers-edges til andre orchestration-noder.
Hvert gyldig mål enqueues som ny orchestrate-jobb med kaskade-kontekst.

Syklusdeteksjon via cascade_chain i jobb-payload:
- Sporer alle orchestration-IDer allerede utført i kjeden
- Blokkerer target som allerede finnes i kjeden (direkte + indirekte syklus)
- Dybdegrense på 10 ledd (MAX_CASCADE_DEPTH)
- Blokkerte kaskader logges i orchestration_log med status=skipped

Nedstrøms orkestreringer mottar:
- trigger_event: "cascade"
- upstream_orchestration_id i trigger_context
- {event.upstream_orchestration_id} tilgjengelig i script

Kaskade-feil er ikke-fatale — selve orkestreringen rapporteres som suksess.

8 nye enhetstester for syklusdeteksjon og dybdegrense.
2026-03-18 17:57:26 +00:00

555 lines
18 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Trigger-evaluering for orchestration-noder.
//!
//! Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer vi om noen
//! orchestration-noder matcher triggeren. Matchende orkestreringer
//! legges i jobbkøen for utførelse (oppgave 24.3).
//!
//! Designprinsipper:
//! - Effektiv lookup via indeks på `metadata->'trigger'->>'event'`
//! - Ingen LLM-kall — ren deterministisk evaluering
//! - Non-blocking — spawner async task, blokkerer ikke NOTIFY-loopen
//! - `observes`-edge overtrumfer conditions (eksplisitt > implisitt)
//!
//! Ref: docs/concepts/orkestrering.md § 56
use sqlx::PgPool;
use uuid::Uuid;
/// Kontekst for en trigger-event — informasjonen som orkestreringen
/// trenger for å evaluere betingelser og bygge payload.
#[derive(Debug, Clone)]
pub struct TriggerContext {
/// Trigger-event-type (f.eks. "node.created", "edge.created")
pub event: String,
/// Primær node-ID som utløste eventet (node selv, eller source/target)
pub node_id: Option<Uuid>,
/// Node-kind (f.eks. "content", "communication") — for node_kind-betingelse
pub node_kind: Option<String>,
/// Edge-type (for edge-events)
pub edge_type: Option<String>,
/// Source-ID (for edge-events)
pub source_id: Option<Uuid>,
/// Target-ID (for edge-events)
pub target_id: Option<Uuid>,
/// Operasjon (INSERT, UPDATE, DELETE)
pub op: String,
}
/// Matchende orchestration-node med metadata for jobb-oppretting.
#[derive(Debug, sqlx::FromRow)]
struct OrchestrationMatch {
id: Uuid,
metadata: serde_json::Value,
}
/// Spawner trigger-evaluering som en async task.
/// Blokkerer ikke NOTIFY-loopen — feil logges men propageres ikke.
pub fn spawn_trigger_evaluation(db: PgPool, ctx: TriggerContext) {
tokio::spawn(async move {
if let Err(e) = evaluate_triggers(&db, &ctx).await {
tracing::error!(
event = %ctx.event,
error = %e,
"Trigger-evaluering feilet"
);
}
});
}
/// Evaluerer alle orchestration-noder som matcher et gitt trigger-event.
///
/// Flyt:
/// 1. Finn orchestration-noder med matchende `metadata.trigger.event`
/// 2. For hver: sjekk `observes`-edge (eksplisitt kobling)
/// 3. For hver: evaluer `conditions` mot trigger-konteksten
/// 4. Legg matchende i jobbkøen som `orchestrate`-jobb
async fn evaluate_triggers(db: &PgPool, ctx: &TriggerContext) -> Result<(), String> {
// Steg 1: Finn alle orchestration-noder med matchende trigger-event.
// Bruker den funksjonelle indeksen idx_nodes_orchestration_trigger_event.
let candidates = sqlx::query_as::<_, OrchestrationMatch>(
r#"
SELECT id, metadata
FROM nodes
WHERE node_kind = 'orchestration'
AND metadata -> 'trigger' ->> 'event' = $1
"#,
)
.bind(&ctx.event)
.fetch_all(db)
.await
.map_err(|e| format!("Feil ved henting av orchestration-kandidater: {e}"))?;
if candidates.is_empty() {
return Ok(());
}
tracing::debug!(
event = %ctx.event,
candidates = candidates.len(),
"Fant orchestration-kandidater for trigger"
);
for candidate in &candidates {
match should_trigger(db, candidate, ctx).await {
Ok(true) => {
enqueue_orchestration(db, candidate, ctx).await?;
}
Ok(false) => {
tracing::debug!(
orchestration_id = %candidate.id,
"Orchestration matchet ikke betingelser, hoppes over"
);
}
Err(e) => {
tracing::warn!(
orchestration_id = %candidate.id,
error = %e,
"Feil ved evaluering av orchestration-betingelser"
);
}
}
}
Ok(())
}
/// Avgjør om en orchestration-node skal trigges, basert på:
/// 1. `observes`-edge — eksplisitt kobling til triggerende node
/// 2. `conditions` — implisitt matching mot trigger-kontekst
///
/// Logikk (fra docs/concepts/orkestrering.md § 9):
/// - Hvis orchestration har `observes`-edges: trigger KUN hvis en peker til triggerende node
/// - Hvis ingen `observes`-edges: evaluer `conditions` (implisitt matching)
async fn should_trigger(
db: &PgPool,
candidate: &OrchestrationMatch,
ctx: &TriggerContext,
) -> Result<bool, String> {
// Samle alle relevante node-IDer fra konteksten
let relevant_ids: Vec<Uuid> = [ctx.node_id, ctx.source_id, ctx.target_id]
.iter()
.filter_map(|id| *id)
.collect();
if relevant_ids.is_empty() {
// Ingen node-kontekst å matche mot — kan ikke evaluere
return Ok(false);
}
// Sjekk om orchestration har noen `observes`-edges overhodet
let observes_count = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'observes'",
)
.bind(candidate.id)
.fetch_one(db)
.await
.map_err(|e| format!("Feil ved sjekk av observes-edges: {e}"))?;
if observes_count > 0 {
// Eksplisitt modus: sjekk om noen observes-edge peker til en relevant node
let observes_match = sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*) FROM edges
WHERE source_id = $1
AND edge_type = 'observes'
AND target_id = ANY($2)
"#,
)
.bind(candidate.id)
.bind(&relevant_ids)
.fetch_one(db)
.await
.map_err(|e| format!("Feil ved matching av observes-edges: {e}"))?;
return Ok(observes_match > 0);
}
// Implisitt modus: evaluer conditions
evaluate_conditions(db, candidate, ctx).await
}
/// Evaluerer `metadata.trigger.conditions` mot trigger-konteksten.
///
/// Kjente betingelser:
/// - `node_kind` — matcher node_kind fra konteksten
/// - `has_trait` — noden tilhører en samling med denne traiten
/// - `has_tag` — noden har en `tagged`-edge med denne verdien
/// - `edge_type` — matcher edge_type fra konteksten (for edge-events)
///
/// Alle betingelser må matche (AND-logikk).
async fn evaluate_conditions(
db: &PgPool,
candidate: &OrchestrationMatch,
ctx: &TriggerContext,
) -> Result<bool, String> {
let conditions = match candidate
.metadata
.get("trigger")
.and_then(|t| t.get("conditions"))
.and_then(|c| c.as_object())
{
Some(c) => c,
None => return Ok(true), // Ingen betingelser = alltid match
};
for (key, value) in conditions {
let val_str = match value.as_str() {
Some(s) => s,
None => continue, // Ignorer ikke-streng-verdier
};
let matched = match key.as_str() {
"node_kind" => ctx.node_kind.as_deref() == Some(val_str),
"edge_type" => ctx.edge_type.as_deref() == Some(val_str),
"has_trait" => {
if let Some(node_id) = ctx.node_id {
has_trait(db, node_id, val_str).await?
} else {
false
}
}
"has_tag" => {
if let Some(node_id) = ctx.node_id {
has_tag(db, node_id, val_str).await?
} else {
false
}
}
_ => {
tracing::debug!(
condition = key.as_str(),
"Ukjent trigger-betingelse, ignoreres"
);
true // Ukjente betingelser blokkerer ikke
}
};
if !matched {
return Ok(false);
}
}
Ok(true)
}
/// Sjekker om en node tilhører en samling som har en bestemt trait.
/// Følger `belongs_to`-edge → samlingsnodiens `metadata.traits`.
async fn has_trait(db: &PgPool, node_id: Uuid, trait_name: &str) -> Result<bool, String> {
// Finn samlinger som noden tilhører, sjekk om noen har traiten
let count = sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*) FROM edges e
JOIN nodes n ON n.id = e.target_id
WHERE e.source_id = $1
AND e.edge_type = 'belongs_to'
AND n.node_kind = 'collection'
AND n.metadata -> 'traits' ? $2
"#,
)
.bind(node_id)
.bind(trait_name)
.fetch_one(db)
.await
.map_err(|e| format!("Feil ved has_trait-sjekk: {e}"))?;
Ok(count > 0)
}
/// Sjekker om en node har en `tagged`-edge med en bestemt verdi.
async fn has_tag(db: &PgPool, node_id: Uuid, tag: &str) -> Result<bool, String> {
let count = sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*) FROM edges
WHERE source_id = $1
AND edge_type = 'tagged'
AND metadata ->> 'tag' = $2
"#,
)
.bind(node_id)
.bind(tag)
.fetch_one(db)
.await
.map_err(|e| format!("Feil ved has_tag-sjekk: {e}"))?;
Ok(count > 0)
}
/// Legger en matchende orchestration i jobbkøen.
async fn enqueue_orchestration(
db: &PgPool,
candidate: &OrchestrationMatch,
ctx: &TriggerContext,
) -> Result<(), String> {
let payload = serde_json::json!({
"orchestration_id": candidate.id.to_string(),
"trigger_event": ctx.event,
"trigger_context": {
"node_id": ctx.node_id.map(|id| id.to_string()),
"node_kind": ctx.node_kind,
"edge_type": ctx.edge_type,
"source_id": ctx.source_id.map(|id| id.to_string()),
"target_id": ctx.target_id.map(|id| id.to_string()),
"op": ctx.op,
},
"cascade_chain": []
});
// Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler)
crate::jobs::enqueue(db, "orchestrate", payload, None, 5)
.await
.map_err(|e| format!("Feil ved enqueue av orchestration-jobb: {e}"))?;
tracing::info!(
orchestration_id = %candidate.id,
event = %ctx.event,
"Orchestration trigget — jobb lagt i kø"
);
Ok(())
}
// =============================================================================
// Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8)
// =============================================================================
/// Maks kaskadedybde — sikring mot dype kjeder selv uten syklus.
const MAX_CASCADE_DEPTH: usize = 10;
/// Etter vellykket orkestrering: finn `triggers`-edges til andre
/// orkestreringer og legg dem i jobbkøen.
///
/// Syklusdeteksjon: `cascade_chain` inneholder alle orchestration-IDer
/// som allerede er utført i denne kaskaden. Hvis target allerede finnes
/// i kjeden, logges det og target hoppes over.
///
/// Output fra den fullførte orkestreringen sendes videre som
/// `upstream_output` i trigger_context, slik at nedstrøms orkestreringer
/// kan bruke `{event.upstream_*}`-variabler.
pub async fn enqueue_cascade(
db: &PgPool,
completed_orchestration_id: Uuid,
cascade_chain: &[String],
pipeline_result: &serde_json::Value,
) -> Result<usize, String> {
// Sjekk dybdegrense
if cascade_chain.len() >= MAX_CASCADE_DEPTH {
tracing::warn!(
orchestration_id = %completed_orchestration_id,
depth = cascade_chain.len(),
max = MAX_CASCADE_DEPTH,
"Kaskade-dybdegrense nådd — stopper videre kaskade"
);
return Ok(0);
}
// Finn alle triggers-edges fra denne orkestreringen
let targets = sqlx::query_as::<_, (Uuid,)>(
r#"
SELECT target_id FROM edges
WHERE source_id = $1
AND edge_type = 'triggers'
"#,
)
.bind(completed_orchestration_id)
.fetch_all(db)
.await
.map_err(|e| format!("Feil ved henting av triggers-edges: {e}"))?;
if targets.is_empty() {
return Ok(0);
}
// Bygg oppdatert kaskadekjede
let mut new_chain = cascade_chain.to_vec();
new_chain.push(completed_orchestration_id.to_string());
let mut enqueued = 0;
for (target_id,) in &targets {
let target_str = target_id.to_string();
// Syklusdeteksjon: er target allerede i kjeden?
if new_chain.contains(&target_str) {
tracing::warn!(
source = %completed_orchestration_id,
target = %target_id,
chain = ?new_chain,
"Syklus oppdaget i kaskade — hopper over"
);
// Logg syklusen i orchestration_log
let _ = sqlx::query(
r#"
INSERT INTO orchestration_log
(orchestration_id, job_id, step_number, tool_binary, args,
is_fallback, status, error_msg, duration_ms)
VALUES ($1, '00000000-0000-0000-0000-000000000000', 0,
'cascade', '[]'::jsonb, false, 'skipped',
$2, 0)
"#,
)
.bind(completed_orchestration_id)
.bind(format!(
"Syklus oppdaget: {} allerede i kaskadekjede {:?}",
target_id, new_chain
))
.execute(db)
.await;
continue;
}
// Verifiser at target faktisk er en orchestration-node
let is_orch = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM nodes WHERE id = $1 AND node_kind = 'orchestration'"
)
.bind(target_id)
.fetch_one(db)
.await
.map_err(|e| format!("Feil ved verifisering av kaskade-mål: {e}"))?;
if is_orch == 0 {
tracing::warn!(
target = %target_id,
"triggers-edge peker til ikke-orchestration-node — hopper over"
);
continue;
}
// Bygg payload med kaskade-kontekst
let payload = serde_json::json!({
"orchestration_id": target_str,
"trigger_event": "cascade",
"trigger_context": {
"node_id": pipeline_result.get("orchestration_id")
.and_then(|v| v.as_str()),
"node_kind": "orchestration",
"op": "CASCADE",
"upstream_orchestration_id": completed_orchestration_id.to_string(),
"upstream_steps_ok": pipeline_result.get("steps_ok"),
},
"cascade_chain": new_chain,
});
crate::jobs::enqueue(db, "orchestrate", payload, None, 5)
.await
.map_err(|e| format!("Feil ved enqueue av kaskade-jobb: {e}"))?;
tracing::info!(
source = %completed_orchestration_id,
target = %target_id,
depth = new_chain.len(),
"Kaskade: nedstrøms orkestrering lagt i kø"
);
enqueued += 1;
}
Ok(enqueued)
}
/// Hjelpefunksjon for testing: sjekk om en kaskade ville blitt blokkert
/// av syklusdeteksjon eller dybdegrense.
///
/// Returnerer `Ok(true)` hvis kaskaden er trygg, `Err(grunn)` hvis blokkert.
pub fn check_cascade_safety(
completed_id: &str,
target_id: &str,
cascade_chain: &[String],
) -> Result<bool, String> {
if cascade_chain.len() >= MAX_CASCADE_DEPTH {
return Err(format!(
"Dybdegrense ({MAX_CASCADE_DEPTH}) nådd ved kjede med {} elementer",
cascade_chain.len()
));
}
let mut chain = cascade_chain.to_vec();
chain.push(completed_id.to_string());
if chain.contains(&target_id.to_string()) {
return Err(format!(
"Syklus: {target_id} finnes allerede i kjede {chain:?}"
));
}
Ok(true)
}
// =============================================================================
// Tester
// =============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cascade_no_cycle() {
let chain = vec!["aaa".to_string(), "bbb".to_string()];
assert!(check_cascade_safety("ccc", "ddd", &chain).is_ok());
}
#[test]
fn test_cascade_direct_cycle() {
// A → B → A: syklus!
let chain = vec!["aaa".to_string()];
let result = check_cascade_safety("bbb", "aaa", &chain);
assert!(result.is_err());
assert!(result.unwrap_err().contains("Syklus"));
}
#[test]
fn test_cascade_self_trigger() {
// A → A: syklus via seg selv
let chain: Vec<String> = vec![];
let result = check_cascade_safety("aaa", "aaa", &chain);
assert!(result.is_err());
assert!(result.unwrap_err().contains("Syklus"));
}
#[test]
fn test_cascade_indirect_cycle() {
// A → B → C → A: syklus
let chain = vec!["aaa".to_string(), "bbb".to_string()];
let result = check_cascade_safety("ccc", "aaa", &chain);
assert!(result.is_err());
}
#[test]
fn test_cascade_depth_limit() {
let chain: Vec<String> = (0..MAX_CASCADE_DEPTH)
.map(|i| format!("node-{i}"))
.collect();
let result = check_cascade_safety("node-next", "node-target", &chain);
assert!(result.is_err());
assert!(result.unwrap_err().contains("Dybdegrense"));
}
#[test]
fn test_cascade_at_depth_limit_minus_one() {
// Nøyaktig under grensen — skal være OK
let chain: Vec<String> = (0..MAX_CASCADE_DEPTH - 1)
.map(|i| format!("node-{i}"))
.collect();
let result = check_cascade_safety("node-next", "node-target", &chain);
assert!(result.is_ok());
}
#[test]
fn test_cascade_empty_chain() {
// Første orkestrering i kjeden — ingen syklus mulig
let chain: Vec<String> = vec![];
assert!(check_cascade_safety("aaa", "bbb", &chain).is_ok());
}
#[test]
fn test_cascade_completed_equals_target_with_chain() {
// completed=B, target=B, chain=[A]: B trigger seg selv
let chain = vec!["aaa".to_string()];
let result = check_cascade_safety("bbb", "bbb", &chain);
assert!(result.is_err());
}
}