synops/maskinrommet/src/script_executor.rs
vegard 8af4265b6e synops-clip orkestrering-støtte: cli_tool-registrering + clip_url jobb/API (oppgave 25.4)
Gjør synops-clip tilgjengelig i orkestreringer ved å:

1. Registrere synops-clip som cli_tool-node (migration 026) med norske
   aliases (clip, klipp, hent artikkel) og args_hints for script-kompilatoren.
   Orkestreringer kan nå skrive "1. clip fra event (lagre node, bruker)"
   som kompileres til "synops-clip --url {event.url} --write --created-by ...".

2. Legge til clip_url som jobbtype i jobbkøen (clip.rs) — spawner
   synops-clip med riktige env-variabler (DATABASE_URL, AI_GATEWAY_URL, etc).

3. Legge til POST /intentions/clip_url API-endepunkt slik at frontend
   og andre klienter kan trigge URL-klipping direkte.

4. Utvide trigger-konteksten med event.url og event.created_by slik at
   orkestreringer som reagerer på URL-deling kan videresende URL til
   synops-clip via variabel-substitusjon.
2026-03-18 18:55:11 +00:00

526 lines
16 KiB
Rust

//! Script-executor — vaktmesteren parser kompilert script og
//! eksekverer steg sekvensielt via generisk dispatch.
//!
//! Substituerer {event.*}-variabler fra trigger-kontekst,
//! kjører hvert steg som CLI-prosess, håndterer VED_FEIL-fallback,
//! og logger hvert steg i `orchestration_log`.
//!
//! Ref: docs/concepts/orkestrering.md § 6
use serde_json::Value;
use sqlx::PgPool;
use std::process::Stdio;
use uuid::Uuid;
use crate::cli_dispatch;
use crate::script_compiler::CompiledStep;
/// Trigger context for `{event.*}` variable substitution.
/// Built from the job's `payload.trigger_context` (see `from_payload`).
///
/// Every field is optional: a trigger event only populates the fields
/// relevant to its event type, and `substitute` keeps the `{event.*}`
/// template text for anything unset.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Id of the node the event refers to.
    pub node_id: Option<String>,
    /// Kind of that node (e.g. "content" in the tests below).
    pub node_kind: Option<String>,
    /// Edge type, for edge-triggered events.
    pub edge_type: Option<String>,
    /// Source node of the edge, for edge-triggered events.
    pub source_id: Option<String>,
    /// Target node of the edge, for edge-triggered events.
    pub target_id: Option<String>,
    /// Database operation that fired the trigger (e.g. "INSERT").
    pub op: Option<String>,
    /// Content-addressed storage hash of the payload, if any.
    pub cas_hash: Option<String>,
    /// Id of the communication the event belongs to, if any.
    pub communication_id: Option<String>,
    /// Id of the collection the event belongs to, if any.
    pub collection_id: Option<String>,
    /// URL from the trigger context (e.g. a URL shared in chat).
    pub url: Option<String>,
    /// User id that triggered the event.
    pub created_by: Option<String>,
    /// Id of the upstream orchestration (when cascading via a triggers edge).
    pub upstream_orchestration_id: Option<String>,
}
impl ExecutionContext {
    /// Build from the `trigger_context` JSON object in a job payload.
    ///
    /// Missing or non-string fields become `None`; a missing
    /// `trigger_context` key yields an all-`None` context.
    pub fn from_payload(payload: &Value) -> Self {
        let ctx = payload.get("trigger_context").cloned().unwrap_or_default();
        let s = |key: &str| ctx.get(key).and_then(|v| v.as_str()).map(|s| s.to_string());
        ExecutionContext {
            node_id: s("node_id"),
            node_kind: s("node_kind"),
            edge_type: s("edge_type"),
            source_id: s("source_id"),
            target_id: s("target_id"),
            op: s("op"),
            cas_hash: s("cas_hash"),
            communication_id: s("communication_id"),
            collection_id: s("collection_id"),
            url: s("url"),
            created_by: s("created_by"),
            upstream_orchestration_id: s("upstream_orchestration_id"),
        }
    }

