From 059c776bf49378f545118b857f958d43c0017471 Mon Sep 17 00:00:00 2001 From: vegard Date: Wed, 18 Mar 2026 17:18:10 +0000 Subject: [PATCH] Script-executor: vaktmesteren eksekverer kompilerte pipelines (oppgave 24.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ny modul script_executor.rs som tar en kompilert pipeline fra script_compiler og kjører stegene sekvensielt: - Substituerer {event.*}-variabler fra trigger-kontekst - Spawner hvert CLI-verktøy som subprosess via generisk dispatch - VED_FEIL-håndtering: steg-fallback → global fallback → stopp - Spesialhåndtering av work_item (oppretter oppgave-node i grafen) - Logger hvert steg i ny orchestration_log-tabell handle_orchestrate i jobs.rs utvides: kompilerer + utfører i samme jobb (var tidligere kun kompilering). Migration 023: orchestration_log-tabell med indekser for effektiv spørring per orkestrering og per jobb. --- maskinrommet/src/jobs.rs | 55 ++- maskinrommet/src/main.rs | 1 + maskinrommet/src/script_executor.rs | 503 +++++++++++++++++++++++++++ migrations/023_orchestration_log.sql | 22 ++ tasks.md | 3 +- 5 files changed, 573 insertions(+), 11 deletions(-) create mode 100644 maskinrommet/src/script_executor.rs create mode 100644 migrations/023_orchestration_log.sql diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index 98ea1fa..dcf5fbd 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -25,6 +25,7 @@ use crate::pg_writes; use crate::publishing::IndexCache; use crate::resources::{self, PriorityRules}; use crate::script_compiler; +use crate::script_executor; use crate::summarize; use crate::transcribe; use crate::tts; @@ -228,13 +229,15 @@ async fn dispatch( } } -/// Handler for `orchestrate`-jobb — kompilerer orchestration-script. +/// Handler for `orchestrate`-jobb — kompilerer og utfører orchestration-script. 
/// -/// Henter orchestration-nodens `content` (menneskelig script), -/// kompilerer det via script_compiler, og lagrer resultatet -/// i nodens `metadata.pipeline`. -/// -/// Utførelse av kompilert pipeline kommer i oppgave 24.5. +/// Flyt: +/// 1. Henter orchestration-nodens `content` (menneskelig script) +/// 2. Kompilerer via script_compiler → lagrer pipeline i metadata +/// 3. Substituerer {event.*}-variabler fra trigger-kontekst +/// 4. Utfører steg sekvensielt via generisk dispatch (script_executor) +/// 5. VED_FEIL-håndtering: steg-fallback → global fallback → stopp +/// 6. Logger hvert steg i orchestration_log async fn handle_orchestrate( job: &JobRow, db: &PgPool, @@ -340,13 +343,47 @@ async fn handle_orchestrate( tracing::info!( orchestration_id = %orch_id, steps = compiled.steps.len(), - "Orchestration-script kompilert" + "Orchestration-script kompilert — starter utførelse" ); + // === Utførelse (oppgave 24.5) === + // Bygg ExecutionContext fra trigger_context i payload + let exec_ctx = script_executor::ExecutionContext::from_payload(&job.payload); + + let pipeline_result = script_executor::execute_pipeline( + db, + orch_id, + job.id, + &compiled.steps, + compiled.global_fallback.as_ref(), + &exec_ctx, + ) + .await; + + tracing::info!( + orchestration_id = %orch_id, + steps_run = pipeline_result.steps_run, + steps_ok = pipeline_result.steps_ok, + steps_failed = pipeline_result.steps_failed, + aborted = pipeline_result.aborted, + "Pipeline utført" + ); + + if pipeline_result.aborted { + return Err(format!( + "Pipeline avbrutt etter {}/{} steg: {}", + pipeline_result.steps_ok, + pipeline_result.steps_run, + pipeline_result.error.as_deref().unwrap_or("ukjent feil"), + )); + } + Ok(serde_json::json!({ - "status": "compiled", + "status": "executed", "orchestration_id": orch_id.to_string(), - "steps": compiled.steps.len(), + "steps_compiled": compiled.steps.len(), + "steps_run": pipeline_result.steps_run, + "steps_ok": pipeline_result.steps_ok, "technical": 
compiled.technical, })) } diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index 4edc52f..681a11f 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -27,6 +27,7 @@ pub mod ws; pub mod mixer; pub mod orchestration_trigger; pub mod script_compiler; +pub mod script_executor; pub mod tiptap; pub mod transcribe; pub mod tts; diff --git a/maskinrommet/src/script_executor.rs b/maskinrommet/src/script_executor.rs new file mode 100644 index 0000000..9e80529 --- /dev/null +++ b/maskinrommet/src/script_executor.rs @@ -0,0 +1,503 @@ +//! Script-executor — vaktmesteren parser kompilert script og +//! eksekverer steg sekvensielt via generisk dispatch. +//! +//! Substituerer {event.*}-variabler fra trigger-kontekst, +//! kjører hvert steg som CLI-prosess, håndterer VED_FEIL-fallback, +//! og logger hvert steg i `orchestration_log`. +//! +//! Ref: docs/concepts/orkestrering.md § 6 + +use serde_json::Value; +use sqlx::PgPool; +use std::process::Stdio; +use uuid::Uuid; + +use crate::cli_dispatch; +use crate::script_compiler::CompiledStep; + +/// Trigger-kontekst for variabel-substitusjon. +/// Bygges fra jobbens `payload.trigger_context`. +#[derive(Debug, Clone)] +pub struct ExecutionContext { + pub node_id: Option, + pub node_kind: Option, + pub edge_type: Option, + pub source_id: Option, + pub target_id: Option, + pub op: Option, + pub cas_hash: Option, + pub communication_id: Option, + pub collection_id: Option, +} + +impl ExecutionContext { + /// Bygg fra trigger_context-JSON i jobb-payload. 
+ pub fn from_payload(payload: &Value) -> Self { + let ctx = payload.get("trigger_context").cloned().unwrap_or_default(); + let s = |key: &str| ctx.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()); + ExecutionContext { + node_id: s("node_id"), + node_kind: s("node_kind"), + edge_type: s("edge_type"), + source_id: s("source_id"), + target_id: s("target_id"), + op: s("op"), + cas_hash: s("cas_hash"), + communication_id: s("communication_id"), + collection_id: s("collection_id"), + } + } + + /// Substituerer {event.*}-variabler i en streng. + fn substitute(&self, arg: &str) -> String { + if !arg.starts_with('{') || !arg.ends_with('}') { + return arg.to_string(); + } + let inner = &arg[1..arg.len() - 1]; + match inner { + "event.node_id" => self.node_id.clone(), + "event.node_kind" => self.node_kind.clone(), + "event.edge_type" => self.edge_type.clone(), + "event.source_id" => self.source_id.clone(), + "event.target_id" => self.target_id.clone(), + "event.op" => self.op.clone(), + "event.cas_hash" => self.cas_hash.clone(), + "event.communication_id" => self.communication_id.clone(), + "event.collection_id" => self.collection_id.clone(), + _ => None, + } + .unwrap_or_else(|| arg.to_string()) + } + + /// Substituerer alle args. + fn substitute_args(&self, args: &[String]) -> Vec { + args.iter().map(|a| self.substitute(a)).collect() + } +} + +/// Resultat av å kjøre hele pipelinen. +#[derive(Debug)] +pub struct PipelineResult { + pub steps_run: usize, + pub steps_ok: usize, + pub steps_failed: usize, + pub aborted: bool, + pub error: Option, +} + +/// Kjør en kompilert pipeline sekvensielt. +/// +/// For hvert steg: +/// 1. Substituerer variabler +/// 2. Kjører CLI-verktøyet +/// 3. Ved feil: prøver steg-fallback, deretter global fallback +/// 4. Logger i orchestration_log +/// +/// Stopper ved første feil som ikke håndteres av fallback. 
/// Run a compiled pipeline sequentially.
///
/// For each step:
/// 1. Substitute `{event.*}` variables
/// 2. Run the CLI tool (via `execute_step`)
/// 3. On failure: try the step's own fallback, then the global fallback
/// 4. Every attempt (including fallbacks) is logged in `orchestration_log`
///
/// Stops at the first failure that is not recovered by a per-step
/// fallback; a successful *global* fallback still aborts the pipeline.
pub async fn execute_pipeline(
    db: &PgPool,
    orchestration_id: Uuid,
    job_id: Uuid,
    steps: &[CompiledStep],
    global_fallback: Option<&CompiledStep>,
    ctx: &ExecutionContext,
) -> PipelineResult {
    let mut result = PipelineResult {
        steps_run: 0,
        steps_ok: 0,
        steps_failed: 0,
        aborted: false,
        error: None,
    };

    for step in steps {
        result.steps_run += 1;

        let step_result = execute_step(db, orchestration_id, job_id, step, ctx).await;

        match step_result {
            Ok(_) => {
                result.steps_ok += 1;
            }
            Err(err) => {
                tracing::warn!(
                    orchestration_id = %orchestration_id,
                    step = step.step_number,
                    binary = %step.binary,
                    error = %err,
                    "Steg feilet, prøver fallback"
                );

                // Try the per-step fallback first (VED_FEIL on the step itself).
                // A successful per-step fallback counts as an OK step and the
                // pipeline continues.
                let mut recovered = false;
                if let Some(ref fallback) = step.fallback {
                    tracing::info!(
                        orchestration_id = %orchestration_id,
                        step = step.step_number,
                        fallback_binary = %fallback.binary,
                        "Kjører steg-fallback"
                    );
                    match execute_step(db, orchestration_id, job_id, fallback, ctx).await {
                        Ok(_) => {
                            recovered = true;
                            result.steps_ok += 1;
                        }
                        Err(fb_err) => {
                            tracing::warn!(
                                orchestration_id = %orchestration_id,
                                step = step.step_number,
                                error = %fb_err,
                                "Steg-fallback feilet også"
                            );
                        }
                    }
                }

                if !recovered {
                    // Run the global fallback (best effort) before aborting.
                    if let Some(gf) = global_fallback {
                        tracing::info!(
                            orchestration_id = %orchestration_id,
                            "Kjører global fallback"
                        );
                        let gf_result = execute_step(db, orchestration_id, job_id, gf, ctx).await;
                        if let Err(gf_err) = &gf_result {
                            tracing::error!(
                                orchestration_id = %orchestration_id,
                                error = %gf_err,
                                "Global fallback feilet"
                            );
                        }
                        // Whether or not the global fallback succeeded:
                        // the pipeline stops here (by design).
                    }

                    result.steps_failed += 1;
                    result.aborted = true;
                    result.error = Some(format!(
                        "Steg {} ({}) feilet: {}",
                        step.step_number, step.binary, err
                    ));
                    break;
                }
            }
        }
    }

    result
}
resultat, logg i orchestration_log. +async fn execute_step( + db: &PgPool, + orchestration_id: Uuid, + job_id: Uuid, + step: &CompiledStep, + ctx: &ExecutionContext, +) -> Result { + let resolved_args = ctx.substitute_args(&step.args); + let is_fallback = step.step_number == 0; + + tracing::info!( + orchestration_id = %orchestration_id, + step = step.step_number, + binary = %step.binary, + args = ?resolved_args, + is_fallback = is_fallback, + "Kjører steg" + ); + + let start = std::time::Instant::now(); + + // Spesialtilfelle: work_item oppretter en oppgave-node i PG + let result = if step.binary == "work_item" { + execute_work_item(db, orchestration_id, &resolved_args).await + } else { + execute_cli_tool(&step.binary, &resolved_args).await + }; + + let duration_ms = start.elapsed().as_millis() as i32; + + // Logg i orchestration_log + let (status, exit_code, result_json, error_msg) = match &result { + Ok(val) => ("ok", Some(0i16), Some(val.clone()), None), + Err(err) => ("error", None, None, Some(err.clone())), + }; + + if let Err(e) = log_step( + db, + orchestration_id, + job_id, + step.step_number as i16, + &step.binary, + &resolved_args, + is_fallback, + status, + exit_code, + result_json.as_ref(), + error_msg.as_deref(), + duration_ms, + ) + .await + { + tracing::error!( + orchestration_id = %orchestration_id, + error = %e, + "Kunne ikke logge steg i orchestration_log" + ); + } + + result +} + +/// Kjør et CLI-verktøy som subprosess. 
+async fn execute_cli_tool(binary: &str, args: &[String]) -> Result { + let mut cmd = tokio::process::Command::new(binary); + for arg in args { + cmd.arg(arg); + } + + // Sett DATABASE_URL for verktøy som trenger det + cli_dispatch::set_database_url(&mut cmd)?; + cli_dispatch::forward_env(&mut cmd, "CAS_ROOT"); + cli_dispatch::forward_env(&mut cmd, "WHISPER_URL"); + cli_dispatch::forward_env(&mut cmd, "LITELLM_URL"); + cli_dispatch::forward_env(&mut cmd, "LITELLM_API_KEY"); + + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + + let child = cmd + .spawn() + .map_err(|e| format!("Kunne ikke starte {binary}: {e}"))?; + + let output = child + .wait_with_output() + .await + .map_err(|e| format!("Feil ved kjøring av {binary}: {e}"))?; + + let stderr = String::from_utf8_lossy(&output.stderr); + if !stderr.is_empty() { + tracing::info!(stderr = %stderr, "{binary} stderr"); + } + + if !output.status.success() { + let code = output.status.code().unwrap_or(-1); + return Err(format!("{binary} feilet (exit {code}): {stderr}")); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + if stdout.trim().is_empty() { + // Noen verktøy returnerer ingenting — det er OK + return Ok(serde_json::json!({"status": "ok"})); + } + serde_json::from_str(&stdout) + .map_err(|e| format!("Kunne ikke parse {binary} output: {e}")) +} + +/// Spesialhåndtering av `work_item`: opprett en oppgave-node i grafen. +/// Args: ["", "--tag", "bug", "--tag", "feature", ...] 
+async fn execute_work_item( + db: &PgPool, + orchestration_id: Uuid, + args: &[String], +) -> Result { + // Ekstraher tittel (første arg, strip anførselstegn) + let title = args + .first() + .map(|s| s.trim_matches('"').to_string()) + .ok_or("work_item mangler tittel")?; + + // Ekstraher tags + let mut tags = Vec::new(); + let mut i = 1; + while i < args.len() { + if args[i] == "--tag" && i + 1 < args.len() { + tags.push(args[i + 1].clone()); + i += 2; + } else { + i += 1; + } + } + + let metadata = serde_json::json!({ + "source": "orchestration", + "orchestration_id": orchestration_id.to_string(), + "tags": tags, + }); + + // Opprett work_item-node + let node_id = sqlx::query_scalar::<_, Uuid>( + r#" + INSERT INTO nodes (node_kind, title, metadata, visibility) + VALUES ('work_item', $1, $2, 'private') + RETURNING id + "#, + ) + .bind(&title) + .bind(&metadata) + .fetch_one(db) + .await + .map_err(|e| format!("Feil ved oppretting av work_item: {e}"))?; + + // Opprett source_material-edge tilbake til orkestreringen + let _ = sqlx::query( + r#" + INSERT INTO edges (source_id, target_id, edge_type) + VALUES ($1, $2, 'source_material') + "#, + ) + .bind(node_id) + .bind(orchestration_id) + .execute(db) + .await; + + tracing::info!( + node_id = %node_id, + title = %title, + tags = ?tags, + "work_item opprettet" + ); + + Ok(serde_json::json!({ + "status": "ok", + "node_id": node_id.to_string(), + "title": title, + })) +} + +/// Skriv en rad i orchestration_log. 
+async fn log_step( + db: &PgPool, + orchestration_id: Uuid, + job_id: Uuid, + step_number: i16, + binary: &str, + args: &[String], + is_fallback: bool, + status: &str, + exit_code: Option, + result: Option<&Value>, + error_msg: Option<&str>, + duration_ms: i32, +) -> Result<(), sqlx::Error> { + let args_json = serde_json::to_value(args).unwrap_or_default(); + sqlx::query( + r#" + INSERT INTO orchestration_log + (orchestration_id, job_id, step_number, tool_binary, args, is_fallback, + status, exit_code, result, error_msg, duration_ms) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + "#, + ) + .bind(orchestration_id) + .bind(job_id) + .bind(step_number) + .bind(binary) + .bind(&args_json) + .bind(is_fallback) + .bind(status) + .bind(exit_code) + .bind(result) + .bind(error_msg) + .bind(duration_ms) + .execute(db) + .await?; + Ok(()) +} + +// ============================================================================= +// Tester +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_execution_context_from_payload() { + let payload = json!({ + "orchestration_id": "abc", + "trigger_event": "node.created", + "trigger_context": { + "node_id": "aaaa-bbbb", + "node_kind": "content", + "op": "INSERT", + "cas_hash": "sha256:deadbeef" + } + }); + let ctx = ExecutionContext::from_payload(&payload); + assert_eq!(ctx.node_id.as_deref(), Some("aaaa-bbbb")); + assert_eq!(ctx.node_kind.as_deref(), Some("content")); + assert_eq!(ctx.op.as_deref(), Some("INSERT")); + assert_eq!(ctx.cas_hash.as_deref(), Some("sha256:deadbeef")); + assert!(ctx.edge_type.is_none()); + assert!(ctx.communication_id.is_none()); + } + + #[test] + fn test_substitute_variables() { + let ctx = ExecutionContext { + node_id: Some("node-123".into()), + node_kind: Some("content".into()), + edge_type: None, + source_id: None, + target_id: None, + op: Some("INSERT".into()), + cas_hash: 
Some("sha256:abc".into()), + communication_id: Some("comm-456".into()), + collection_id: Some("coll-789".into()), + }; + + assert_eq!(ctx.substitute("{event.node_id}"), "node-123"); + assert_eq!(ctx.substitute("{event.cas_hash}"), "sha256:abc"); + assert_eq!(ctx.substitute("{event.communication_id}"), "comm-456"); + assert_eq!(ctx.substitute("{event.collection_id}"), "coll-789"); + // Ukjent variabel returneres uendret + assert_eq!(ctx.substitute("{event.unknown}"), "{event.unknown}"); + // Ikke-variabel returneres uendret + assert_eq!(ctx.substitute("--model"), "--model"); + assert_eq!(ctx.substitute("large"), "large"); + } + + #[test] + fn test_substitute_args() { + let ctx = ExecutionContext { + node_id: None, + node_kind: None, + edge_type: None, + source_id: None, + target_id: None, + op: None, + cas_hash: Some("sha256:abc".into()), + communication_id: None, + collection_id: None, + }; + + let args = vec![ + "--cas-hash".to_string(), + "{event.cas_hash}".to_string(), + "--model".to_string(), + "large".to_string(), + ]; + let resolved = ctx.substitute_args(&args); + assert_eq!(resolved, vec!["--cas-hash", "sha256:abc", "--model", "large"]); + } + + #[test] + fn test_substitute_missing_variable_keeps_template() { + let ctx = ExecutionContext { + node_id: None, + node_kind: None, + edge_type: None, + source_id: None, + target_id: None, + op: None, + cas_hash: None, + communication_id: None, + collection_id: None, + }; + + // Manglende variabel beholder {event.*}-syntaks + assert_eq!(ctx.substitute("{event.cas_hash}"), "{event.cas_hash}"); + } +} diff --git a/migrations/023_orchestration_log.sql b/migrations/023_orchestration_log.sql new file mode 100644 index 0000000..0df19ad --- /dev/null +++ b/migrations/023_orchestration_log.sql @@ -0,0 +1,22 @@ +-- Orchestration execution log. +-- Hver rad er ett kjørt steg (eller fallback) i en orkestrering. +-- Vaktmesteren logger hit under script-utførelse. 
-- Orchestration execution log.
-- One row per executed step (or fallback attempt) in an orchestration run.
-- The janitor ("vaktmesteren") writes here during script execution.

CREATE TABLE IF NOT EXISTS orchestration_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    orchestration_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
    job_id UUID REFERENCES job_queue(id) ON DELETE SET NULL,
    step_number SMALLINT NOT NULL,     -- 0 = global fallback
    tool_binary TEXT NOT NULL,         -- CLI tool that was executed
    args JSONB NOT NULL DEFAULT '[]',  -- arguments (after variable substitution)
    is_fallback BOOLEAN NOT NULL DEFAULT false,
    status TEXT NOT NULL CHECK (status IN ('ok', 'error', 'skipped')),
    exit_code SMALLINT,                -- process exit code
    result JSONB,                      -- stdout JSON from the tool
    error_msg TEXT,                    -- stderr / error message
    duration_ms INTEGER,               -- execution time in milliseconds
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- IF NOT EXISTS on the indexes too, so a re-run of the migration against an
-- already-migrated database stays idempotent (matching the table above).
CREATE INDEX IF NOT EXISTS idx_orchestration_log_orch ON orchestration_log (orchestration_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_orchestration_log_job ON orchestration_log (job_id);
- > Påbegynt: 2026-03-18T17:10 +- [x] 24.5 Script-executor: vaktmesteren parser kompilert script og eksekverer steg sekvensielt via generisk dispatch. VED_FEIL-håndtering. Logger i `orchestration_log`. - [ ] 24.6 Orchestration UI: editor med tre visninger (Enkel/Teknisk/Kompilert) som tabber. Sanntids kompileringsfeil. Trigger-velger, "Test kjøring"-knapp, kjørehistorikk. Ref: `docs/concepts/orkestrering.md`. - [ ] 24.7 AI-assistert oppretting: `synops-ai` med auto-generert systemprompt (fra cli_tool-noder) foreslår script fra fritekst-beskrivelse. Vaktmesteren validerer. Eventually-modus: lagre som work_item for Claude Code. - [ ] 24.8 Kaskade: `triggers`-edge mellom orkestreringer. Output fra én trigger neste. Syklusdeteksjon for å unngå uendelige loops.