diff --git a/docs/infra/jobbkø.md b/docs/infra/jobbkø.md index ddcec9e..6da3bd9 100644 --- a/docs/infra/jobbkø.md +++ b/docs/infra/jobbkø.md @@ -40,17 +40,37 @@ CREATE INDEX idx_job_queue_pending ON job_queue (priority DESC, scheduled_for AS ## 4. Worker-arkitektur (Rust) ### 4.1 Designprinsipp: Orkestrator, ikke prosesseringsmotor -Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som koordinerer eksterne tjenester: +Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som spawner +CLI-verktøy (`Command::new("synops-X")`) som gjør selve jobben. -| Jobbtype | Hva workeren gjør | Tung logikk i workeren? | +**Kontrakt mellom maskinrommet og CLI-verktøy:** +- **Stdout** → jobbresultat (JSON). Parses og lagres i `job_queue.result`. +- **Stderr** → feillogg. Logges av maskinrommet. +- **Exitkode** → status. 0 = suksess, != 0 = feil. +- **`--write` flag** → CLI-verktøyet skriver til PG selv. Uten flagget: kun stdout. + +**Hva maskinrommet beholder inline:** +- Payload-parsing og validering +- Sikkerhetskontroller (kill switch, rate limiting, loop-prevensjon for agent) +- Voice/model-oppslag fra node metadata (orchestrator-logikk) +- STDB-synk etter CLI fullført (sanntidsvisning) +- Jobbstatus-håndtering (complete/fail/retry) + +| Jobbtype | CLI-verktøy | Maskinrommet beholder | |---|---|---| -| `whisper_transcribe` | HTTP-kall til faster-whisper-server (SRT), parse → `transcription_segments` | Lett SRT-parsing | -| `openrouter_analyze` | HTTP-kall til AI Gateway | Nei — venter på svar | -| `stats_parse` | Parser Caddy-loggfiler, skriver til PG | Lett I/O | -| `research_clip` | HTTP-kall til AI Gateway | Nei — venter på svar | -| `generate_embeddings` | HTTP-kall til AI Gateway | Nei — venter på svar | +| `whisper_transcribe` | `synops-transcribe` | — | +| `agent_respond` | `synops-respond` | Kill switch, rate limit, loop-prevensjon, STDB-synk | +| `suggest_edges` | `synops-suggest-edges` | — | +| `summarize_communication` | `synops-summarize` | — | +| `tts_generate` | `synops-tts` | Voice-oppslag fra node metadata, STDB-synk | +| `audio_process` | `synops-audio` | STDB-synk | +| `render_article` | `synops-render` | — | +| `render_index` | `synops-render` | — | +| `ai_process` | *(inline — mangler CLI)* | Alt (planlagt migrasjon) | -Ny jobbtype = ny handler-funksjon (bygg request, håndter respons, feilhåndtering). Tynt glue-code. Rekompilering er triviell og inkrementell. +**Felles dispatcher-kode:** `cli_dispatch.rs` — `run_cli_tool()` og env-helpers. + +Ny jobbtype = nytt CLI-verktøy + tynn dispatcher-handler. Ref: `docs/retninger/unix_filosofi.md`. ### 4.2 Én worker, prioritetsstyrt Én enkelt worker-prosess håndterer **alle jobbtyper**. Prioritering skjer via `priority`-kolonnen i køen — SQL-spørringen plukker alltid viktigste jobb først. Ingen behov for separate prosesser per jobbtype. diff --git a/maskinrommet/src/ai_edges.rs b/maskinrommet/src/ai_edges.rs index 44ea49f..f0a038a 100644 --- a/maskinrommet/src/ai_edges.rs +++ b/maskinrommet/src/ai_edges.rs @@ -1,136 +1,38 @@ -// AI-foreslåtte edges — sender innhold til LLM, foreslår mentions og topics. +// AI edge-forslag dispatcher — delegerer til synops-suggest-edges CLI. +// +// Maskinrommet orkestrerer, CLI-verktøyet gjør jobben (LLM-kall, +// topic-opprettelse, edge-skriving, ressurslogging). +// +// STDB-synk for topic-noder og mentions-edges hoppes over her — +// topics er bakgrunnsdata i kunnskapsgrafen og trenger ikke +// sanntidsvisning. De synkes ved neste STDB-refresh. // // Jobbtype: "suggest_edges" // Payload: { "node_id": "" } // -// Flyten: -// 1. Hent nodens innhold fra PG -// 2. Hent eksisterende topic-noder for gjenbruk -// 3. Send til LiteLLM (OpenAI-kompatibelt API via AI Gateway) -// 4. Parse LLM-svar (JSON med topics og mentions) -// 5. Opprett nye topic-noder for ukjente topics -// 6. Opprett mentions-edges fra innholdsnode til topic/entity-noder -// -// Ref: docs/infra/ai_gateway.md, docs/concepts/kunnskapsgrafen.md +// Ref: docs/retninger/unix_filosofi.md, docs/infra/ai_gateway.md, +// docs/concepts/kunnskapsgrafen.md -use serde::{Deserialize, Serialize}; -use sqlx::PgPool; use uuid::Uuid; +use crate::cli_dispatch; use crate::jobs::JobRow; -use crate::resource_usage; use crate::stdb::StdbClient; -/// Eksisterende topic-node fra PG. -#[derive(sqlx::FromRow)] -struct TopicRow { - id: Uuid, - title: String, +/// Synops-suggest-edges binary path. +fn suggest_edges_bin() -> String { + std::env::var("SYNOPS_SUGGEST_EDGES_BIN") + .unwrap_or_else(|_| "synops-suggest-edges".to_string()) } -/// Kilde-node fra PG. -#[derive(sqlx::FromRow)] -struct SourceNode { - title: Option, - content: Option, - created_by: Option, -} - -/// LLM-respons: foreslåtte topics og mentions. -#[derive(Deserialize, Debug)] -struct AiSuggestion { - /// Emnene innholdet handler om (nye eller eksisterende). - #[serde(default)] - topics: Vec, - /// Entiteter nevnt i innholdet (personer, organisasjoner, steder). - #[serde(default)] - mentions: Vec, -} - -#[derive(Deserialize, Debug)] -struct MentionSuggestion { - /// Navn på entiteten. - name: String, - /// Type: person, organisasjon, sted, konsept. - #[serde(default = "default_entity_type")] - entity_type: String, -} - -fn default_entity_type() -> String { - "person".to_string() -} - -/// OpenAI-kompatibel chat completion request. -#[derive(Serialize)] -struct ChatRequest { - model: String, - messages: Vec, - temperature: f32, - response_format: ResponseFormat, -} - -#[derive(Serialize)] -struct ResponseFormat { - r#type: String, -} - -#[derive(Serialize)] -struct ChatMessage { - role: String, - content: String, -} - -/// OpenAI-kompatibel chat completion response. -#[derive(Deserialize)] -struct ChatResponse { - choices: Vec, - #[serde(default)] - usage: Option, - #[serde(default)] - model: Option, -} - -#[derive(Deserialize, Clone)] -struct UsageInfo { - #[serde(default)] - prompt_tokens: i64, - #[serde(default)] - completion_tokens: i64, -} - -#[derive(Deserialize)] -struct Choice { - message: MessageContent, -} - -#[derive(Deserialize)] -struct MessageContent { - content: Option, -} - -const SYSTEM_PROMPT: &str = r#"Du er en innholdsanalysator for en norsk redaksjonsplattform. Analyser teksten og ekstraher: - -1. **topics**: Emner/temaer teksten handler om. Bruk korte, presise norske termer (f.eks. "skolepolitikk", "klimaendringer", "statsbudsjettet"). Maks 5 topics. - -2. **mentions**: Navngitte entiteter (personer, organisasjoner, steder) som er eksplisitt nevnt. Inkluder entity_type ("person", "organisasjon", "sted", "konsept"). - -Returner KUN et JSON-objekt med denne strukturen: -{ - "topics": ["emne1", "emne2"], - "mentions": [{"name": "Navn", "entity_type": "person"}] -} - -Regler: -- Returner tom liste hvis teksten ikke har meningsfullt innhold (hilsener, korte svar, etc.) -- Bruk eksisterende topics fra listen nedenfor der det passer, i stedet for å lage nye varianter -- Ikke inkluder generiske termer som "samtale" eller "diskusjon" -- Navngi entiteter med full, autoritativ form (f.eks. "Jonas Gahr Støre", ikke "Støre")"#; - /// Håndterer suggest_edges-jobb. +/// +/// Spawner synops-suggest-edges med --write for å gjøre alt arbeidet: +/// LLM-kall, topic-opprettelse, edge-skriving, ressurslogging. pub async fn handle_suggest_edges( job: &JobRow, - db: &PgPool, - stdb: &StdbClient, + _db: &sqlx::PgPool, + _stdb: &StdbClient, ) -> Result { let node_id: Uuid = job .payload @@ -139,387 +41,43 @@ pub async fn handle_suggest_edges( .and_then(|s| s.parse().ok()) .ok_or("Mangler gyldig node_id i payload")?; - tracing::info!(node_id = %node_id, "Starter AI edge-forslag"); + // requested_by er valgfri — bruk node_id som fallback for --write + let requested_by = job + .payload + .get("requested_by") + .and_then(|v| v.as_str()) + .unwrap_or(&node_id.to_string()) + .to_string(); - // 1. Hent kildenode - let source = sqlx::query_as::<_, SourceNode>( - "SELECT title, content, created_by FROM nodes WHERE id = $1", - ) - .bind(node_id) - .fetch_optional(db) - .await - .map_err(|e| format!("PG-feil ved henting av node: {e}"))? - .ok_or_else(|| format!("Node {node_id} finnes ikke"))?; + // Bygg kommando + let bin = suggest_edges_bin(); + let mut cmd = tokio::process::Command::new(&bin); - let title = source.title.unwrap_or_default(); - let content = source.content.unwrap_or_default(); + cmd.arg("--node-id").arg(node_id.to_string()) + .arg("--requested-by").arg(&requested_by) + .arg("--write"); - // Ikke analyser tomme noder eller veldig korte meldinger - let text = format!("{title}\n{content}").trim().to_string(); - if text.len() < 20 { - tracing::info!(node_id = %node_id, len = text.len(), "For kort innhold, hopper over AI-analyse"); - return Ok(serde_json::json!({ - "status": "skipped", - "reason": "content_too_short" - })); - } - - // 2. Hent eksisterende topic-noder for kontekst - let existing_topics = sqlx::query_as::<_, TopicRow>( - "SELECT id, title FROM nodes WHERE node_kind = 'topic' ORDER BY created_at DESC LIMIT 100", - ) - .fetch_all(db) - .await - .map_err(|e| format!("PG-feil ved henting av topics: {e}"))?; - - let topic_list: Vec<&str> = existing_topics.iter().map(|t| t.title.as_str()).collect(); - - // 3. Bygg prompt og kall LiteLLM - let user_content = if topic_list.is_empty() { - format!("Analyser følgende tekst:\n\n{text}") - } else { - format!( - "Eksisterende topics: {}\n\nAnalyser følgende tekst:\n\n{text}", - topic_list.join(", ") - ) - }; - - let (suggestion, llm_usage, llm_model) = call_llm(&user_content).await?; + // Sett miljøvariabler CLI-verktøyet trenger + cli_dispatch::set_database_url(&mut cmd)?; + cli_dispatch::forward_env(&mut cmd, "AI_GATEWAY_URL"); + cli_dispatch::forward_env(&mut cmd, "LITELLM_MASTER_KEY"); + cli_dispatch::forward_env(&mut cmd, "AI_EDGES_MODEL"); tracing::info!( node_id = %node_id, - topics = ?suggestion.topics, - mentions = suggestion.mentions.len(), - "LLM-forslag mottatt" + bin = %bin, + "Starter synops-suggest-edges" ); - // 4. Opprett topic-noder og mentions-edges - let created_by = source.created_by.unwrap_or(node_id); - let mut created_topics = 0u32; - let mut created_edges = 0u32; - - // Prosesser topics - for topic_name in &suggestion.topics { - let topic_name = topic_name.trim(); - if topic_name.is_empty() { - continue; - } - - // Finn eksisterende topic med case-insensitivt match - let existing = existing_topics - .iter() - .find(|t| t.title.to_lowercase() == topic_name.to_lowercase()); - - let topic_id = if let Some(t) = existing { - t.id - } else { - // Opprett ny topic-node - let new_id = Uuid::now_v7(); - create_topic_node(db, stdb, new_id, topic_name, created_by).await?; - created_topics += 1; - new_id - }; - - // Opprett mentions-edge: innholdsnode → topic - if create_mentions_edge(db, stdb, node_id, topic_id, created_by).await? { - created_edges += 1; - } - } - - // Prosesser mentions (entiteter) - for mention in &suggestion.mentions { - let name = mention.name.trim(); - if name.is_empty() { - continue; - } - - // Søk etter eksisterende entitet med samme tittel - let existing_entity = sqlx::query_scalar::<_, Uuid>( - "SELECT id FROM nodes WHERE node_kind = 'topic' AND LOWER(title) = LOWER($1) LIMIT 1", - ) - .bind(name) - .fetch_optional(db) - .await - .map_err(|e| format!("PG-feil ved entitet-søk: {e}"))?; - - let entity_id = if let Some(id) = existing_entity { - id - } else { - // Opprett ny entitet som topic-node med entity_type i metadata - let new_id = Uuid::now_v7(); - create_entity_node(db, stdb, new_id, name, &mention.entity_type, created_by).await?; - created_topics += 1; - new_id - }; - - // Opprett mentions-edge: innholdsnode → entitet - if create_mentions_edge(db, stdb, node_id, entity_id, created_by).await? { - created_edges += 1; - } - } - - let result = serde_json::json!({ - "status": "completed", - "topics_created": created_topics, - "edges_created": created_edges, - "suggestions": { - "topics": suggestion.topics, - "mentions": suggestion.mentions.iter().map(|m| &m.name).collect::>() - } - }); - - // Logg AI-ressursforbruk - let collection_id = resource_usage::find_collection_for_node(db, node_id).await; - let (tokens_in, tokens_out) = llm_usage - .map(|u| (u.prompt_tokens, u.completion_tokens)) - .unwrap_or((0, 0)); - - if let Err(e) = resource_usage::log( - db, - node_id, - source.created_by, - collection_id, - "ai", - serde_json::json!({ - "model_level": "fast", - "model_id": llm_model.unwrap_or_else(|| "unknown".to_string()), - "tokens_in": tokens_in, - "tokens_out": tokens_out, - "job_type": "suggest_edges" - }), - ) - .await - { - tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk for edge-forslag"); - } + let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; tracing::info!( node_id = %node_id, - topics_created = created_topics, - edges_created = created_edges, - "AI edge-forslag fullført" + topics_created = result["topics_created"].as_u64().unwrap_or(0), + edges_created = result["edges_created"].as_u64().unwrap_or(0), + status = result["status"].as_str().unwrap_or("unknown"), + "synops-suggest-edges fullført" ); Ok(result) } - -/// Kall LiteLLM (OpenAI-kompatibelt API) for å analysere innhold. -/// Returnerer (forslag, usage, model). -async fn call_llm(user_content: &str) -> Result<(AiSuggestion, Option, Option), String> { - let gateway_url = std::env::var("AI_GATEWAY_URL") - .unwrap_or_else(|_| "http://localhost:4000".to_string()); - let api_key = std::env::var("LITELLM_MASTER_KEY") - .unwrap_or_default(); - - // Bruk sidelinja/rutine (billig modell) for edge-forslag - let model = std::env::var("AI_EDGES_MODEL") - .unwrap_or_else(|_| "sidelinja/rutine".to_string()); - - let request = ChatRequest { - model, - messages: vec![ - ChatMessage { - role: "system".to_string(), - content: SYSTEM_PROMPT.to_string(), - }, - ChatMessage { - role: "user".to_string(), - content: user_content.to_string(), - }, - ], - temperature: 0.2, - response_format: ResponseFormat { - r#type: "json_object".to_string(), - }, - }; - - let client = reqwest::Client::new(); - let url = format!("{gateway_url}/v1/chat/completions"); - - let resp = client - .post(&url) - .header("Authorization", format!("Bearer {api_key}")) - .header("Content-Type", "application/json") - .json(&request) - .timeout(std::time::Duration::from_secs(30)) - .send() - .await - .map_err(|e| format!("LiteLLM-kall feilet: {e}"))?; - - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - return Err(format!("LiteLLM returnerte {status}: {body}")); - } - - let chat_resp: ChatResponse = resp - .json() - .await - .map_err(|e| format!("Kunne ikke parse LiteLLM-respons: {e}"))?; - - let content = chat_resp - .choices - .first() - .and_then(|c| c.message.content.as_deref()) - .ok_or("LiteLLM returnerte ingen content")?; - - // Parse JSON fra LLM-output - let suggestion: AiSuggestion = serde_json::from_str(content) - .map_err(|e| format!("Kunne ikke parse LLM JSON: {e}. Rå output: {content}"))?; - - Ok((suggestion, chat_resp.usage, chat_resp.model)) -} - -/// Opprett en topic-node i PG og STDB. -async fn create_topic_node( - db: &PgPool, - stdb: &StdbClient, - id: Uuid, - title: &str, - created_by: Uuid, -) -> Result<(), String> { - let metadata = serde_json::json!({"ai_generated": true}); - - // STDB først - stdb.create_node( - &id.to_string(), - "topic", - title, - "", - "discoverable", - &metadata.to_string(), - &created_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_node (topic) feilet: {e}"))?; - - // PG - sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) - VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4) - ON CONFLICT (id) DO NOTHING - "#, - ) - .bind(id) - .bind(title) - .bind(&metadata) - .bind(created_by) - .execute(db) - .await - .map_err(|e| format!("PG insert topic feilet: {e}"))?; - - tracing::info!(topic_id = %id, title = %title, "Ny topic-node opprettet (AI)"); - Ok(()) -} - -/// Opprett en entitet-node (person, org, sted) i PG og STDB. -async fn create_entity_node( - db: &PgPool, - stdb: &StdbClient, - id: Uuid, - name: &str, - entity_type: &str, - created_by: Uuid, -) -> Result<(), String> { - let metadata = serde_json::json!({ - "ai_generated": true, - "entity_type": entity_type - }); - - stdb.create_node( - &id.to_string(), - "topic", - name, - "", - "discoverable", - &metadata.to_string(), - &created_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_node (entity) feilet: {e}"))?; - - sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) - VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4) - ON CONFLICT (id) DO NOTHING - "#, - ) - .bind(id) - .bind(name) - .bind(&metadata) - .bind(created_by) - .execute(db) - .await - .map_err(|e| format!("PG insert entity feilet: {e}"))?; - - tracing::info!(entity_id = %id, name = %name, entity_type = %entity_type, "Ny entitet-node opprettet (AI)"); - Ok(()) -} - -/// Opprett en mentions-edge fra innholdsnode til target. -/// Returnerer true hvis ny edge ble opprettet, false hvis den allerede eksisterte. -async fn create_mentions_edge( - db: &PgPool, - stdb: &StdbClient, - source_id: Uuid, - target_id: Uuid, - created_by: Uuid, -) -> Result { - // Sjekk om edge allerede finnes - let exists = sqlx::query_scalar::<_, bool>( - "SELECT EXISTS(SELECT 1 FROM edges WHERE source_id = $1 AND target_id = $2 AND edge_type = 'mentions')", - ) - .bind(source_id) - .bind(target_id) - .fetch_one(db) - .await - .map_err(|e| format!("PG-feil ved edge-sjekk: {e}"))?; - - if exists { - return Ok(false); - } - - let edge_id = Uuid::now_v7(); - let metadata = serde_json::json!({"origin": "ai"}); - - // STDB først - stdb.create_edge( - &edge_id.to_string(), - &source_id.to_string(), - &target_id.to_string(), - "mentions", - &metadata.to_string(), - false, - &created_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_edge (mentions) feilet: {e}"))?; - - // PG - sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, 'mentions', $4, false, $5) - ON CONFLICT (source_id, target_id, edge_type) DO NOTHING - "#, - ) - .bind(edge_id) - .bind(source_id) - .bind(target_id) - .bind(&metadata) - .bind(created_by) - .execute(db) - .await - .map_err(|e| format!("PG insert mentions-edge feilet: {e}"))?; - - tracing::info!( - edge_id = %edge_id, - source = %source_id, - target = %target_id, - "Mentions-edge opprettet (AI)" - ); - - Ok(true) -} diff --git a/maskinrommet/src/audio.rs b/maskinrommet/src/audio.rs index 06a939a..05d6657 100644 --- a/maskinrommet/src/audio.rs +++ b/maskinrommet/src/audio.rs @@ -757,10 +757,23 @@ async fn resolve_silence_cuts( Ok(cuts) } -// ─── Jobbhåndterer ─────────────────────────────────────────────── +// ─── Jobbhåndterer — delegerer til synops-audio CLI ──────────────── +// +// Maskinrommet beholder: STDB-synk for sanntidsvisning. +// CLI gjør: FFmpeg-prosessering, CAS-lagring, PG-skriving, ressurslogging. + +/// Synops-audio binary path. +fn audio_bin() -> String { + std::env::var("SYNOPS_AUDIO_BIN") + .unwrap_or_else(|_| "synops-audio".to_string()) +} /// Håndterer `audio_process`-jobber fra jobbkøen. /// +/// Spawner synops-audio med --write for å gjøre alt arbeidet: +/// FFmpeg-prosessering, CAS-lagring, PG-skriving, ressurslogging. +/// Maskinrommet gjør etterpå STDB-synk for sanntidsvisning. +/// /// Payload: /// ```json /// { @@ -772,7 +785,7 @@ async fn resolve_silence_cuts( /// ``` pub async fn handle_audio_process_job( job: &JobRow, - db: &PgPool, + _db: &PgPool, stdb: &StdbClient, cas: &CasStore, ) -> Result { @@ -781,8 +794,14 @@ pub async fn handle_audio_process_job( .and_then(|s| s.parse().ok()) .ok_or("Mangler media_node_id i payload")?; - let edl: EdlDocument = serde_json::from_value(job.payload["edl"].clone()) - .map_err(|e| format!("Ugyldig EDL i payload: {e}"))?; + let edl_value = &job.payload["edl"]; + let edl_json = serde_json::to_string(edl_value) + .map_err(|e| format!("Kunne ikke serialisere EDL: {e}"))?; + + // Verifiser at source_hash finnes i EDL + let cas_hash = edl_value["source_hash"] + .as_str() + .ok_or("Mangler source_hash i EDL")?; let output_format = job.payload["output_format"] .as_str() @@ -793,111 +812,75 @@ pub async fn handle_audio_process_job( .and_then(|s| s.parse().ok()) .ok_or("Mangler requested_by i payload")?; - // Kjør prosessering - let (result_hash, result_size) = process_audio(cas, &edl, output_format).await?; + // Bygg kommando + let bin = audio_bin(); + let mut cmd = tokio::process::Command::new(&bin); - // Bestem MIME-type - let mime = match output_format { - "mp3" => "audio/mpeg", - "wav" => "audio/wav", - "flac" => "audio/flac", - "ogg" => "audio/ogg", - _ => "audio/mpeg", - }; + cmd.arg("--cas-hash").arg(cas_hash) + .arg("--edl").arg(&edl_json) + .arg("--output-format").arg(output_format) + .arg("--node-id").arg(media_node_id.to_string()) + .arg("--requested-by").arg(requested_by.to_string()) + .arg("--write"); - // Opprett ny medienode for den prosesserte filen - let processed_node_id = Uuid::now_v7(); - let metadata = serde_json::json!({ - "cas_hash": result_hash, - "mime": mime, - "size_bytes": result_size, - "source_hash": edl.source_hash, - "edl": edl, - }); + // Sett miljøvariabler + crate::cli_dispatch::set_database_url(&mut cmd)?; + cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string()); - // Hent tittel fra original node - let original_title: Option = sqlx::query_scalar( - "SELECT title FROM nodes WHERE id = $1" - ) - .bind(media_node_id) - .fetch_optional(db) - .await - .map_err(|e| format!("DB-feil: {e}"))? - .flatten(); + tracing::info!( + media_node_id = %media_node_id, + cas_hash = %cas_hash, + operations = edl_value["operations"].as_array().map(|a| a.len()).unwrap_or(0), + bin = %bin, + "Starter synops-audio" + ); - let title = original_title - .map(|t| format!("{t} (prosessert)")) - .unwrap_or_else(|| "Prosessert lyd".to_string()); + let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; - // Insert processed media node - sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by) - VALUES ($1, 'media', $2, 'hidden', $3, $4) - "#, - ) - .bind(processed_node_id) - .bind(&title) - .bind(&metadata) - .bind(requested_by) - .execute(db) - .await - .map_err(|e| format!("Kunne ikke opprette prosessert node: {e}"))?; + // --- STDB-synk for sanntidsvisning --- + if let Some(processed_node_id) = result["processed_node_id"].as_str() { + let result_hash = result["cas_hash"].as_str().unwrap_or(""); + let result_size = result["size_bytes"].as_u64().unwrap_or(0); + let mime = match output_format { + "mp3" => "audio/mpeg", + "wav" => "audio/wav", + "flac" => "audio/flac", + "ogg" => "audio/ogg", + _ => "audio/mpeg", + }; - // Opprett derived_from edge: processed → original - let edge_id = Uuid::now_v7(); - sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, system, created_by) - VALUES ($1, $2, $3, 'derived_from', true, $4) - "#, - ) - .bind(edge_id) - .bind(processed_node_id) - .bind(media_node_id) - .bind(requested_by) - .execute(db) - .await - .map_err(|e| format!("Kunne ikke opprette derived_from edge: {e}"))?; + let metadata = serde_json::json!({ + "cas_hash": result_hash, + "mime": mime, + "size_bytes": result_size, + "source_hash": cas_hash, + }); - // Synk til SpacetimeDB - let metadata_str = serde_json::to_string(&metadata).unwrap_or_default(); - let _ = stdb - .create_node( - &processed_node_id.to_string(), - "media", - &title, - "", - "hidden", - &metadata_str, - &requested_by.to_string(), - ) - .await; + if let Err(e) = stdb.create_node( + processed_node_id, "media", "Prosessert lyd", "", "hidden", + &metadata.to_string(), &requested_by.to_string(), + ).await { + tracing::warn!(error = %e, "STDB create_node (audio) feilet (PG er allerede skrevet)"); + } - let _ = stdb - .create_edge( - &edge_id.to_string(), - &processed_node_id.to_string(), - &media_node_id.to_string(), - "derived_from", - "{}", - true, - &requested_by.to_string(), - ) - .await; + // derived_from edge: processed → original + let edge_id = Uuid::now_v7().to_string(); + if let Err(e) = stdb.create_edge( + &edge_id, processed_node_id, &media_node_id.to_string(), + "derived_from", "{}", true, &requested_by.to_string(), + ).await { + tracing::warn!(error = %e, "STDB create_edge (derived_from) feilet (PG er allerede skrevet)"); + } + } tracing::info!( original = %media_node_id, - processed = %processed_node_id, - hash = %result_hash, - "Audio process-jobb fullført" + processed = result["processed_node_id"].as_str().unwrap_or("n/a"), + hash = result["cas_hash"].as_str().unwrap_or("n/a"), + "synops-audio fullført" ); - Ok(serde_json::json!({ - "processed_node_id": processed_node_id.to_string(), - "cas_hash": result_hash, - "size_bytes": result_size, - })) + Ok(result) } // ─── Tester ────────────────────────────────────────────────────── diff --git a/maskinrommet/src/cli_dispatch.rs b/maskinrommet/src/cli_dispatch.rs new file mode 100644 index 0000000..f4ebe2a --- /dev/null +++ b/maskinrommet/src/cli_dispatch.rs @@ -0,0 +1,62 @@ +// Felles hjelpefunksjoner for å spawne CLI-verktøy fra jobbkø-handlere. +// +// Mønsteret: maskinrommet orkestrerer (payload-parsing, sikkerhetskontroller, +// STDB-synk), CLI-verktøyet gjør jobben (API-kall, DB-skriving, prosessering). +// Stdout → jobbresultat (JSON), stderr → feillogg, exitkode → status. +// +// Ref: docs/retninger/unix_filosofi.md + +use std::process::Stdio; + +/// Kjør et CLI-verktøy og returner stdout som JSON. +/// +/// - `bin_name`: Verktøynavn for logging (f.eks. "synops-tts") +/// - `cmd`: Ferdig konfigurert Command med args og env +/// +/// Stderr logges. Non-zero exit code gir Err med stderr-innhold. +pub async fn run_cli_tool( + bin_name: &str, + cmd: &mut tokio::process::Command, +) -> Result { + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + + let child = cmd + .spawn() + .map_err(|e| format!("Kunne ikke starte {bin_name}: {e}"))?; + + let output = child + .wait_with_output() + .await + .map_err(|e| format!("Feil ved kjøring av {bin_name}: {e}"))?; + + let stderr = String::from_utf8_lossy(&output.stderr); + if !stderr.is_empty() { + tracing::info!(stderr = %stderr, "{bin_name} stderr"); + } + + if !output.status.success() { + let code = output.status.code().unwrap_or(-1); + return Err(format!( + "{bin_name} feilet (exit {code}): {stderr}" + )); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + serde_json::from_str(&stdout) + .map_err(|e| format!("Kunne ikke parse {bin_name} output: {e}")) +} + +/// Videresend en miljøvariabel til en Command hvis den er satt. +pub fn forward_env(cmd: &mut tokio::process::Command, key: &str) { + if let Ok(v) = std::env::var(key) { + cmd.env(key, v); + } +} + +/// Sett DATABASE_URL på en Command. Feiler hvis variabelen mangler. +pub fn set_database_url(cmd: &mut tokio::process::Command) -> Result<(), String> { + let db_url = std::env::var("DATABASE_URL") + .map_err(|_| "DATABASE_URL ikke satt".to_string())?; + cmd.env("DATABASE_URL", &db_url); + Ok(()) +} diff --git a/maskinrommet/src/jobs.rs b/maskinrommet/src/jobs.rs index a174309..a021ecf 100644 --- a/maskinrommet/src/jobs.rs +++ b/maskinrommet/src/jobs.rs @@ -19,8 +19,8 @@ use crate::ai_edges; use crate::ai_process; use crate::audio; use crate::cas::CasStore; +use crate::cli_dispatch; use crate::maintenance::MaintenanceState; -use crate::publishing; use crate::resources::{self, PriorityRules}; use crate::stdb::StdbClient; use crate::summarize; @@ -159,6 +159,12 @@ async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx } /// Dispatcher — kjører riktig handler basert på job_type. +/// +/// Alle handlere delegerer til CLI-verktøy (Command::new("synops-X")) +/// i tråd med Unix-filosofien. Unntaket er ai_process som fortsatt +/// kjører inline (mangler CLI-verktøy, planlagt i fremtidig oppgave). +/// +/// Ref: docs/retninger/unix_filosofi.md async fn dispatch( job: &JobRow, db: &PgPool, @@ -180,32 +186,38 @@ async fn dispatch( summarize::handle_summarize_communication(job, db, stdb).await } "tts_generate" => { - tts::handle_tts_job(job, db, stdb, cas).await + tts::handle_tts_job(job, db, stdb).await } "audio_process" => { audio::handle_audio_process_job(job, db, stdb, cas).await } "ai_process" => { + // Fortsatt inline — ingen CLI-verktøy ennå. + // TODO: Lag synops-ai-process og migrer hit. ai_process::handle_ai_process(job, db, stdb).await } "render_article" => { - handle_render_article(job, db, cas).await + handle_render_article(job, cas).await } "render_index" => { - handle_render_index(job, db, cas).await + handle_render_index(job, cas).await } other => Err(format!("Ukjent jobbtype: {other}")), } } -/// Handler for `render_article`-jobb. +/// Synops-render binary path. +fn render_bin() -> String { + std::env::var("SYNOPS_RENDER_BIN") + .unwrap_or_else(|_| "synops-render".to_string()) +} + +/// Handler for `render_article`-jobb — delegerer til synops-render CLI. /// /// Payload: `{ "node_id": "...", "collection_id": "..." }` -/// Rendrer artikkelens metadata.document til HTML via Tera, lagrer i CAS, -/// oppdaterer nodens metadata.rendered. +/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering. async fn handle_render_article( job: &JobRow, - db: &PgPool, cas: &CasStore, ) -> Result { let node_id: Uuid = job @@ -222,17 +234,41 @@ async fn handle_render_article( .and_then(|s| s.parse().ok()) .ok_or("Mangler collection_id i payload")?; - publishing::render_article_to_cas(db, cas, node_id, collection_id).await + let bin = render_bin(); + let mut cmd = tokio::process::Command::new(&bin); + + cmd.arg("--node-id").arg(node_id.to_string()) + .arg("--collection-id").arg(collection_id.to_string()) + .arg("--render-type").arg("article") + .arg("--write"); + + cli_dispatch::set_database_url(&mut cmd)?; + cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string()); + + tracing::info!( + node_id = %node_id, + collection_id = %collection_id, + bin = %bin, + "Starter synops-render (article)" + ); + + let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; + + tracing::info!( + node_id = %node_id, + html_hash = result["html_hash"].as_str().unwrap_or("n/a"), + "synops-render (article) fullført" + ); + + Ok(result) } -/// Handler for `render_index`-jobb. +/// Handler for `render_index`-jobb — delegerer til synops-render CLI. /// /// Payload: `{ "collection_id": "..." }` -/// Rendrer forsiden til HTML via Tera, lagrer i CAS, -/// oppdaterer samlingens metadata.rendered_index. +/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering. async fn handle_render_index( job: &JobRow, - db: &PgPool, cas: &CasStore, ) -> Result { let collection_id: Uuid = job @@ -242,7 +278,31 @@ async fn handle_render_index( .and_then(|s| s.parse().ok()) .ok_or("Mangler collection_id i payload")?; - publishing::render_index_to_cas(db, cas, collection_id).await + let bin = render_bin(); + let mut cmd = tokio::process::Command::new(&bin); + + cmd.arg("--node-id").arg(collection_id.to_string()) + .arg("--render-type").arg("index") + .arg("--write"); + + cli_dispatch::set_database_url(&mut cmd)?; + cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string()); + + tracing::info!( + collection_id = %collection_id, + bin = %bin, + "Starter synops-render (index)" + ); + + let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; + + tracing::info!( + collection_id = %collection_id, + html_hash = result["html_hash"].as_str().unwrap_or("n/a"), + "synops-render (index) fullført" + ); + + Ok(result) } // ============================================================================= diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs index cafe07d..4592471 100644 --- a/maskinrommet/src/main.rs +++ b/maskinrommet/src/main.rs @@ -6,6 +6,7 @@ pub mod audio; pub mod bandwidth; mod auth; pub mod cas; +pub mod cli_dispatch; mod custom_domain; mod intentions; pub mod jobs; diff --git a/maskinrommet/src/tts.rs b/maskinrommet/src/tts.rs index f0b3569..55774f7 100644 --- a/maskinrommet/src/tts.rs +++ b/maskinrommet/src/tts.rs @@ -1,57 +1,40 @@ -// TTS-pipeline — tekst til lyd via ElevenLabs. +// TTS-dispatcher — delegerer til synops-tts CLI. +// +// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env), +// og STDB-skriving (sanntidsvisning). Alt annet (ElevenLabs-kall, CAS-lagring, +// PG-skriving) gjøres av synops-tts. // // Jobbtype: "tts_generate" -// Payload: { -// "text": "", -// "voice_id": "", (valgfritt, bruker default) -// "language": "no", (valgfritt) -// "source_node_id": "", (noden teksten tilhører) -// "requested_by": "" -// } +// Payload: { "text", "voice_id"?, "language"?, "source_node_id"?, "requested_by" } // -// Flyten: -// 1. Hent tekst og voice-preferanse fra payload -// 2. Sjekk mottaker-preferanse i source-nodens metadata (voice_preference) -// 3. Kall ElevenLabs API -// 4. Lagre lyd i CAS -// 5. Opprett media-node med has_media-edge til kilde -// 6. Logg ressursforbruk -// -// Ref: docs/features/ressursforbruk.md, docs/proposals/ghost_host_tts.md +// Ref: docs/retninger/unix_filosofi.md, docs/proposals/ghost_host_tts.md use sqlx::PgPool; use uuid::Uuid; -use crate::cas::CasStore; +use crate::cli_dispatch; use crate::jobs::JobRow; -use crate::resource_usage; use crate::stdb::StdbClient; -/// Maks tekst-lengde for TTS (ElevenLabs grense er 5000 tegn per kall). -const MAX_TEXT_LENGTH: usize = 5000; +/// Synops-tts binary path. +fn tts_bin() -> String { + std::env::var("SYNOPS_TTS_BIN") + .unwrap_or_else(|_| "synops-tts".to_string()) +} /// Håndterer tts_generate-jobb. +/// +/// Spawner synops-tts med --write for å gjøre alt arbeidet: +/// ElevenLabs-kall, CAS-lagring, PG-skriving, ressurslogging. +/// Maskinrommet gjør etterpå STDB-synk for sanntidsvisning. pub async fn handle_tts_job( job: &JobRow, db: &PgPool, stdb: &StdbClient, - cas: &CasStore, ) -> Result { let text = job.payload["text"] .as_str() - .ok_or("Mangler 'text' i payload")? - .to_string(); - - if text.is_empty() { - return Err("Tom tekst — ingenting å generere".to_string()); - } - if text.len() > MAX_TEXT_LENGTH { - return Err(format!( - "Tekst for lang: {} tegn (maks {})", - text.len(), - MAX_TEXT_LENGTH - )); - } + .ok_or("Mangler 'text' i payload")?; let source_node_id: Option = job.payload["source_node_id"] .as_str() @@ -62,168 +45,102 @@ pub async fn handle_tts_job( .and_then(|s| s.parse().ok()) .ok_or("Mangler gyldig 'requested_by' i payload")?; + let language = job.payload["language"] + .as_str() + .unwrap_or("no"); + // Bestem voice_id: payload > source-node metadata > env default let voice_id = resolve_voice_id(job, db, source_node_id).await?; - let language = job.payload["language"] - .as_str() - .unwrap_or("no") - .to_string(); + // Bygg kommando + let bin = tts_bin(); + let mut cmd = tokio::process::Command::new(&bin); + + cmd.arg("--text").arg(text) + .arg("--voice").arg(&voice_id) + .arg("--language").arg(language) + .arg("--requested-by").arg(requested_by.to_string()) + .arg("--write"); + + if let Some(source_id) = source_node_id { + cmd.arg("--source-node-id").arg(source_id.to_string()); + } + + // Sett miljøvariabler CLI-verktøyet trenger + cli_dispatch::set_database_url(&mut cmd)?; + cli_dispatch::forward_env(&mut cmd, "CAS_ROOT"); + cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_API_KEY"); + cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_MODEL"); + cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_DEFAULT_VOICE"); tracing::info!( text_len = text.len(), voice_id = %voice_id, - language = %language, - "Starter TTS-generering" + bin = %bin, + "Starter synops-tts" ); - // 1. Kall ElevenLabs API - let audio_bytes = call_elevenlabs(&text, &voice_id).await?; + let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?; - tracing::info!( - audio_size = audio_bytes.len(), - "Mottok lyd fra ElevenLabs" - ); - - // 2. Lagre i CAS - let cas_result = cas - .store(&audio_bytes) - .await - .map_err(|e| format!("CAS-lagring feilet: {e}"))?; - - tracing::info!( - cas_hash = %cas_result.hash, - size = cas_result.size, - dedup = cas_result.already_existed, - "Lyd lagret i CAS" - ); - - // 3. Opprett media-node for lydfilen - let media_node_id = Uuid::now_v7(); - let title = format!("TTS: {}", truncate(&text, 60)); - let metadata = serde_json::json!({ - "cas_hash": cas_result.hash, - "mime": "audio/mpeg", - "size_bytes": cas_result.size, - "tts": { - "provider": "elevenlabs", - "voice_id": voice_id, - "language": language, - "characters": text.len(), - } - }); - let metadata_str = metadata.to_string(); - - // STDB først (sanntid) - stdb.create_node( - &media_node_id.to_string(), - "content", - &title, - "", - "hidden", - &metadata_str, - &requested_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_node feilet: {e}"))?; - - // PG (persistering) - sqlx::query( - r#" - INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by) - VALUES ($1, 'content', $2, '', 'hidden'::visibility, $3, $4) - "#, - ) - .bind(media_node_id) - .bind(&title) - .bind(&metadata) - .bind(requested_by) - .execute(db) - .await - .map_err(|e| format!("PG insert media-node feilet: {e}"))?; - - // 4. Opprett has_media-edge fra kilde-node til TTS-noden (hvis kilde finnes) - if let Some(source_id) = source_node_id { - let edge_id = Uuid::now_v7(); - let edge_meta = serde_json::json!({ - "media_type": "tts_audio", - "generated_at": chrono::Utc::now().to_rfc3339() + // --- STDB-synk for sanntidsvisning --- + // synops-tts skriver PG. Vi synker til STDB for umiddelbar visning. + if let Some(media_node_id) = result["media_node_id"].as_str() { + let cas_hash = result["cas_hash"].as_str().unwrap_or(""); + let characters = result["characters"].as_u64().unwrap_or(0); + let title = format!("TTS: {}", truncate(text, 60)); + let metadata = serde_json::json!({ + "cas_hash": cas_hash, + "mime": "audio/mpeg", + "tts": { "voice_id": &voice_id, "characters": characters } }); - let empty_meta = edge_meta.to_string(); - stdb.create_edge( - &edge_id.to_string(), - &source_id.to_string(), - &media_node_id.to_string(), - "has_media", - &empty_meta, - false, - &requested_by.to_string(), - ) - .await - .map_err(|e| format!("STDB create_edge (has_media) feilet: {e}"))?; + if let Err(e) = stdb.create_node( + media_node_id, "content", &title, "", "hidden", + &metadata.to_string(), &requested_by.to_string(), + ).await { + tracing::warn!(error = %e, "STDB create_node (tts) feilet (PG er allerede skrevet)"); + } - sqlx::query( - r#" - INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by) - VALUES ($1, $2, $3, 'has_media', $4, false, $5) - "#, - ) - .bind(edge_id) - .bind(source_id) - .bind(media_node_id) - .bind(&edge_meta) - .bind(requested_by) - .execute(db) - .await - .map_err(|e| format!("PG insert has_media-edge feilet: {e}"))?; + // has_media-edge + if let Some(source_id) = source_node_id { + let edge_id = Uuid::now_v7().to_string(); + let edge_meta = serde_json::json!({ + "media_type": "tts_audio", + "generated_at": chrono::Utc::now().to_rfc3339() + }); + + if let Err(e) = stdb.create_edge( + &edge_id, &source_id.to_string(), media_node_id, "has_media", + &edge_meta.to_string(), false, &requested_by.to_string(), + ).await { + tracing::warn!(error = %e, "STDB create_edge (has_media) feilet (PG er allerede skrevet)"); + } + } } - // 5. Logg ressursforbruk - let collection_id = resource_usage::find_collection_for_node(db, media_node_id).await; - if let Err(e) = resource_usage::log( - db, - media_node_id, - Some(requested_by), - collection_id, - "tts", - serde_json::json!({ - "provider": "elevenlabs", - "characters": text.len(), - "voice_id": voice_id - }), - ) - .await - { - tracing::warn!(error = %e, "Kunne ikke logge TTS-ressursforbruk"); - } + tracing::info!( + cas_hash = result["cas_hash"].as_str().unwrap_or("n/a"), + media_node_id = result["media_node_id"].as_str().unwrap_or("n/a"), + "synops-tts fullført" + ); - Ok(serde_json::json!({ - "status": "completed", - "media_node_id": media_node_id.to_string(), - "cas_hash": cas_result.hash, - "characters": text.len(), - "audio_size_bytes": cas_result.size, - "voice_id": voice_id, - })) + Ok(result) } /// Bestem voice_id: payload-verdi > source-node metadata.voice_preference > env default. -/// "Mottaker-preferanse i metadata" betyr at en node kan ha -/// metadata.voice_preference.voice_id som overstyrer default. async fn resolve_voice_id( job: &JobRow, db: &PgPool, source_node_id: Option, ) -> Result { - // 1. Eksplisitt i payload — alltid høyest prioritet + // 1. Eksplisitt i payload if let Some(vid) = job.payload["voice_id"].as_str() { if !vid.is_empty() { return Ok(vid.to_string()); } } - // 2. Sjekk source-nodens metadata.voice_preference.voice_id + // 2. Source-nodens metadata.voice_preference.voice_id if let Some(node_id) = source_node_id { let meta: Option = sqlx::query_scalar( "SELECT metadata FROM nodes WHERE id = $1", @@ -241,11 +158,7 @@ async fn resolve_voice_id( .and_then(|v| v.as_str()) { if !vid.is_empty() { - tracing::info!( - node_id = %node_id, - voice_id = %vid, - "Bruker mottaker-preferanse fra node-metadata" - ); + tracing::info!(node_id = %node_id, voice_id = %vid, "Bruker mottaker-preferanse"); return Ok(vid.to_string()); } } @@ -253,57 +166,8 @@ async fn resolve_voice_id( } // 3. Miljøvariabel-default - let default = std::env::var("ELEVENLABS_DEFAULT_VOICE") - .unwrap_or_else(|_| "21m00Tcm4TlvDq8ikWAM".to_string()); // Rachel (ElevenLabs premade) - - Ok(default) -} - -/// Kall ElevenLabs text-to-speech API. -/// Returnerer rå MP3-bytes. -async fn call_elevenlabs(text: &str, voice_id: &str) -> Result, String> { - let api_key = std::env::var("ELEVENLABS_API_KEY") - .map_err(|_| "ELEVENLABS_API_KEY er ikke satt".to_string())?; - - let model_id = std::env::var("ELEVENLABS_MODEL") - .unwrap_or_else(|_| "eleven_multilingual_v2".to_string()); - - let url = format!( - "https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" - ); - - let payload = serde_json::json!({ - "text": text, - "model_id": model_id, - "voice_settings": { - "stability": 0.5, - "similarity_boost": 0.75, - "style": 0.0, - "use_speaker_boost": true - } - }); - - let client = reqwest::Client::new(); - let resp = client - .post(&url) - .header("xi-api-key", &api_key) - .header("Accept", "audio/mpeg") - .json(&payload) - .timeout(std::time::Duration::from_secs(30)) - .send() - .await - .map_err(|e| format!("ElevenLabs HTTP-feil: {e}"))?; - - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - return Err(format!("ElevenLabs returnerte {status}: {body}")); - } - - resp.bytes() - .await - .map(|b| b.to_vec()) - .map_err(|e| format!("Kunne ikke lese ElevenLabs-respons: {e}")) + Ok(std::env::var("ELEVENLABS_DEFAULT_VOICE") + .unwrap_or_else(|_| "21m00Tcm4TlvDq8ikWAM".to_string())) } /// Forkorter en streng til maks `max_len` tegn med "..." suffix. diff --git a/tasks.md b/tasks.md index c604940..0c58ab6 100644 --- a/tasks.md +++ b/tasks.md @@ -261,8 +261,7 @@ kaller dem direkte. Samme verktøy, to brukere. ### Infrastruktur -- [~] 21.15 Jobbkø-dispatcher: endre maskinrommets jobbkø-handlere til å spawne CLI-verktøy (`Command::new("synops-X")`) i stedet for inline-kode. Stdout → jobbresultat, stderr → feillogg, exitkode → status. - > Påbegynt: 2026-03-18T10:25 +- [x] 21.15 Jobbkø-dispatcher: endre maskinrommets jobbkø-handlere til å spawne CLI-verktøy (`Command::new("synops-X")`) i stedet for inline-kode. Stdout → jobbresultat, stderr → feillogg, exitkode → status. - [ ] 21.16 Felles lib: `synops-common` crate med PG-tilkobling, CAS-helpers, og node/edge-typer. Deles mellom alle CLI-verktøy for å unngå duplisering. ## Fase 12: Herding diff --git a/tools/synops-tts/src/main.rs b/tools/synops-tts/src/main.rs index ca3b088..3ebc13d 100644 --- a/tools/synops-tts/src/main.rs +++ b/tools/synops-tts/src/main.rs @@ -112,8 +112,17 @@ async fn run(cli: Cli) -> Result<(), String> { tracing::info!(cas_hash = %cas_hash, "Lyd lagret i CAS"); - // 3. Bygg resultat-JSON - let result = serde_json::json!({ + // 3. Skriv til database hvis --write, og hent media_node_id + let media_node_id = if cli.write { + let requested_by = cli.requested_by.unwrap(); // Allerede validert + Some(write_to_db(&cas_hash, &cli.text, &voice_id, &cli.language, audio_bytes.len(), + cli.source_node_id, requested_by).await?) + } else { + None + }; + + // 4. Bygg resultat-JSON + let mut result = serde_json::json!({ "cas_hash": cas_hash, "size_bytes": audio_bytes.len(), "voice_id": voice_id, @@ -122,11 +131,8 @@ async fn run(cli: Cli) -> Result<(), String> { "mime": "audio/mpeg", }); - // 4. Skriv til database hvis --write - if cli.write { - let requested_by = cli.requested_by.unwrap(); // Allerede validert - write_to_db(&cas_hash, &cli.text, &voice_id, &cli.language, audio_bytes.len(), - cli.source_node_id, requested_by).await?; + if let Some(id) = media_node_id { + result["media_node_id"] = serde_json::Value::String(id.to_string()); } // 5. Output JSON til stdout @@ -223,6 +229,7 @@ async fn store_in_cas(cas_root: &str, data: &[u8]) -> Result { } /// Opprett media-node og has_media-edge i PostgreSQL. +/// Returnerer media_node_id for STDB-synk. async fn write_to_db( cas_hash: &str, text: &str, @@ -231,7 +238,7 @@ async fn write_to_db( size_bytes: usize, source_node_id: Option, requested_by: Uuid, -) -> Result<(), String> { +) -> Result { let db_url = std::env::var("DATABASE_URL") .map_err(|_| "DATABASE_URL må settes med --write".to_string())?; @@ -325,7 +332,7 @@ async fn write_to_db( tracing::warn!(error = %e, "Kunne ikke logge TTS-ressursforbruk"); } - Ok(()) + Ok(media_node_id) } /// Forkorter en streng til maks `max_len` tegn med "..." suffix.