    /// Resolve an `event.*` variable name to its context value, if set.
    fn lookup(&self, key: &str) -> Option<&str> {
        match key {
            "event.node_id" => self.node_id.as_deref(),
            "event.node_kind" => self.node_kind.as_deref(),
            "event.edge_type" => self.edge_type.as_deref(),
            "event.source_id" => self.source_id.as_deref(),
            "event.target_id" => self.target_id.as_deref(),
            "event.op" => self.op.as_deref(),
            "event.cas_hash" => self.cas_hash.as_deref(),
            "event.communication_id" => self.communication_id.as_deref(),
            "event.collection_id" => self.collection_id.as_deref(),
            "event.url" => self.url.as_deref(),
            "event.created_by" => self.created_by.as_deref(),
            "event.upstream_orchestration_id" => self.upstream_orchestration_id.as_deref(),
            _ => None,
        }
    }

    /// Substitute `{event.*}` variables in a string.
    ///
    /// Handles both whole-token templates (`{event.url}`) and templates
    /// embedded in a larger string (`--url={event.url}`) — the previous
    /// version only substituted whole-token args. Unknown variable names
    /// and variables without a value in the context keep their `{event.*}`
    /// template form, so a missing value stays visible downstream.
    fn substitute(&self, arg: &str) -> String {
        let mut out = String::with_capacity(arg.len());
        let mut rest = arg;
        while let Some(start) = rest.find('{') {
            // Copy the literal text before the candidate template.
            out.push_str(&rest[..start]);
            rest = &rest[start..];
            match rest.find('}') {
                Some(end) => {
                    let template = &rest[..=end];
                    match self.lookup(&template[1..template.len() - 1]) {
                        Some(val) => out.push_str(val),
                        // Unknown or unset variable: keep the template verbatim.
                        None => out.push_str(template),
                    }
                    rest = &rest[end + 1..];
                }
                // Unbalanced '{' — treat the remainder as literal text.
                None => break,
            }
        }
        out.push_str(rest);
        out
    }

