//! Script executor — the caretaker ("vaktmesteren") parses a compiled script and
//! executes its steps sequentially via generic dispatch.
//!
//! Substitutes `{event.*}` variables from the trigger context,
//! runs each step as a CLI process, handles `VED_FEIL` fallbacks,
//! and logs every step in `orchestration_log`.
//!
//! Ref: docs/concepts/orkestrering.md § 6

use serde_json::Value;
use sqlx::PgPool;
use std::process::Stdio;
use uuid::Uuid;

use crate::cli_dispatch;
use crate::script_compiler::CompiledStep;

/// Trigger context used for variable substitution.
///
/// Built from the job's `payload.trigger_context`. Every field is optional:
/// a field is `None` when the corresponding key is absent from the payload
/// or its value is not a JSON string.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    pub node_id: Option<String>,
    pub node_kind: Option<String>,
    pub edge_type: Option<String>,
    pub source_id: Option<String>,
    pub target_id: Option<String>,
    pub op: Option<String>,
    pub cas_hash: Option<String>,
    pub communication_id: Option<String>,
    pub collection_id: Option<String>,
}

impl ExecutionContext {
    /// Builds the context from the `trigger_context` JSON object in a job payload.
    ///
    /// A missing `trigger_context`, a missing key, or a non-string value all
    /// yield `None` for the affected field(s); this function never fails.
    pub fn from_payload(payload: &Value) -> Self {
        // Borrow the sub-object instead of cloning it; we only copy out the
        // individual string fields we actually need.
        let trigger = payload.get("trigger_context");
        let field = |key: &str| {
            trigger
                .and_then(|ctx| ctx.get(key))
                .and_then(Value::as_str)
                .map(str::to_owned)
        };

        ExecutionContext {
            node_id: field("node_id"),
            node_kind: field("node_kind"),
            edge_type: field("edge_type"),
            source_id: field("source_id"),
            target_id: field("target_id"),
            op: field("op"),
            cas_hash: field("cas_hash"),
            communication_id: field("communication_id"),
            collection_id: field("collection_id"),
        }
    }

    /// Substitutes an `{event.*}` variable in a single argument.
    ///
    /// Only an argument that consists *entirely* of one `{...}` placeholder is
    /// replaced. Unknown variables, variables whose value is not set in this
    /// context, and ordinary arguments are returned unchanged.
    fn substitute(&self, arg: &str) -> String {
        let resolved = arg
            .strip_prefix('{')
            .and_then(|rest| rest.strip_suffix('}'))
            .and_then(|name| match name {
                "event.node_id" => self.node_id.as_deref(),
                "event.node_kind" => self.node_kind.as_deref(),
                "event.edge_type" => self.edge_type.as_deref(),
                "event.source_id" => self.source_id.as_deref(),
                "event.target_id" => self.target_id.as_deref(),
                "event.op" => self.op.as_deref(),
                "event.cas_hash" => self.cas_hash.as_deref(),
                "event.communication_id" => self.communication_id.as_deref(),
                "event.collection_id" => self.collection_id.as_deref(),
                _ => None,
            });

        // Fall back to the original text so the unresolved template stays visible.
        resolved.unwrap_or(arg).to_string()
    }

    /// Substitutes variables in every argument, preserving order.
    fn substitute_args(&self, args: &[String]) -> Vec<String> {
        args.iter().map(|a| self.substitute(a)).collect()
    }
}

/// Result of running the whole pipeline.
#[derive(Debug)]
pub struct PipelineResult {
    /// Number of pipeline steps that were started.
    pub steps_run: usize,
    /// Number of steps that succeeded, either directly or via their step fallback.
    pub steps_ok: usize,
    /// Number of steps that failed without being recovered.
    pub steps_failed: usize,
    /// `true` when the pipeline stopped early because of an unrecovered failure.
    pub aborted: bool,
    /// Description of the failure that aborted the pipeline, if any.
    pub error: Option<String>,
}

