diff --git a/docs/concepts/orkestrering.md b/docs/concepts/orkestrering.md index 690e1db..255aae2 100644 --- a/docs/concepts/orkestrering.md +++ b/docs/concepts/orkestrering.md @@ -387,7 +387,7 @@ Dette gjelder også feilhåndtering: et feilet VED_FEIL-steg blir en work_item som Claude Code løser i neste sesjon — reparerer scriptet, legger til manglende verktøy, osv. -## 7. Koblinger mellom orkestreringer +## 7. Koblinger mellom orkestreringer (kaskade) Orkestreringer er noder med edges: @@ -400,6 +400,34 @@ Orkestreringer er noder med edges: Output fra én orkestrering kan trigge neste via `triggers`-edge. Kaskade via edges, ikke hardkodet. +### Implementasjon (oppgave 24.8) + +Etter vellykket pipeline-utførelse sjekker `handle_orchestrate()` om +orkestreringen har utgående `triggers`-edges. For hvert gyldig mål +(må være `node_kind = 'orchestration'`) legges en ny `orchestrate`-jobb +i køen med: + +- `trigger_event: "cascade"` — skiller kaskade fra primær-triggere +- `trigger_context.upstream_orchestration_id` — ID til kilden +- `trigger_context.op: "CASCADE"` — for betingelsesmatching +- `cascade_chain` — liste med alle orchestration-IDer allerede utført + +Nedstrøms script kan bruke `{event.upstream_orchestration_id}` for å +referere til oppstrøms orkestrering. + +### Syklusdeteksjon + +Kaskadekjeden (`cascade_chain`) spores som en array i jobb-payloaden. +Før enqueue av hvert mål sjekkes: + +1. **Dybdegrense:** Maks 10 ledd i kjeden (konfigurerbart via `MAX_CASCADE_DEPTH`) +2. **Syklussjekk:** Target-ID finnes ikke allerede i kjeden (inkludert + orkestreringen som nettopp fullførte) + +Blokkerte kaskader logges i `orchestration_log` med `status = 'skipped'` +og `tool_binary = 'cascade'`. Kaskade-feil er ikke-fatale — den +fullførte orkestreringen rapporteres fortsatt som suksess. + ## 8. Brukergrensesnitt ``` diff --git a/docs/primitiver/edges.md b/docs/primitiver/edges.md index e4917df..01c51f0 100644 --- a/docs/primitiver/edges.md +++ b/docs/primitiver/edges.md @@ -76,6 +76,7 @@ valideres i maskinrommet. | `source_material` | Kildemateriale (avledet node → kilde) | `{ "context": "quoted", "excerpt": "..." }` | | `assigned_to` | Tildelt (work_item → person/agent) | — | | `observes` | Overvåker (orchestration → target node) | — | +| `triggers` | Kaskade-kobling (orchestration → orchestration) | — | | `derived_from` | Prosessert versjon av (f.eks. lydstudio-output → original) | — | | `has_studio` | Studio-sesjon (sesjon → medienode) | — | diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index dcf5fbd..5f82d36 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -378,14 +378,49 @@ async fn handle_orchestrate( )); } - Ok(serde_json::json!({ + let result_json = serde_json::json!({ "status": "executed", "orchestration_id": orch_id.to_string(), "steps_compiled": compiled.steps.len(), "steps_run": pipeline_result.steps_run, "steps_ok": pipeline_result.steps_ok, "technical": compiled.technical, - })) + }); + + // === Kaskade: triggers-edge til nedstrøms orkestreringer (oppgave 24.8) === + let cascade_chain: Vec = job + .payload + .get("cascade_chain") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + match crate::orchestration_trigger::enqueue_cascade( + db, + orch_id, + &cascade_chain, + &result_json, + ) + .await + { + Ok(0) => {} // Ingen kaskade-mål + Ok(n) => { + tracing::info!( + orchestration_id = %orch_id, + cascade_targets = n, + "Kaskade: {n} nedstrøms orkestrering(er) lagt i kø" + ); + } + Err(e) => { + // Kaskade-feil er ikke-fatal — selve orkestreringen lyktes + tracing::error!( + orchestration_id = %orch_id, + error = %e, + "Feil ved kaskade-evaluering (orkestreringen selv lyktes)" + ); + } + } + + Ok(result_json) } /// Synops-render binary path. diff --git a/maskinrommet/src/orchestration_trigger.rs b/maskinrommet/src/orchestration_trigger.rs index 93666cf..120d8aa 100644 --- a/maskinrommet/src/orchestration_trigger.rs +++ b/maskinrommet/src/orchestration_trigger.rs @@ -294,7 +294,8 @@ async fn enqueue_orchestration( "source_id": ctx.source_id.map(|id| id.to_string()), "target_id": ctx.target_id.map(|id| id.to_string()), "op": ctx.op, - } + }, + "cascade_chain": [] }); // Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler) @@ -310,3 +311,245 @@ async fn enqueue_orchestration( Ok(()) } + +// ============================================================================= +// Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8) +// ============================================================================= + +/// Maks kaskadedybde — sikring mot dype kjeder selv uten syklus. +const MAX_CASCADE_DEPTH: usize = 10; + +/// Etter vellykket orkestrering: finn `triggers`-edges til andre +/// orkestreringer og legg dem i jobbkøen. +/// +/// Syklusdeteksjon: `cascade_chain` inneholder alle orchestration-IDer +/// som allerede er utført i denne kaskaden. Hvis target allerede finnes +/// i kjeden, logges det og target hoppes over. +/// +/// Output fra den fullførte orkestreringen sendes videre som +/// `upstream_output` i trigger_context, slik at nedstrøms orkestreringer +/// kan bruke `{event.upstream_*}`-variabler. +pub async fn enqueue_cascade( + db: &PgPool, + completed_orchestration_id: Uuid, + cascade_chain: &[String], + pipeline_result: &serde_json::Value, +) -> Result { + // Sjekk dybdegrense + if cascade_chain.len() >= MAX_CASCADE_DEPTH { + tracing::warn!( + orchestration_id = %completed_orchestration_id, + depth = cascade_chain.len(), + max = MAX_CASCADE_DEPTH, + "Kaskade-dybdegrense nådd — stopper videre kaskade" + ); + return Ok(0); + } + + // Finn alle triggers-edges fra denne orkestreringen + let targets = sqlx::query_as::<_, (Uuid,)>( + r#" + SELECT target_id FROM edges + WHERE source_id = $1 + AND edge_type = 'triggers' + "#, + ) + .bind(completed_orchestration_id) + .fetch_all(db) + .await + .map_err(|e| format!("Feil ved henting av triggers-edges: {e}"))?; + + if targets.is_empty() { + return Ok(0); + } + + // Bygg oppdatert kaskadekjede + let mut new_chain = cascade_chain.to_vec(); + new_chain.push(completed_orchestration_id.to_string()); + + let mut enqueued = 0; + + for (target_id,) in &targets { + let target_str = target_id.to_string(); + + // Syklusdeteksjon: er target allerede i kjeden? + if new_chain.contains(&target_str) { + tracing::warn!( + source = %completed_orchestration_id, + target = %target_id, + chain = ?new_chain, + "Syklus oppdaget i kaskade — hopper over" + ); + // Logg syklusen i orchestration_log + let _ = sqlx::query( + r#" + INSERT INTO orchestration_log + (orchestration_id, job_id, step_number, tool_binary, args, + is_fallback, status, error_msg, duration_ms) + VALUES ($1, '00000000-0000-0000-0000-000000000000', 0, + 'cascade', '[]'::jsonb, false, 'skipped', + $2, 0) + "#, + ) + .bind(completed_orchestration_id) + .bind(format!( + "Syklus oppdaget: {} allerede i kaskadekjede {:?}", + target_id, new_chain + )) + .execute(db) + .await; + continue; + } + + // Verifiser at target faktisk er en orchestration-node + let is_orch = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM nodes WHERE id = $1 AND node_kind = 'orchestration'" + ) + .bind(target_id) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved verifisering av kaskade-mål: {e}"))?; + + if is_orch == 0 { + tracing::warn!( + target = %target_id, + "triggers-edge peker til ikke-orchestration-node — hopper over" + ); + continue; + } + + // Bygg payload med kaskade-kontekst + let payload = serde_json::json!({ + "orchestration_id": target_str, + "trigger_event": "cascade", + "trigger_context": { + "node_id": pipeline_result.get("orchestration_id") + .and_then(|v| v.as_str()), + "node_kind": "orchestration", + "op": "CASCADE", + "upstream_orchestration_id": completed_orchestration_id.to_string(), + "upstream_steps_ok": pipeline_result.get("steps_ok"), + }, + "cascade_chain": new_chain, + }); + + crate::jobs::enqueue(db, "orchestrate", payload, None, 5) + .await + .map_err(|e| format!("Feil ved enqueue av kaskade-jobb: {e}"))?; + + tracing::info!( + source = %completed_orchestration_id, + target = %target_id, + depth = new_chain.len(), + "Kaskade: nedstrøms orkestrering lagt i kø" + ); + + enqueued += 1; + } + + Ok(enqueued) +} + +/// Hjelpefunksjon for testing: sjekk om en kaskade ville blitt blokkert +/// av syklusdeteksjon eller dybdegrense. +/// +/// Returnerer `Ok(true)` hvis kaskaden er trygg, `Err(grunn)` hvis blokkert. +pub fn check_cascade_safety( + completed_id: &str, + target_id: &str, + cascade_chain: &[String], +) -> Result { + if cascade_chain.len() >= MAX_CASCADE_DEPTH { + return Err(format!( + "Dybdegrense ({MAX_CASCADE_DEPTH}) nådd ved kjede med {} elementer", + cascade_chain.len() + )); + } + + let mut chain = cascade_chain.to_vec(); + chain.push(completed_id.to_string()); + + if chain.contains(&target_id.to_string()) { + return Err(format!( + "Syklus: {target_id} finnes allerede i kjede {chain:?}" + )); + } + + Ok(true) +} + +// ============================================================================= +// Tester +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cascade_no_cycle() { + let chain = vec!["aaa".to_string(), "bbb".to_string()]; + assert!(check_cascade_safety("ccc", "ddd", &chain).is_ok()); + } + + #[test] + fn test_cascade_direct_cycle() { + // A → B → A: syklus! + let chain = vec!["aaa".to_string()]; + let result = check_cascade_safety("bbb", "aaa", &chain); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Syklus")); + } + + #[test] + fn test_cascade_self_trigger() { + // A → A: syklus via seg selv + let chain: Vec = vec![]; + let result = check_cascade_safety("aaa", "aaa", &chain); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Syklus")); + } + + #[test] + fn test_cascade_indirect_cycle() { + // A → B → C → A: syklus + let chain = vec!["aaa".to_string(), "bbb".to_string()]; + let result = check_cascade_safety("ccc", "aaa", &chain); + assert!(result.is_err()); + } + + #[test] + fn test_cascade_depth_limit() { + let chain: Vec = (0..MAX_CASCADE_DEPTH) + .map(|i| format!("node-{i}")) + .collect(); + let result = check_cascade_safety("node-next", "node-target", &chain); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Dybdegrense")); + } + + #[test] + fn test_cascade_at_depth_limit_minus_one() { + // Nøyaktig under grensen — skal være OK + let chain: Vec = (0..MAX_CASCADE_DEPTH - 1) + .map(|i| format!("node-{i}")) + .collect(); + let result = check_cascade_safety("node-next", "node-target", &chain); + assert!(result.is_ok()); + } + + #[test] + fn test_cascade_empty_chain() { + // Første orkestrering i kjeden — ingen syklus mulig + let chain: Vec = vec![]; + assert!(check_cascade_safety("aaa", "bbb", &chain).is_ok()); + } + + #[test] + fn test_cascade_completed_equals_target_with_chain() { + // completed=B, target=B, chain=[A]: B trigger seg selv + let chain = vec!["aaa".to_string()]; + let result = check_cascade_safety("bbb", "bbb", &chain); + assert!(result.is_err()); + } +} diff --git a/maskinrommet/src/script_executor.rs b/maskinrommet/src/script_executor.rs index 9e80529..4323a31 100644 --- a/maskinrommet/src/script_executor.rs +++ b/maskinrommet/src/script_executor.rs @@ -28,6 +28,8 @@ pub struct ExecutionContext { pub cas_hash: Option, pub communication_id: Option, pub collection_id: Option, + /// ID til oppstrøms orkestrering (ved kaskade via triggers-edge) + pub upstream_orchestration_id: Option, } impl ExecutionContext { @@ -45,6 +47,7 @@ impl ExecutionContext { cas_hash: s("cas_hash"), communication_id: s("communication_id"), collection_id: s("collection_id"), + upstream_orchestration_id: s("upstream_orchestration_id"), } } @@ -64,6 +67,7 @@ impl ExecutionContext { "event.cas_hash" => self.cas_hash.clone(), "event.communication_id" => self.communication_id.clone(), "event.collection_id" => self.collection_id.clone(), + "event.upstream_orchestration_id" => self.upstream_orchestration_id.clone(), _ => None, } .unwrap_or_else(|| arg.to_string()) @@ -446,6 +450,7 @@ mod tests { cas_hash: Some("sha256:abc".into()), communication_id: Some("comm-456".into()), collection_id: Some("coll-789".into()), + upstream_orchestration_id: None, }; assert_eq!(ctx.substitute("{event.node_id}"), "node-123"); @@ -471,6 +476,7 @@ mod tests { cas_hash: Some("sha256:abc".into()), communication_id: None, collection_id: None, + upstream_orchestration_id: None, }; let args = vec![ @@ -495,6 +501,7 @@ mod tests { cas_hash: None, communication_id: None, collection_id: None, + upstream_orchestration_id: None, }; // Manglende variabel beholder {event.*}-syntaks diff --git a/tasks.md b/tasks.md index 91af043..d7200cc 100644 --- a/tasks.md +++ b/tasks.md @@ -324,8 +324,7 @@ automatisk eskalering av intelligens ved feil, kompilering av velprøvde mønstr - [x] 24.5 Script-executor: vaktmesteren parser kompilert script og eksekverer steg sekvensielt via generisk dispatch. VED_FEIL-håndtering. Logger i `orchestration_log`. - [x] 24.6 Orchestration UI: editor med tre visninger (Enkel/Teknisk/Kompilert) som tabber. Sanntids kompileringsfeil. Trigger-velger, "Test kjøring"-knapp, kjørehistorikk. Ref: `docs/concepts/orkestrering.md`. - [x] 24.7 AI-assistert oppretting: `synops-ai` med auto-generert systemprompt (fra cli_tool-noder) foreslår script fra fritekst-beskrivelse. Vaktmesteren validerer. Eventually-modus: lagre som work_item for Claude Code. -- [~] 24.8 Kaskade: `triggers`-edge mellom orkestreringer. Output fra én trigger neste. Syklusdeteksjon for å unngå uendelige loops. - > Påbegynt: 2026-03-18T17:50 +- [x] 24.8 Kaskade: `triggers`-edge mellom orkestreringer. Output fra én trigger neste. Syklusdeteksjon for å unngå uendelige loops. - [ ] 24.9 Seed-orkestreringer: opprett standard-orkestreringer for podcast-pipeline, publiseringsflyt, og AI-beriking basert på eksisterende hardkodet logikk i vaktmesteren. Skrives i menneskelig scriptspråk. ## Fase 25: Web Clipper — `synops-clip`