    /// Substitute variables in every arg of a compiled step.
    fn substitute_args(&self, args: &[String]) -> Vec<String> {
        args.iter().map(|a| self.substitute(a)).collect()
    }
}
/// Result of running a whole pipeline.
#[derive(Debug)]
pub struct PipelineResult {
    // Number of steps attempted; fallback runs are not counted separately.
    pub steps_run: usize,
    // Steps that ended ok — a step recovered by its per-step fallback counts here.
    pub steps_ok: usize,
    // Steps that failed without recovery (at most 1, since the pipeline aborts).
    pub steps_failed: usize,
    // True when the pipeline stopped before completing all steps.
    pub aborted: bool,
    // Human-readable description of the failing step, when aborted.
    pub error: Option<String>,
}
/// Run a compiled pipeline sequentially.
///
/// For each step:
/// 1. Substitutes `{event.*}` variables from the trigger context
/// 2. Runs the CLI tool
/// 3. On failure: tries the per-step fallback first, then the global fallback
/// 4. Logs each run (fallbacks included) in `orchestration_log` via `execute_step`
///
/// Stops at the first failure that is not handled by a per-step fallback.
/// Note that the global fallback does NOT rescue the pipeline: it runs for
/// its side effects, but the pipeline is aborted regardless of its outcome.
pub async fn execute_pipeline(
    db: &PgPool,
    orchestration_id: Uuid,
    job_id: Uuid,
    steps: &[CompiledStep],
    global_fallback: Option<&CompiledStep>,
    ctx: &ExecutionContext,
) -> PipelineResult {
    let mut result = PipelineResult {
        steps_run: 0,
        steps_ok: 0,
        steps_failed: 0,
        aborted: false,
        error: None,
    };
    for step in steps {
        result.steps_run += 1;
        let step_result = execute_step(db, orchestration_id, job_id, step, ctx).await;
        match step_result {
            Ok(_) => {
                result.steps_ok += 1;
            }
            Err(err) => {
                tracing::warn!(
                    orchestration_id = %orchestration_id,
                    step = step.step_number,
                    binary = %step.binary,
                    error = %err,
                    "Steg feilet, prøver fallback"
                );
                // Try the per-step fallback (VED_FEIL on the individual step).
                let mut recovered = false;
                if let Some(ref fallback) = step.fallback {
                    tracing::info!(
                        orchestration_id = %orchestration_id,
                        step = step.step_number,
                        fallback_binary = %fallback.binary,
                        "Kjører steg-fallback"
                    );
                    match execute_step(db, orchestration_id, job_id, fallback, ctx).await {
                        Ok(_) => {
                            // A successful fallback counts the step as ok and
                            // lets the pipeline continue.
                            recovered = true;
                            result.steps_ok += 1;
                        }
                        Err(fb_err) => {
                            tracing::warn!(
                                orchestration_id = %orchestration_id,
                                step = step.step_number,
                                error = %fb_err,
                                "Steg-fallback feilet også"
                            );
                        }
                    }
                }
                if !recovered {
                    // Try the global fallback (pipeline-level VED_FEIL).
                    if let Some(gf) = global_fallback {
                        tracing::info!(
                            orchestration_id = %orchestration_id,
                            "Kjører global fallback"
                        );
                        let gf_result = execute_step(db, orchestration_id, job_id, gf, ctx).await;
                        if let Err(gf_err) = &gf_result {
                            tracing::error!(
                                orchestration_id = %orchestration_id,
                                error = %gf_err,
                                "Global fallback feilet"
                            );
                        }
                        // Regardless of whether the global fallback succeeded:
                        // stop the pipeline.
                    }
                    result.steps_failed += 1;
                    result.aborted = true;
                    result.error = Some(format!(
                        "Steg {} ({}) feilet: {}",
                        step.step_number, step.binary, err
                    ));
                    break;
                }
            }
        }
    }
    result
}
/// Run a single step: spawn the CLI process, await the outcome, and record
/// it as one row in `orchestration_log`.
///
/// A failure to write the log row is reported via tracing only — it never
/// masks the step's own result.
async fn execute_step(
    db: &PgPool,
    orchestration_id: Uuid,
    job_id: Uuid,
    step: &CompiledStep,
    ctx: &ExecutionContext,
) -> Result<Value, String> {
    let args = ctx.substitute_args(&step.args);
    // Fallback steps are compiled with step_number 0.
    let is_fallback = step.step_number == 0;
    tracing::info!(
        orchestration_id = %orchestration_id,
        step = step.step_number,
        binary = %step.binary,
        args = ?args,
        is_fallback = is_fallback,
        "Kjører steg"
    );
    let started = std::time::Instant::now();
    // `work_item` is handled in-process (it creates a task node in PG);
    // everything else is dispatched as an external CLI tool.
    let outcome = match step.binary.as_str() {
        "work_item" => execute_work_item(db, orchestration_id, &args).await,
        _ => execute_cli_tool(&step.binary, &args).await,
    };
    let duration_ms = started.elapsed().as_millis() as i32;
    // Map the outcome to the orchestration_log columns.
    let (status, exit_code, result_json, error_msg) = match &outcome {
        Ok(val) => ("ok", Some(0i16), Some(val.clone()), None),
        Err(err) => ("error", None, None, Some(err.clone())),
    };
    if let Err(e) = log_step(
        db,
        orchestration_id,
        job_id,
        step.step_number as i16,
        &step.binary,
        &args,
        is_fallback,
        status,
        exit_code,
        result_json.as_ref(),
        error_msg.as_deref(),
        duration_ms,
    )
    .await
    {
        tracing::error!(
            orchestration_id = %orchestration_id,
            error = %e,
            "Kunne ikke logge steg i orchestration_log"
        );
    }
    outcome
}
/// Spawn a CLI tool as a subprocess and interpret its stdout as JSON.
///
/// stderr is always logged (even on success); a non-zero exit or an
/// unparseable stdout becomes an `Err` with a descriptive message.
async fn execute_cli_tool(binary: &str, args: &[String]) -> Result<Value, String> {
    let mut cmd = tokio::process::Command::new(binary);
    cmd.args(args);
    // Tools that talk to the database need DATABASE_URL.
    cli_dispatch::set_database_url(&mut cmd)?;
    // Forward the env vars the various tools may depend on.
    for var in ["CAS_ROOT", "WHISPER_URL", "LITELLM_URL", "LITELLM_API_KEY"] {
        cli_dispatch::forward_env(&mut cmd, var);
    }
    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
    let output = cmd
        .spawn()
        .map_err(|e| format!("Kunne ikke starte {binary}: {e}"))?
        .wait_with_output()
        .await
        .map_err(|e| format!("Feil ved kjøring av {binary}: {e}"))?;
    let stderr = String::from_utf8_lossy(&output.stderr);
    if !stderr.is_empty() {
        tracing::info!(stderr = %stderr, "{binary} stderr");
    }
    if !output.status.success() {
        // code() is None when killed by a signal; report -1 in that case.
        let code = output.status.code().unwrap_or(-1);
        return Err(format!("{binary} feilet (exit {code}): {stderr}"));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    if stdout.trim().is_empty() {
        // Some tools print nothing on success — that is fine.
        return Ok(serde_json::json!({"status": "ok"}));
    }
    serde_json::from_str(&stdout).map_err(|e| format!("Kunne ikke parse {binary} output: {e}"))
}
/// Special handling of `work_item`: create a task node in the graph.
/// Args: ["<title>", "--tag", "bug", "--tag", "feature", ...]
async fn execute_work_item(
    db: &PgPool,
    orchestration_id: Uuid,
    args: &[String],
) -> Result<Value, String> {
    // The first arg is the title, with surrounding quotes stripped.
    let title = match args.first() {
        Some(raw) => raw.trim_matches('"').to_string(),
        None => return Err("work_item mangler tittel".to_string()),
    };
    // Collect every value following a "--tag" flag; a trailing "--tag"
    // without a value is ignored.
    let mut tags: Vec<String> = Vec::new();
    let mut rest = args.iter().skip(1);
    while let Some(flag) = rest.next() {
        if flag == "--tag" {
            if let Some(value) = rest.next() {
                tags.push(value.clone());
            }
        }
    }
    let metadata = serde_json::json!({
        "source": "orchestration",
        "orchestration_id": orchestration_id.to_string(),
        "tags": tags,
    });
    // Insert the work_item node.
    let node_id = sqlx::query_scalar::<_, Uuid>(
        r#"
        INSERT INTO nodes (node_kind, title, metadata, visibility)
        VALUES ('work_item', $1, $2, 'private')
        RETURNING id
        "#,
    )
    .bind(&title)
    .bind(&metadata)
    .fetch_one(db)
    .await
    .map_err(|e| format!("Feil ved oppretting av work_item: {e}"))?;
    // Best-effort: link the work item back to the orchestration via a
    // source_material edge. A failure here is deliberately ignored.
    let _ = sqlx::query(
        r#"
        INSERT INTO edges (source_id, target_id, edge_type)
        VALUES ($1, $2, 'source_material')
        "#,
    )
    .bind(node_id)
    .bind(orchestration_id)
    .execute(db)
    .await;
    tracing::info!(
        node_id = %node_id,
        title = %title,
        tags = ?tags,
        "work_item opprettet"
    );
    Ok(serde_json::json!({
        "status": "ok",
        "node_id": node_id.to_string(),
        "title": title,
    }))
}
/// Insert one row into `orchestration_log`.
///
/// `result` and `error_msg` are mutually exclusive in practice (set by the
/// caller from the step outcome); both are nullable columns.
async fn log_step(
    db: &PgPool,
    orchestration_id: Uuid,
    job_id: Uuid,
    step_number: i16,
    binary: &str,
    args: &[String],
    is_fallback: bool,
    status: &str,
    exit_code: Option<i16>,
    result: Option<&Value>,
    error_msg: Option<&str>,
    duration_ms: i32,
) -> Result<(), sqlx::Error> {
    // Serialize the resolved args as a JSON array for the jsonb column.
    let serialized_args = serde_json::to_value(args).unwrap_or_default();
    sqlx::query(
        r#"
        INSERT INTO orchestration_log
        (orchestration_id, job_id, step_number, tool_binary, args, is_fallback,
        status, exit_code, result, error_msg, duration_ms)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
        "#,
    )
    .bind(orchestration_id)
    .bind(job_id)
    .bind(step_number)
    .bind(binary)
    .bind(serialized_args)
    .bind(is_fallback)
    .bind(status)
    .bind(exit_code)
    .bind(result)
    .bind(error_msg)
    .bind(duration_ms)
    .execute(db)
    .await
    .map(|_| ())
}
// =============================================================================
// Tester
// =============================================================================
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// An ExecutionContext with every field unset, for struct-update syntax.
    fn blank_ctx() -> ExecutionContext {
        ExecutionContext {
            node_id: None,
            node_kind: None,
            edge_type: None,
            source_id: None,
            target_id: None,
            op: None,
            cas_hash: None,
            communication_id: None,
            collection_id: None,
            url: None,
            created_by: None,
            upstream_orchestration_id: None,
        }
    }

    #[test]
    fn test_execution_context_from_payload() {
        let payload = json!({
            "orchestration_id": "abc",
            "trigger_event": "node.created",
            "trigger_context": {
                "node_id": "aaaa-bbbb",
                "node_kind": "content",
                "op": "INSERT",
                "cas_hash": "sha256:deadbeef"
            }
        });
        let ctx = ExecutionContext::from_payload(&payload);
        assert_eq!(ctx.node_id.as_deref(), Some("aaaa-bbbb"));
        assert_eq!(ctx.node_kind.as_deref(), Some("content"));
        assert_eq!(ctx.op.as_deref(), Some("INSERT"));
        assert_eq!(ctx.cas_hash.as_deref(), Some("sha256:deadbeef"));
        assert!(ctx.edge_type.is_none());
        assert!(ctx.communication_id.is_none());
    }

    #[test]
    fn test_substitute_variables() {
        let ctx = ExecutionContext {
            node_id: Some("node-123".into()),
            node_kind: Some("content".into()),
            op: Some("INSERT".into()),
            cas_hash: Some("sha256:abc".into()),
            communication_id: Some("comm-456".into()),
            collection_id: Some("coll-789".into()),
            url: Some("https://example.com/article".into()),
            created_by: Some("user-999".into()),
            ..blank_ctx()
        };
        assert_eq!(ctx.substitute("{event.node_id}"), "node-123");
        assert_eq!(ctx.substitute("{event.cas_hash}"), "sha256:abc");
        assert_eq!(ctx.substitute("{event.communication_id}"), "comm-456");
        assert_eq!(ctx.substitute("{event.collection_id}"), "coll-789");
        assert_eq!(ctx.substitute("{event.url}"), "https://example.com/article");
        assert_eq!(ctx.substitute("{event.created_by}"), "user-999");
        // An unknown variable is returned unchanged.
        assert_eq!(ctx.substitute("{event.unknown}"), "{event.unknown}");
        // A non-variable is returned unchanged.
        assert_eq!(ctx.substitute("--model"), "--model");
        assert_eq!(ctx.substitute("large"), "large");
    }

    #[test]
    fn test_substitute_args() {
        let ctx = ExecutionContext {
            cas_hash: Some("sha256:abc".into()),
            ..blank_ctx()
        };
        let args = vec![
            "--cas-hash".to_string(),
            "{event.cas_hash}".to_string(),
            "--model".to_string(),
            "large".to_string(),
        ];
        let resolved = ctx.substitute_args(&args);
        assert_eq!(resolved, vec!["--cas-hash", "sha256:abc", "--model", "large"]);
    }

    #[test]
    fn test_substitute_missing_variable_keeps_template() {
        // A variable without a value keeps its {event.*} template form.
        assert_eq!(blank_ctx().substitute("{event.cas_hash}"), "{event.cas_hash}");
    }
}