/// Runs a compiled pipeline sequentially.
///
/// For each step:
/// 1. Substitutes variables
/// 2. Runs the CLI tool
/// 3. On failure: tries the step fallback, then the global fallback
/// 4. Logs to orchestration_log
///
/// Stops at the first failure that is not handled by a fallback.
pub async fn execute_pipeline( db: &PgPool, orchestration_id: Uuid, job_id: Uuid, steps: &[CompiledStep], global_fallback: Option<&CompiledStep>, ctx: &ExecutionContext, ) -> PipelineResult { let mut result = PipelineResult { steps_run: 0, steps_ok: 0, steps_failed: 0, aborted: false, error: None, }; for step in steps { result.steps_run += 1; let step_result = execute_step(db, orchestration_id, job_id, step, ctx).await; match step_result { Ok(_) => { result.steps_ok += 1; } Err(err) => { tracing::warn!( orchestration_id = %orchestration_id, step = step.step_number, binary = %step.binary, error = %err, "Steg feilet, prøver fallback" ); // Prøv steg-fallback (VED_FEIL per steg) let mut recovered = false; if let Some(ref fallback) = step.fallback { tracing::info!( orchestration_id = %orchestration_id, step = step.step_number, fallback_binary = %fallback.binary, "Kjører steg-fallback" ); match execute_step(db, orchestration_id, job_id, fallback, ctx).await { Ok(_) => { recovered = true; result.steps_ok += 1; } Err(fb_err) => { tracing::warn!( orchestration_id = %orchestration_id, step = step.step_number, error = %fb_err, "Steg-fallback feilet også" ); } } } if !recovered { // Prøv global fallback if let Some(gf) = global_fallback { tracing::info!( orchestration_id = %orchestration_id, "Kjører global fallback" ); let gf_result = execute_step(db, orchestration_id, job_id, gf, ctx).await; if let Err(gf_err) = &gf_result { tracing::error!( orchestration_id = %orchestration_id, error = %gf_err, "Global fallback feilet" ); } // Uansett om global fallback lyktes: stopp pipelinen } result.steps_failed += 1; result.aborted = true; result.error = Some(format!( "Steg {} ({}) feilet: {}", step.step_number, step.binary, err )); break; } } } } result } /// Kjør ett steg: spawn CLI-prosess, vent på resultat, logg i orchestration_log. 
async fn execute_step( db: &PgPool, orchestration_id: Uuid, job_id: Uuid, step: &CompiledStep, ctx: &ExecutionContext, ) -> Result { let resolved_args = ctx.substitute_args(&step.args); let is_fallback = step.step_number == 0; tracing::info!( orchestration_id = %orchestration_id, step = step.step_number, binary = %step.binary, args = ?resolved_args, is_fallback = is_fallback, "Kjører steg" ); let start = std::time::Instant::now(); // Spesialtilfelle: work_item oppretter en oppgave-node i PG let result = if step.binary == "work_item" { execute_work_item(db, orchestration_id, &resolved_args).await } else { execute_cli_tool(&step.binary, &resolved_args).await }; let duration_ms = start.elapsed().as_millis() as i32; // Logg i orchestration_log let (status, exit_code, result_json, error_msg) = match &result { Ok(val) => ("ok", Some(0i16), Some(val.clone()), None), Err(err) => ("error", None, None, Some(err.clone())), }; if let Err(e) = log_step( db, orchestration_id, job_id, step.step_number as i16, &step.binary, &resolved_args, is_fallback, status, exit_code, result_json.as_ref(), error_msg.as_deref(), duration_ms, ) .await { tracing::error!( orchestration_id = %orchestration_id, error = %e, "Kunne ikke logge steg i orchestration_log" ); } result } /// Kjør et CLI-verktøy som subprosess. 
async fn execute_cli_tool(binary: &str, args: &[String]) -> Result { let mut cmd = tokio::process::Command::new(binary); for arg in args { cmd.arg(arg); } // Sett DATABASE_URL for verktøy som trenger det cli_dispatch::set_database_url(&mut cmd)?; cli_dispatch::forward_env(&mut cmd, "CAS_ROOT"); cli_dispatch::forward_env(&mut cmd, "WHISPER_URL"); cli_dispatch::forward_env(&mut cmd, "LITELLM_URL"); cli_dispatch::forward_env(&mut cmd, "LITELLM_API_KEY"); cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); let child = cmd .spawn() .map_err(|e| format!("Kunne ikke starte {binary}: {e}"))?; let output = child .wait_with_output() .await .map_err(|e| format!("Feil ved kjøring av {binary}: {e}"))?; let stderr = String::from_utf8_lossy(&output.stderr); if !stderr.is_empty() { tracing::info!(stderr = %stderr, "{binary} stderr"); } if !output.status.success() { let code = output.status.code().unwrap_or(-1); return Err(format!("{binary} feilet (exit {code}): {stderr}")); } let stdout = String::from_utf8_lossy(&output.stdout); if stdout.trim().is_empty() { // Noen verktøy returnerer ingenting — det er OK return Ok(serde_json::json!({"status": "ok"})); } serde_json::from_str(&stdout) .map_err(|e| format!("Kunne ikke parse {binary} output: {e}")) } /// Spesialhåndtering av `work_item`: opprett en oppgave-node i grafen. /// Args: ["", "--tag", "bug", "--tag", "feature", ...] 
async fn execute_work_item( db: &PgPool, orchestration_id: Uuid, args: &[String], ) -> Result { // Ekstraher tittel (første arg, strip anførselstegn) let title = args .first() .map(|s| s.trim_matches('"').to_string()) .ok_or("work_item mangler tittel")?; // Ekstraher tags let mut tags = Vec::new(); let mut i = 1; while i < args.len() { if args[i] == "--tag" && i + 1 < args.len() { tags.push(args[i + 1].clone()); i += 2; } else { i += 1; } } let metadata = serde_json::json!({ "source": "orchestration", "orchestration_id": orchestration_id.to_string(), "tags": tags, }); // Opprett work_item-node let node_id = sqlx::query_scalar::<_, Uuid>( r#" INSERT INTO nodes (node_kind, title, metadata, visibility) VALUES ('work_item', $1, $2, 'private') RETURNING id "#, ) .bind(&title) .bind(&metadata) .fetch_one(db) .await .map_err(|e| format!("Feil ved oppretting av work_item: {e}"))?; // Opprett source_material-edge tilbake til orkestreringen let _ = sqlx::query( r#" INSERT INTO edges (source_id, target_id, edge_type) VALUES ($1, $2, 'source_material') "#, ) .bind(node_id) .bind(orchestration_id) .execute(db) .await; tracing::info!( node_id = %node_id, title = %title, tags = ?tags, "work_item opprettet" ); Ok(serde_json::json!({ "status": "ok", "node_id": node_id.to_string(), "title": title, })) } /// Skriv en rad i orchestration_log. 
async fn log_step( db: &PgPool, orchestration_id: Uuid, job_id: Uuid, step_number: i16, binary: &str, args: &[String], is_fallback: bool, status: &str, exit_code: Option, result: Option<&Value>, error_msg: Option<&str>, duration_ms: i32, ) -> Result<(), sqlx::Error> { let args_json = serde_json::to_value(args).unwrap_or_default(); sqlx::query( r#" INSERT INTO orchestration_log (orchestration_id, job_id, step_number, tool_binary, args, is_fallback, status, exit_code, result, error_msg, duration_ms) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) "#, ) .bind(orchestration_id) .bind(job_id) .bind(step_number) .bind(binary) .bind(&args_json) .bind(is_fallback) .bind(status) .bind(exit_code) .bind(result) .bind(error_msg) .bind(duration_ms) .execute(db) .await?; Ok(()) } // ============================================================================= // Tester // ============================================================================= #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn test_execution_context_from_payload() { let payload = json!({ "orchestration_id": "abc", "trigger_event": "node.created", "trigger_context": { "node_id": "aaaa-bbbb", "node_kind": "content", "op": "INSERT", "cas_hash": "sha256:deadbeef" } }); let ctx = ExecutionContext::from_payload(&payload); assert_eq!(ctx.node_id.as_deref(), Some("aaaa-bbbb")); assert_eq!(ctx.node_kind.as_deref(), Some("content")); assert_eq!(ctx.op.as_deref(), Some("INSERT")); assert_eq!(ctx.cas_hash.as_deref(), Some("sha256:deadbeef")); assert!(ctx.edge_type.is_none()); assert!(ctx.communication_id.is_none()); } #[test] fn test_substitute_variables() { let ctx = ExecutionContext { node_id: Some("node-123".into()), node_kind: Some("content".into()), edge_type: None, source_id: None, target_id: None, op: Some("INSERT".into()), cas_hash: Some("sha256:abc".into()), communication_id: Some("comm-456".into()), collection_id: Some("coll-789".into()), }; 
assert_eq!(ctx.substitute("{event.node_id}"), "node-123"); assert_eq!(ctx.substitute("{event.cas_hash}"), "sha256:abc"); assert_eq!(ctx.substitute("{event.communication_id}"), "comm-456"); assert_eq!(ctx.substitute("{event.collection_id}"), "coll-789"); // Ukjent variabel returneres uendret assert_eq!(ctx.substitute("{event.unknown}"), "{event.unknown}"); // Ikke-variabel returneres uendret assert_eq!(ctx.substitute("--model"), "--model"); assert_eq!(ctx.substitute("large"), "large"); } #[test] fn test_substitute_args() { let ctx = ExecutionContext { node_id: None, node_kind: None, edge_type: None, source_id: None, target_id: None, op: None, cas_hash: Some("sha256:abc".into()), communication_id: None, collection_id: None, }; let args = vec![ "--cas-hash".to_string(), "{event.cas_hash}".to_string(), "--model".to_string(), "large".to_string(), ]; let resolved = ctx.substitute_args(&args); assert_eq!(resolved, vec!["--cas-hash", "sha256:abc", "--model", "large"]); } #[test] fn test_substitute_missing_variable_keeps_template() { let ctx = ExecutionContext { node_id: None, node_kind: None, edge_type: None, source_id: None, target_id: None, op: None, cas_hash: None, communication_id: None, collection_id: None, }; // Manglende variabel beholder {event.*}-syntaks assert_eq!(ctx.substitute("{event.cas_hash}"), "{event.cas_hash}"); } }