Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer portvokteren nå om noen orchestration-noder matcher triggeren. Implementert som non-blocking async task som ikke blokkerer WebSocket-flyten. Ny modul orchestration_trigger.rs: - Mapper NOTIFY-events til trigger-typer (node.created, edge.created) - Effektiv lookup via funksjonell B-tree-indeks på metadata->trigger->event - Evaluerer observes-edges (eksplisitt) vs conditions (implisitt) - Betingelser: node_kind, edge_type, has_trait, has_tag (AND-logikk) - Legger matchende orkestreringer i jobbkøen som "orchestrate"-jobb Ny migration 021: indeks for trigger-event lookup på orchestration-noder. Jobbkø-dispatcher håndterer "orchestrate" med placeholder (24.3 implementerer utførelse). Verifisert: content-node trigrer matching orchestration, communication-node hoppes over.
312 lines
9.8 KiB
Rust
312 lines
9.8 KiB
Rust
//! Trigger-evaluering for orchestration-noder.
|
||
//!
|
||
//! Ved node/edge-events fra PG LISTEN/NOTIFY evaluerer vi om noen
|
||
//! orchestration-noder matcher triggeren. Matchende orkestreringer
|
||
//! legges i jobbkøen for utførelse (oppgave 24.3).
|
||
//!
|
||
//! Designprinsipper:
|
||
//! - Effektiv lookup via indeks på `metadata->'trigger'->>'event'`
|
||
//! - Ingen LLM-kall — ren deterministisk evaluering
|
||
//! - Non-blocking — spawner async task, blokkerer ikke NOTIFY-loopen
|
||
//! - `observes`-edge overtrumfer conditions (eksplisitt > implisitt)
|
||
//!
|
||
//! Ref: docs/concepts/orkestrering.md § 5–6
|
||
|
||
use sqlx::PgPool;
|
||
use uuid::Uuid;
|
||
|
||
/// Kontekst for en trigger-event — informasjonen som orkestreringen
|
||
/// trenger for å evaluere betingelser og bygge payload.
|
||
#[derive(Debug, Clone)]
|
||
pub struct TriggerContext {
|
||
/// Trigger-event-type (f.eks. "node.created", "edge.created")
|
||
pub event: String,
|
||
/// Primær node-ID som utløste eventet (node selv, eller source/target)
|
||
pub node_id: Option<Uuid>,
|
||
/// Node-kind (f.eks. "content", "communication") — for node_kind-betingelse
|
||
pub node_kind: Option<String>,
|
||
/// Edge-type (for edge-events)
|
||
pub edge_type: Option<String>,
|
||
/// Source-ID (for edge-events)
|
||
pub source_id: Option<Uuid>,
|
||
/// Target-ID (for edge-events)
|
||
pub target_id: Option<Uuid>,
|
||
/// Operasjon (INSERT, UPDATE, DELETE)
|
||
pub op: String,
|
||
}
|
||
|
||
/// Matchende orchestration-node med metadata for jobb-oppretting.
|
||
#[derive(Debug, sqlx::FromRow)]
|
||
struct OrchestrationMatch {
|
||
id: Uuid,
|
||
metadata: serde_json::Value,
|
||
}
|
||
|
||
/// Spawner trigger-evaluering som en async task.
|
||
/// Blokkerer ikke NOTIFY-loopen — feil logges men propageres ikke.
|
||
pub fn spawn_trigger_evaluation(db: PgPool, ctx: TriggerContext) {
|
||
tokio::spawn(async move {
|
||
if let Err(e) = evaluate_triggers(&db, &ctx).await {
|
||
tracing::error!(
|
||
event = %ctx.event,
|
||
error = %e,
|
||
"Trigger-evaluering feilet"
|
||
);
|
||
}
|
||
});
|
||
}
|
||
|
||
/// Evaluerer alle orchestration-noder som matcher et gitt trigger-event.
|
||
///
|
||
/// Flyt:
|
||
/// 1. Finn orchestration-noder med matchende `metadata.trigger.event`
|
||
/// 2. For hver: sjekk `observes`-edge (eksplisitt kobling)
|
||
/// 3. For hver: evaluer `conditions` mot trigger-konteksten
|
||
/// 4. Legg matchende i jobbkøen som `orchestrate`-jobb
|
||
async fn evaluate_triggers(db: &PgPool, ctx: &TriggerContext) -> Result<(), String> {
|
||
// Steg 1: Finn alle orchestration-noder med matchende trigger-event.
|
||
// Bruker den funksjonelle indeksen idx_nodes_orchestration_trigger_event.
|
||
let candidates = sqlx::query_as::<_, OrchestrationMatch>(
|
||
r#"
|
||
SELECT id, metadata
|
||
FROM nodes
|
||
WHERE node_kind = 'orchestration'
|
||
AND metadata -> 'trigger' ->> 'event' = $1
|
||
"#,
|
||
)
|
||
.bind(&ctx.event)
|
||
.fetch_all(db)
|
||
.await
|
||
.map_err(|e| format!("Feil ved henting av orchestration-kandidater: {e}"))?;
|
||
|
||
if candidates.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
tracing::debug!(
|
||
event = %ctx.event,
|
||
candidates = candidates.len(),
|
||
"Fant orchestration-kandidater for trigger"
|
||
);
|
||
|
||
for candidate in &candidates {
|
||
match should_trigger(db, candidate, ctx).await {
|
||
Ok(true) => {
|
||
enqueue_orchestration(db, candidate, ctx).await?;
|
||
}
|
||
Ok(false) => {
|
||
tracing::debug!(
|
||
orchestration_id = %candidate.id,
|
||
"Orchestration matchet ikke betingelser, hoppes over"
|
||
);
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!(
|
||
orchestration_id = %candidate.id,
|
||
error = %e,
|
||
"Feil ved evaluering av orchestration-betingelser"
|
||
);
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Avgjør om en orchestration-node skal trigges, basert på:
|
||
/// 1. `observes`-edge — eksplisitt kobling til triggerende node
|
||
/// 2. `conditions` — implisitt matching mot trigger-kontekst
|
||
///
|
||
/// Logikk (fra docs/concepts/orkestrering.md § 9):
|
||
/// - Hvis orchestration har `observes`-edges: trigger KUN hvis en peker til triggerende node
|
||
/// - Hvis ingen `observes`-edges: evaluer `conditions` (implisitt matching)
|
||
async fn should_trigger(
|
||
db: &PgPool,
|
||
candidate: &OrchestrationMatch,
|
||
ctx: &TriggerContext,
|
||
) -> Result<bool, String> {
|
||
// Samle alle relevante node-IDer fra konteksten
|
||
let relevant_ids: Vec<Uuid> = [ctx.node_id, ctx.source_id, ctx.target_id]
|
||
.iter()
|
||
.filter_map(|id| *id)
|
||
.collect();
|
||
|
||
if relevant_ids.is_empty() {
|
||
// Ingen node-kontekst å matche mot — kan ikke evaluere
|
||
return Ok(false);
|
||
}
|
||
|
||
// Sjekk om orchestration har noen `observes`-edges overhodet
|
||
let observes_count = sqlx::query_scalar::<_, i64>(
|
||
"SELECT COUNT(*) FROM edges WHERE source_id = $1 AND edge_type = 'observes'",
|
||
)
|
||
.bind(candidate.id)
|
||
.fetch_one(db)
|
||
.await
|
||
.map_err(|e| format!("Feil ved sjekk av observes-edges: {e}"))?;
|
||
|
||
if observes_count > 0 {
|
||
// Eksplisitt modus: sjekk om noen observes-edge peker til en relevant node
|
||
let observes_match = sqlx::query_scalar::<_, i64>(
|
||
r#"
|
||
SELECT COUNT(*) FROM edges
|
||
WHERE source_id = $1
|
||
AND edge_type = 'observes'
|
||
AND target_id = ANY($2)
|
||
"#,
|
||
)
|
||
.bind(candidate.id)
|
||
.bind(&relevant_ids)
|
||
.fetch_one(db)
|
||
.await
|
||
.map_err(|e| format!("Feil ved matching av observes-edges: {e}"))?;
|
||
|
||
return Ok(observes_match > 0);
|
||
}
|
||
|
||
// Implisitt modus: evaluer conditions
|
||
evaluate_conditions(db, candidate, ctx).await
|
||
}
|
||
|
||
/// Evaluerer `metadata.trigger.conditions` mot trigger-konteksten.
|
||
///
|
||
/// Kjente betingelser:
|
||
/// - `node_kind` — matcher node_kind fra konteksten
|
||
/// - `has_trait` — noden tilhører en samling med denne traiten
|
||
/// - `has_tag` — noden har en `tagged`-edge med denne verdien
|
||
/// - `edge_type` — matcher edge_type fra konteksten (for edge-events)
|
||
///
|
||
/// Alle betingelser må matche (AND-logikk).
|
||
async fn evaluate_conditions(
|
||
db: &PgPool,
|
||
candidate: &OrchestrationMatch,
|
||
ctx: &TriggerContext,
|
||
) -> Result<bool, String> {
|
||
let conditions = match candidate
|
||
.metadata
|
||
.get("trigger")
|
||
.and_then(|t| t.get("conditions"))
|
||
.and_then(|c| c.as_object())
|
||
{
|
||
Some(c) => c,
|
||
None => return Ok(true), // Ingen betingelser = alltid match
|
||
};
|
||
|
||
for (key, value) in conditions {
|
||
let val_str = match value.as_str() {
|
||
Some(s) => s,
|
||
None => continue, // Ignorer ikke-streng-verdier
|
||
};
|
||
|
||
let matched = match key.as_str() {
|
||
"node_kind" => ctx.node_kind.as_deref() == Some(val_str),
|
||
|
||
"edge_type" => ctx.edge_type.as_deref() == Some(val_str),
|
||
|
||
"has_trait" => {
|
||
if let Some(node_id) = ctx.node_id {
|
||
has_trait(db, node_id, val_str).await?
|
||
} else {
|
||
false
|
||
}
|
||
}
|
||
|
||
"has_tag" => {
|
||
if let Some(node_id) = ctx.node_id {
|
||
has_tag(db, node_id, val_str).await?
|
||
} else {
|
||
false
|
||
}
|
||
}
|
||
|
||
_ => {
|
||
tracing::debug!(
|
||
condition = key.as_str(),
|
||
"Ukjent trigger-betingelse, ignoreres"
|
||
);
|
||
true // Ukjente betingelser blokkerer ikke
|
||
}
|
||
};
|
||
|
||
if !matched {
|
||
return Ok(false);
|
||
}
|
||
}
|
||
|
||
Ok(true)
|
||
}
|
||
|
||
/// Sjekker om en node tilhører en samling som har en bestemt trait.
|
||
/// Følger `belongs_to`-edge → samlingsnodiens `metadata.traits`.
|
||
async fn has_trait(db: &PgPool, node_id: Uuid, trait_name: &str) -> Result<bool, String> {
|
||
// Finn samlinger som noden tilhører, sjekk om noen har traiten
|
||
let count = sqlx::query_scalar::<_, i64>(
|
||
r#"
|
||
SELECT COUNT(*) FROM edges e
|
||
JOIN nodes n ON n.id = e.target_id
|
||
WHERE e.source_id = $1
|
||
AND e.edge_type = 'belongs_to'
|
||
AND n.node_kind = 'collection'
|
||
AND n.metadata -> 'traits' ? $2
|
||
"#,
|
||
)
|
||
.bind(node_id)
|
||
.bind(trait_name)
|
||
.fetch_one(db)
|
||
.await
|
||
.map_err(|e| format!("Feil ved has_trait-sjekk: {e}"))?;
|
||
|
||
Ok(count > 0)
|
||
}
|
||
|
||
/// Sjekker om en node har en `tagged`-edge med en bestemt verdi.
|
||
async fn has_tag(db: &PgPool, node_id: Uuid, tag: &str) -> Result<bool, String> {
|
||
let count = sqlx::query_scalar::<_, i64>(
|
||
r#"
|
||
SELECT COUNT(*) FROM edges
|
||
WHERE source_id = $1
|
||
AND edge_type = 'tagged'
|
||
AND metadata ->> 'tag' = $2
|
||
"#,
|
||
)
|
||
.bind(node_id)
|
||
.bind(tag)
|
||
.fetch_one(db)
|
||
.await
|
||
.map_err(|e| format!("Feil ved has_tag-sjekk: {e}"))?;
|
||
|
||
Ok(count > 0)
|
||
}
|
||
|
||
/// Legger en matchende orchestration i jobbkøen.
|
||
async fn enqueue_orchestration(
|
||
db: &PgPool,
|
||
candidate: &OrchestrationMatch,
|
||
ctx: &TriggerContext,
|
||
) -> Result<(), String> {
|
||
let payload = serde_json::json!({
|
||
"orchestration_id": candidate.id.to_string(),
|
||
"trigger_event": ctx.event,
|
||
"trigger_context": {
|
||
"node_id": ctx.node_id.map(|id| id.to_string()),
|
||
"node_kind": ctx.node_kind,
|
||
"edge_type": ctx.edge_type,
|
||
"source_id": ctx.source_id.map(|id| id.to_string()),
|
||
"target_id": ctx.target_id.map(|id| id.to_string()),
|
||
"op": ctx.op,
|
||
}
|
||
});
|
||
|
||
// Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler)
|
||
crate::jobs::enqueue(db, "orchestrate", payload, None, 5)
|
||
.await
|
||
.map_err(|e| format!("Feil ved enqueue av orchestration-jobb: {e}"))?;
|
||
|
||
tracing::info!(
|
||
orchestration_id = %candidate.id,
|
||
event = %ctx.event,
|
||
"Orchestration trigget — jobb lagt i kø"
|
||
);
|
||
|
||
Ok(())
|
||
}
|