Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8)
Etter vellykket pipeline-utførelse sjekker handle_orchestrate() om
orkestreringen har utgående triggers-edges til andre orchestration-noder.
Hvert gyldig mål enqueues som ny orchestrate-jobb med kaskade-kontekst.
Syklusdeteksjon via cascade_chain i jobb-payload:
- Sporer alle orchestration-IDer allerede utført i kjeden
- Blokkerer target som allerede finnes i kjeden (direkte + indirekte syklus)
- Dybdegrense på 10 ledd (MAX_CASCADE_DEPTH)
- Blokkerte kaskader logges i orchestration_log med status=skipped
Nedstrøms orkestreringer mottar:
- trigger_event: "cascade"
- upstream_orchestration_id i trigger_context
- {event.upstream_orchestration_id} tilgjengelig i script
Kaskade-feil er ikke-fatale — selve orkestreringen rapporteres som suksess.
8 nye enhetstester for syklusdeteksjon og dybdegrense.
This commit is contained in:
parent
d435f6ab33
commit
44327df370
6 changed files with 319 additions and 6 deletions
|
|
@ -387,7 +387,7 @@ Dette gjelder også feilhåndtering: et feilet VED_FEIL-steg
|
|||
blir en work_item som Claude Code løser i neste sesjon —
|
||||
reparerer scriptet, legger til manglende verktøy, osv.
|
||||
|
||||
## 7. Koblinger mellom orkestreringer
|
||||
## 7. Koblinger mellom orkestreringer (kaskade)
|
||||
|
||||
Orkestreringer er noder med edges:
|
||||
|
||||
|
|
@ -400,6 +400,34 @@ Orkestreringer er noder med edges:
|
|||
Output fra én orkestrering kan trigge neste via `triggers`-edge.
|
||||
Kaskade via edges, ikke hardkodet.
|
||||
|
||||
### Implementasjon (oppgave 24.8)
|
||||
|
||||
Etter vellykket pipeline-utførelse sjekker `handle_orchestrate()` om
|
||||
orkestreringen har utgående `triggers`-edges. For hvert gyldig mål
|
||||
(må være `node_kind = 'orchestration'`) legges en ny `orchestrate`-jobb
|
||||
i køen med:
|
||||
|
||||
- `trigger_event: "cascade"` — skiller kaskade fra primær-triggere
|
||||
- `trigger_context.upstream_orchestration_id` — ID til kilden
|
||||
- `trigger_context.op: "CASCADE"` — for betingelsesmatching
|
||||
- `cascade_chain` — liste med alle orchestration-IDer allerede utført
|
||||
|
||||
Nedstrøms script kan bruke `{event.upstream_orchestration_id}` for å
|
||||
referere til oppstrøms orkestrering.
|
||||
|
||||
### Syklusdeteksjon
|
||||
|
||||
Kaskadekjeden (`cascade_chain`) spores som en array i jobb-payloaden.
|
||||
Før enqueue av hvert mål sjekkes:
|
||||
|
||||
1. **Dybdegrense:** Maks 10 ledd i kjeden (konfigurerbart via `MAX_CASCADE_DEPTH`)
|
||||
2. **Syklussjekk:** Target-ID finnes ikke allerede i kjeden (inkludert
|
||||
orkestreringen som nettopp fullførte)
|
||||
|
||||
Blokkerte kaskader logges i `orchestration_log` med `status = 'skipped'`
|
||||
og `tool_binary = 'cascade'`. Kaskade-feil er ikke-fatale — den
|
||||
fullførte orkestreringen rapporteres fortsatt som suksess.
|
||||
|
||||
## 8. Brukergrensesnitt
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ valideres i maskinrommet.
|
|||
| `source_material` | Kildemateriale (avledet node → kilde) | `{ "context": "quoted", "excerpt": "..." }` |
|
||||
| `assigned_to` | Tildelt (work_item → person/agent) | — |
|
||||
| `observes` | Overvåker (orchestration → target node) | — |
|
||||
| `triggers` | Kaskade-kobling (orchestration → orchestration) | — |
|
||||
| `derived_from` | Prosessert versjon av (f.eks. lydstudio-output → original) | — |
|
||||
| `has_studio` | Studio-sesjon (sesjon → medienode) | — |
|
||||
|
||||
|
|
|
|||
|
|
@ -378,14 +378,49 @@ async fn handle_orchestrate(
|
|||
));
|
||||
}
|
||||
|
||||
Ok(serde_json::json!({
|
||||
let result_json = serde_json::json!({
|
||||
"status": "executed",
|
||||
"orchestration_id": orch_id.to_string(),
|
||||
"steps_compiled": compiled.steps.len(),
|
||||
"steps_run": pipeline_result.steps_run,
|
||||
"steps_ok": pipeline_result.steps_ok,
|
||||
"technical": compiled.technical,
|
||||
}))
|
||||
});
|
||||
|
||||
// === Kaskade: triggers-edge til nedstrøms orkestreringer (oppgave 24.8) ===
|
||||
let cascade_chain: Vec<String> = job
|
||||
.payload
|
||||
.get("cascade_chain")
|
||||
.and_then(|v| serde_json::from_value(v.clone()).ok())
|
||||
.unwrap_or_default();
|
||||
|
||||
match crate::orchestration_trigger::enqueue_cascade(
|
||||
db,
|
||||
orch_id,
|
||||
&cascade_chain,
|
||||
&result_json,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(0) => {} // Ingen kaskade-mål
|
||||
Ok(n) => {
|
||||
tracing::info!(
|
||||
orchestration_id = %orch_id,
|
||||
cascade_targets = n,
|
||||
"Kaskade: {n} nedstrøms orkestrering(er) lagt i kø"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
// Kaskade-feil er ikke-fatal — selve orkestreringen lyktes
|
||||
tracing::error!(
|
||||
orchestration_id = %orch_id,
|
||||
error = %e,
|
||||
"Feil ved kaskade-evaluering (orkestreringen selv lyktes)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result_json)
|
||||
}
|
||||
|
||||
/// Synops-render binary path.
|
||||
|
|
|
|||
|
|
@ -294,7 +294,8 @@ async fn enqueue_orchestration(
|
|||
"source_id": ctx.source_id.map(|id| id.to_string()),
|
||||
"target_id": ctx.target_id.map(|id| id.to_string()),
|
||||
"op": ctx.op,
|
||||
}
|
||||
},
|
||||
"cascade_chain": []
|
||||
});
|
||||
|
||||
// Prioritet 5 = normal (mellom batch-jobber og brukerforespørsler)
|
||||
|
|
@ -310,3 +311,245 @@ async fn enqueue_orchestration(
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Kaskade: triggers-edge mellom orkestreringer (oppgave 24.8)
|
||||
// =============================================================================
|
||||
|
||||
/// Maks kaskadedybde — sikring mot dype kjeder selv uten syklus.
|
||||
const MAX_CASCADE_DEPTH: usize = 10;
|
||||
|
||||
/// Etter vellykket orkestrering: finn `triggers`-edges til andre
|
||||
/// orkestreringer og legg dem i jobbkøen.
|
||||
///
|
||||
/// Syklusdeteksjon: `cascade_chain` inneholder alle orchestration-IDer
|
||||
/// som allerede er utført i denne kaskaden. Hvis target allerede finnes
|
||||
/// i kjeden, logges det og target hoppes over.
|
||||
///
|
||||
/// Output fra den fullførte orkestreringen sendes videre som
|
||||
/// `upstream_output` i trigger_context, slik at nedstrøms orkestreringer
|
||||
/// kan bruke `{event.upstream_*}`-variabler.
|
||||
pub async fn enqueue_cascade(
|
||||
db: &PgPool,
|
||||
completed_orchestration_id: Uuid,
|
||||
cascade_chain: &[String],
|
||||
pipeline_result: &serde_json::Value,
|
||||
) -> Result<usize, String> {
|
||||
// Sjekk dybdegrense
|
||||
if cascade_chain.len() >= MAX_CASCADE_DEPTH {
|
||||
tracing::warn!(
|
||||
orchestration_id = %completed_orchestration_id,
|
||||
depth = cascade_chain.len(),
|
||||
max = MAX_CASCADE_DEPTH,
|
||||
"Kaskade-dybdegrense nådd — stopper videre kaskade"
|
||||
);
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Finn alle triggers-edges fra denne orkestreringen
|
||||
let targets = sqlx::query_as::<_, (Uuid,)>(
|
||||
r#"
|
||||
SELECT target_id FROM edges
|
||||
WHERE source_id = $1
|
||||
AND edge_type = 'triggers'
|
||||
"#,
|
||||
)
|
||||
.bind(completed_orchestration_id)
|
||||
.fetch_all(db)
|
||||
.await
|
||||
.map_err(|e| format!("Feil ved henting av triggers-edges: {e}"))?;
|
||||
|
||||
if targets.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Bygg oppdatert kaskadekjede
|
||||
let mut new_chain = cascade_chain.to_vec();
|
||||
new_chain.push(completed_orchestration_id.to_string());
|
||||
|
||||
let mut enqueued = 0;
|
||||
|
||||
for (target_id,) in &targets {
|
||||
let target_str = target_id.to_string();
|
||||
|
||||
// Syklusdeteksjon: er target allerede i kjeden?
|
||||
if new_chain.contains(&target_str) {
|
||||
tracing::warn!(
|
||||
source = %completed_orchestration_id,
|
||||
target = %target_id,
|
||||
chain = ?new_chain,
|
||||
"Syklus oppdaget i kaskade — hopper over"
|
||||
);
|
||||
// Logg syklusen i orchestration_log
|
||||
let _ = sqlx::query(
|
||||
r#"
|
||||
INSERT INTO orchestration_log
|
||||
(orchestration_id, job_id, step_number, tool_binary, args,
|
||||
is_fallback, status, error_msg, duration_ms)
|
||||
VALUES ($1, '00000000-0000-0000-0000-000000000000', 0,
|
||||
'cascade', '[]'::jsonb, false, 'skipped',
|
||||
$2, 0)
|
||||
"#,
|
||||
)
|
||||
.bind(completed_orchestration_id)
|
||||
.bind(format!(
|
||||
"Syklus oppdaget: {} allerede i kaskadekjede {:?}",
|
||||
target_id, new_chain
|
||||
))
|
||||
.execute(db)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Verifiser at target faktisk er en orchestration-node
|
||||
let is_orch = sqlx::query_scalar::<_, i64>(
|
||||
"SELECT COUNT(*) FROM nodes WHERE id = $1 AND node_kind = 'orchestration'"
|
||||
)
|
||||
.bind(target_id)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| format!("Feil ved verifisering av kaskade-mål: {e}"))?;
|
||||
|
||||
if is_orch == 0 {
|
||||
tracing::warn!(
|
||||
target = %target_id,
|
||||
"triggers-edge peker til ikke-orchestration-node — hopper over"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Bygg payload med kaskade-kontekst
|
||||
let payload = serde_json::json!({
|
||||
"orchestration_id": target_str,
|
||||
"trigger_event": "cascade",
|
||||
"trigger_context": {
|
||||
"node_id": pipeline_result.get("orchestration_id")
|
||||
.and_then(|v| v.as_str()),
|
||||
"node_kind": "orchestration",
|
||||
"op": "CASCADE",
|
||||
"upstream_orchestration_id": completed_orchestration_id.to_string(),
|
||||
"upstream_steps_ok": pipeline_result.get("steps_ok"),
|
||||
},
|
||||
"cascade_chain": new_chain,
|
||||
});
|
||||
|
||||
crate::jobs::enqueue(db, "orchestrate", payload, None, 5)
|
||||
.await
|
||||
.map_err(|e| format!("Feil ved enqueue av kaskade-jobb: {e}"))?;
|
||||
|
||||
tracing::info!(
|
||||
source = %completed_orchestration_id,
|
||||
target = %target_id,
|
||||
depth = new_chain.len(),
|
||||
"Kaskade: nedstrøms orkestrering lagt i kø"
|
||||
);
|
||||
|
||||
enqueued += 1;
|
||||
}
|
||||
|
||||
Ok(enqueued)
|
||||
}
|
||||
|
||||
/// Hjelpefunksjon for testing: sjekk om en kaskade ville blitt blokkert
|
||||
/// av syklusdeteksjon eller dybdegrense.
|
||||
///
|
||||
/// Returnerer `Ok(true)` hvis kaskaden er trygg, `Err(grunn)` hvis blokkert.
|
||||
pub fn check_cascade_safety(
|
||||
completed_id: &str,
|
||||
target_id: &str,
|
||||
cascade_chain: &[String],
|
||||
) -> Result<bool, String> {
|
||||
if cascade_chain.len() >= MAX_CASCADE_DEPTH {
|
||||
return Err(format!(
|
||||
"Dybdegrense ({MAX_CASCADE_DEPTH}) nådd ved kjede med {} elementer",
|
||||
cascade_chain.len()
|
||||
));
|
||||
}
|
||||
|
||||
let mut chain = cascade_chain.to_vec();
|
||||
chain.push(completed_id.to_string());
|
||||
|
||||
if chain.contains(&target_id.to_string()) {
|
||||
return Err(format!(
|
||||
"Syklus: {target_id} finnes allerede i kjede {chain:?}"
|
||||
));
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Tester
|
||||
// =============================================================================
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_cascade_no_cycle() {
|
||||
let chain = vec!["aaa".to_string(), "bbb".to_string()];
|
||||
assert!(check_cascade_safety("ccc", "ddd", &chain).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_direct_cycle() {
|
||||
// A → B → A: syklus!
|
||||
let chain = vec!["aaa".to_string()];
|
||||
let result = check_cascade_safety("bbb", "aaa", &chain);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("Syklus"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_self_trigger() {
|
||||
// A → A: syklus via seg selv
|
||||
let chain: Vec<String> = vec![];
|
||||
let result = check_cascade_safety("aaa", "aaa", &chain);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("Syklus"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_indirect_cycle() {
|
||||
// A → B → C → A: syklus
|
||||
let chain = vec!["aaa".to_string(), "bbb".to_string()];
|
||||
let result = check_cascade_safety("ccc", "aaa", &chain);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_depth_limit() {
|
||||
let chain: Vec<String> = (0..MAX_CASCADE_DEPTH)
|
||||
.map(|i| format!("node-{i}"))
|
||||
.collect();
|
||||
let result = check_cascade_safety("node-next", "node-target", &chain);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("Dybdegrense"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_at_depth_limit_minus_one() {
|
||||
// Nøyaktig under grensen — skal være OK
|
||||
let chain: Vec<String> = (0..MAX_CASCADE_DEPTH - 1)
|
||||
.map(|i| format!("node-{i}"))
|
||||
.collect();
|
||||
let result = check_cascade_safety("node-next", "node-target", &chain);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_empty_chain() {
|
||||
// Første orkestrering i kjeden — ingen syklus mulig
|
||||
let chain: Vec<String> = vec![];
|
||||
assert!(check_cascade_safety("aaa", "bbb", &chain).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cascade_completed_equals_target_with_chain() {
|
||||
// completed=B, target=B, chain=[A]: B trigger seg selv
|
||||
let chain = vec!["aaa".to_string()];
|
||||
let result = check_cascade_safety("bbb", "bbb", &chain);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ pub struct ExecutionContext {
|
|||
pub cas_hash: Option<String>,
|
||||
pub communication_id: Option<String>,
|
||||
pub collection_id: Option<String>,
|
||||
/// ID til oppstrøms orkestrering (ved kaskade via triggers-edge)
|
||||
pub upstream_orchestration_id: Option<String>,
|
||||
}
|
||||
|
||||
impl ExecutionContext {
|
||||
|
|
@ -45,6 +47,7 @@ impl ExecutionContext {
|
|||
cas_hash: s("cas_hash"),
|
||||
communication_id: s("communication_id"),
|
||||
collection_id: s("collection_id"),
|
||||
upstream_orchestration_id: s("upstream_orchestration_id"),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -64,6 +67,7 @@ impl ExecutionContext {
|
|||
"event.cas_hash" => self.cas_hash.clone(),
|
||||
"event.communication_id" => self.communication_id.clone(),
|
||||
"event.collection_id" => self.collection_id.clone(),
|
||||
"event.upstream_orchestration_id" => self.upstream_orchestration_id.clone(),
|
||||
_ => None,
|
||||
}
|
||||
.unwrap_or_else(|| arg.to_string())
|
||||
|
|
@ -446,6 +450,7 @@ mod tests {
|
|||
cas_hash: Some("sha256:abc".into()),
|
||||
communication_id: Some("comm-456".into()),
|
||||
collection_id: Some("coll-789".into()),
|
||||
upstream_orchestration_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(ctx.substitute("{event.node_id}"), "node-123");
|
||||
|
|
@ -471,6 +476,7 @@ mod tests {
|
|||
cas_hash: Some("sha256:abc".into()),
|
||||
communication_id: None,
|
||||
collection_id: None,
|
||||
upstream_orchestration_id: None,
|
||||
};
|
||||
|
||||
let args = vec![
|
||||
|
|
@ -495,6 +501,7 @@ mod tests {
|
|||
cas_hash: None,
|
||||
communication_id: None,
|
||||
collection_id: None,
|
||||
upstream_orchestration_id: None,
|
||||
};
|
||||
|
||||
// Manglende variabel beholder {event.*}-syntaks
|
||||
|
|
|
|||
3
tasks.md
3
tasks.md
|
|
@ -324,8 +324,7 @@ automatisk eskalering av intelligens ved feil, kompilering av velprøvde mønstr
|
|||
- [x] 24.5 Script-executor: vaktmesteren parser kompilert script og eksekverer steg sekvensielt via generisk dispatch. VED_FEIL-håndtering. Logger i `orchestration_log`.
|
||||
- [x] 24.6 Orchestration UI: editor med tre visninger (Enkel/Teknisk/Kompilert) som tabber. Sanntids kompileringsfeil. Trigger-velger, "Test kjøring"-knapp, kjørehistorikk. Ref: `docs/concepts/orkestrering.md`.
|
||||
- [x] 24.7 AI-assistert oppretting: `synops-ai` med auto-generert systemprompt (fra cli_tool-noder) foreslår script fra fritekst-beskrivelse. Vaktmesteren validerer. Eventually-modus: lagre som work_item for Claude Code.
|
||||
- [~] 24.8 Kaskade: `triggers`-edge mellom orkestreringer. Output fra én trigger neste. Syklusdeteksjon for å unngå uendelige loops.
|
||||
> Påbegynt: 2026-03-18T17:50
|
||||
- [x] 24.8 Kaskade: `triggers`-edge mellom orkestreringer. Output fra én trigger neste. Syklusdeteksjon for å unngå uendelige loops.
|
||||
- [ ] 24.9 Seed-orkestreringer: opprett standard-orkestreringer for podcast-pipeline, publiseringsflyt, og AI-beriking basert på eksisterende hardkodet logikk i vaktmesteren. Skrives i menneskelig scriptspråk.
|
||||
|
||||
## Fase 25: Web Clipper — `synops-clip`
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue