Jobbkø-dispatcher: spawn CLI-verktøy i stedet for inline-kode (oppgave 21.15)
Alle jobbkø-handlere i maskinrommet delegerer nå til CLI-verktøy
(Command::new("synops-X")) i stedet for å kjøre logikk inline.
Stdout → jobbresultat (JSON), stderr → feillogg, exitkode → status.
Konverterte handlere:
- tts_generate → synops-tts (beholder voice-oppslag + STDB-synk)
- suggest_edges → synops-suggest-edges (STDB-synk utelatt, topics er bakgrunnsdata)
- render_article → synops-render --render-type article
- render_index → synops-render --render-type index
- audio_process → synops-audio (beholder STDB-synk)
Allerede CLI-delegerende (uendret):
- whisper_transcribe → synops-transcribe
- agent_respond → synops-respond
- summarize_communication → synops-summarize
Fortsatt inline (mangler CLI-verktøy):
- ai_process — planlagt i fremtidig oppgave
Ny felles modul: cli_dispatch.rs med run_cli_tool() og env-helpers.
synops-tts modifisert til å inkludere media_node_id i output.
Ref: docs/retninger/unix_filosofi.md
This commit is contained in:
parent
bde4285c15
commit
bfc88b9a80
9 changed files with 393 additions and 839 deletions
|
|
@ -40,17 +40,37 @@ CREATE INDEX idx_job_queue_pending ON job_queue (priority DESC, scheduled_for AS
|
|||
## 4. Worker-arkitektur (Rust)
|
||||
|
||||
### 4.1 Designprinsipp: Orkestrator, ikke prosesseringsmotor
|
||||
Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som koordinerer eksterne tjenester:
|
||||
Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som spawner
|
||||
CLI-verktøy (`Command::new("synops-X")`) som gjør selve jobben.
|
||||
|
||||
| Jobbtype | Hva workeren gjør | Tung logikk i workeren? |
|
||||
**Kontrakt mellom maskinrommet og CLI-verktøy:**
|
||||
- **Stdout** → jobbresultat (JSON). Parses og lagres i `job_queue.result`.
|
||||
- **Stderr** → feillogg. Logges av maskinrommet.
|
||||
- **Exitkode** → status. 0 = suksess, != 0 = feil.
|
||||
- **`--write` flag** → CLI-verktøyet skriver til PG selv. Uten flagget: kun stdout.
|
||||
|
||||
**Hva maskinrommet beholder inline:**
|
||||
- Payload-parsing og validering
|
||||
- Sikkerhetskontroller (kill switch, rate limiting, loop-prevensjon for agent)
|
||||
- Voice/model-oppslag fra node metadata (orchestrator-logikk)
|
||||
- STDB-synk etter CLI fullført (sanntidsvisning)
|
||||
- Jobbstatus-håndtering (complete/fail/retry)
|
||||
|
||||
| Jobbtype | CLI-verktøy | Maskinrommet beholder |
|
||||
|---|---|---|
|
||||
| `whisper_transcribe` | HTTP-kall til faster-whisper-server (SRT), parse → `transcription_segments` | Lett SRT-parsing |
|
||||
| `openrouter_analyze` | HTTP-kall til AI Gateway | Nei — venter på svar |
|
||||
| `stats_parse` | Parser Caddy-loggfiler, skriver til PG | Lett I/O |
|
||||
| `research_clip` | HTTP-kall til AI Gateway | Nei — venter på svar |
|
||||
| `generate_embeddings` | HTTP-kall til AI Gateway | Nei — venter på svar |
|
||||
| `whisper_transcribe` | `synops-transcribe` | — |
|
||||
| `agent_respond` | `synops-respond` | Kill switch, rate limit, loop-prevensjon, STDB-synk |
|
||||
| `suggest_edges` | `synops-suggest-edges` | — |
|
||||
| `summarize_communication` | `synops-summarize` | — |
|
||||
| `tts_generate` | `synops-tts` | Voice-oppslag fra node metadata, STDB-synk |
|
||||
| `audio_process` | `synops-audio` | STDB-synk |
|
||||
| `render_article` | `synops-render` | — |
|
||||
| `render_index` | `synops-render` | — |
|
||||
| `ai_process` | *(inline — mangler CLI)* | Alt (planlagt migrasjon) |
|
||||
|
||||
Ny jobbtype = ny handler-funksjon (bygg request, håndter respons, feilhåndtering). Tynt glue-code. Rekompilering er triviell og inkrementell.
|
||||
**Felles dispatcher-kode:** `cli_dispatch.rs` — `run_cli_tool()` og env-helpers.
|
||||
|
||||
Ny jobbtype = nytt CLI-verktøy + tynn dispatcher-handler. Ref: `docs/retninger/unix_filosofi.md`.
|
||||
|
||||
### 4.2 Én worker, prioritetsstyrt
|
||||
Én enkelt worker-prosess håndterer **alle jobbtyper**. Prioritering skjer via `priority`-kolonnen i køen — SQL-spørringen plukker alltid viktigste jobb først. Ingen behov for separate prosesser per jobbtype.
|
||||
|
|
|
|||
|
|
@ -1,136 +1,38 @@
|
|||
// AI-foreslåtte edges — sender innhold til LLM, foreslår mentions og topics.
|
||||
// AI edge-forslag dispatcher — delegerer til synops-suggest-edges CLI.
|
||||
//
|
||||
// Maskinrommet orkestrerer, CLI-verktøyet gjør jobben (LLM-kall,
|
||||
// topic-opprettelse, edge-skriving, ressurslogging).
|
||||
//
|
||||
// STDB-synk for topic-noder og mentions-edges hoppes over her —
|
||||
// topics er bakgrunnsdata i kunnskapsgrafen og trenger ikke
|
||||
// sanntidsvisning. De synkes ved neste STDB-refresh.
|
||||
//
|
||||
// Jobbtype: "suggest_edges"
|
||||
// Payload: { "node_id": "<uuid>" }
|
||||
//
|
||||
// Flyten:
|
||||
// 1. Hent nodens innhold fra PG
|
||||
// 2. Hent eksisterende topic-noder for gjenbruk
|
||||
// 3. Send til LiteLLM (OpenAI-kompatibelt API via AI Gateway)
|
||||
// 4. Parse LLM-svar (JSON med topics og mentions)
|
||||
// 5. Opprett nye topic-noder for ukjente topics
|
||||
// 6. Opprett mentions-edges fra innholdsnode til topic/entity-noder
|
||||
//
|
||||
// Ref: docs/infra/ai_gateway.md, docs/concepts/kunnskapsgrafen.md
|
||||
// Ref: docs/retninger/unix_filosofi.md, docs/infra/ai_gateway.md,
|
||||
// docs/concepts/kunnskapsgrafen.md
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::cli_dispatch;
|
||||
use crate::jobs::JobRow;
|
||||
use crate::resource_usage;
|
||||
use crate::stdb::StdbClient;
|
||||
|
||||
/// Eksisterende topic-node fra PG.
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct TopicRow {
|
||||
id: Uuid,
|
||||
title: String,
|
||||
/// Synops-suggest-edges binary path.
|
||||
fn suggest_edges_bin() -> String {
|
||||
std::env::var("SYNOPS_SUGGEST_EDGES_BIN")
|
||||
.unwrap_or_else(|_| "synops-suggest-edges".to_string())
|
||||
}
|
||||
|
||||
/// Kilde-node fra PG.
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct SourceNode {
|
||||
title: Option<String>,
|
||||
content: Option<String>,
|
||||
created_by: Option<Uuid>,
|
||||
}
|
||||
|
||||
/// LLM-respons: foreslåtte topics og mentions.
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct AiSuggestion {
|
||||
/// Emnene innholdet handler om (nye eller eksisterende).
|
||||
#[serde(default)]
|
||||
topics: Vec<String>,
|
||||
/// Entiteter nevnt i innholdet (personer, organisasjoner, steder).
|
||||
#[serde(default)]
|
||||
mentions: Vec<MentionSuggestion>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct MentionSuggestion {
|
||||
/// Navn på entiteten.
|
||||
name: String,
|
||||
/// Type: person, organisasjon, sted, konsept.
|
||||
#[serde(default = "default_entity_type")]
|
||||
entity_type: String,
|
||||
}
|
||||
|
||||
fn default_entity_type() -> String {
|
||||
"person".to_string()
|
||||
}
|
||||
|
||||
/// OpenAI-kompatibel chat completion request.
|
||||
#[derive(Serialize)]
|
||||
struct ChatRequest {
|
||||
model: String,
|
||||
messages: Vec<ChatMessage>,
|
||||
temperature: f32,
|
||||
response_format: ResponseFormat,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ResponseFormat {
|
||||
r#type: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ChatMessage {
|
||||
role: String,
|
||||
content: String,
|
||||
}
|
||||
|
||||
/// OpenAI-kompatibel chat completion response.
|
||||
#[derive(Deserialize)]
|
||||
struct ChatResponse {
|
||||
choices: Vec<Choice>,
|
||||
#[serde(default)]
|
||||
usage: Option<UsageInfo>,
|
||||
#[serde(default)]
|
||||
model: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone)]
|
||||
struct UsageInfo {
|
||||
#[serde(default)]
|
||||
prompt_tokens: i64,
|
||||
#[serde(default)]
|
||||
completion_tokens: i64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Choice {
|
||||
message: MessageContent,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct MessageContent {
|
||||
content: Option<String>,
|
||||
}
|
||||
|
||||
const SYSTEM_PROMPT: &str = r#"Du er en innholdsanalysator for en norsk redaksjonsplattform. Analyser teksten og ekstraher:
|
||||
|
||||
1. **topics**: Emner/temaer teksten handler om. Bruk korte, presise norske termer (f.eks. "skolepolitikk", "klimaendringer", "statsbudsjettet"). Maks 5 topics.
|
||||
|
||||
2. **mentions**: Navngitte entiteter (personer, organisasjoner, steder) som er eksplisitt nevnt. Inkluder entity_type ("person", "organisasjon", "sted", "konsept").
|
||||
|
||||
Returner KUN et JSON-objekt med denne strukturen:
|
||||
{
|
||||
"topics": ["emne1", "emne2"],
|
||||
"mentions": [{"name": "Navn", "entity_type": "person"}]
|
||||
}
|
||||
|
||||
Regler:
|
||||
- Returner tom liste hvis teksten ikke har meningsfullt innhold (hilsener, korte svar, etc.)
|
||||
- Bruk eksisterende topics fra listen nedenfor der det passer, i stedet for å lage nye varianter
|
||||
- Ikke inkluder generiske termer som "samtale" eller "diskusjon"
|
||||
- Navngi entiteter med full, autoritativ form (f.eks. "Jonas Gahr Støre", ikke "Støre")"#;
|
||||
|
||||
/// Håndterer suggest_edges-jobb.
|
||||
///
|
||||
/// Spawner synops-suggest-edges med --write for å gjøre alt arbeidet:
|
||||
/// LLM-kall, topic-opprettelse, edge-skriving, ressurslogging.
|
||||
pub async fn handle_suggest_edges(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
_db: &sqlx::PgPool,
|
||||
_stdb: &StdbClient,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let node_id: Uuid = job
|
||||
.payload
|
||||
|
|
@ -139,387 +41,43 @@ pub async fn handle_suggest_edges(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler gyldig node_id i payload")?;
|
||||
|
||||
tracing::info!(node_id = %node_id, "Starter AI edge-forslag");
|
||||
// requested_by er valgfri — bruk node_id som fallback for --write
|
||||
let requested_by = job
|
||||
.payload
|
||||
.get("requested_by")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or(&node_id.to_string())
|
||||
.to_string();
|
||||
|
||||
// 1. Hent kildenode
|
||||
let source = sqlx::query_as::<_, SourceNode>(
|
||||
"SELECT title, content, created_by FROM nodes WHERE id = $1",
|
||||
)
|
||||
.bind(node_id)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG-feil ved henting av node: {e}"))?
|
||||
.ok_or_else(|| format!("Node {node_id} finnes ikke"))?;
|
||||
// Bygg kommando
|
||||
let bin = suggest_edges_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
let title = source.title.unwrap_or_default();
|
||||
let content = source.content.unwrap_or_default();
|
||||
cmd.arg("--node-id").arg(node_id.to_string())
|
||||
.arg("--requested-by").arg(&requested_by)
|
||||
.arg("--write");
|
||||
|
||||
// Ikke analyser tomme noder eller veldig korte meldinger
|
||||
let text = format!("{title}\n{content}").trim().to_string();
|
||||
if text.len() < 20 {
|
||||
tracing::info!(node_id = %node_id, len = text.len(), "For kort innhold, hopper over AI-analyse");
|
||||
return Ok(serde_json::json!({
|
||||
"status": "skipped",
|
||||
"reason": "content_too_short"
|
||||
}));
|
||||
}
|
||||
|
||||
// 2. Hent eksisterende topic-noder for kontekst
|
||||
let existing_topics = sqlx::query_as::<_, TopicRow>(
|
||||
"SELECT id, title FROM nodes WHERE node_kind = 'topic' ORDER BY created_at DESC LIMIT 100",
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG-feil ved henting av topics: {e}"))?;
|
||||
|
||||
let topic_list: Vec<&str> = existing_topics.iter().map(|t| t.title.as_str()).collect();
|
||||
|
||||
// 3. Bygg prompt og kall LiteLLM
|
||||
let user_content = if topic_list.is_empty() {
|
||||
format!("Analyser følgende tekst:\n\n{text}")
|
||||
} else {
|
||||
format!(
|
||||
"Eksisterende topics: {}\n\nAnalyser følgende tekst:\n\n{text}",
|
||||
topic_list.join(", ")
|
||||
)
|
||||
};
|
||||
|
||||
let (suggestion, llm_usage, llm_model) = call_llm(&user_content).await?;
|
||||
// Sett miljøvariabler CLI-verktøyet trenger
|
||||
cli_dispatch::set_database_url(&mut cmd)?;
|
||||
cli_dispatch::forward_env(&mut cmd, "AI_GATEWAY_URL");
|
||||
cli_dispatch::forward_env(&mut cmd, "LITELLM_MASTER_KEY");
|
||||
cli_dispatch::forward_env(&mut cmd, "AI_EDGES_MODEL");
|
||||
|
||||
tracing::info!(
|
||||
node_id = %node_id,
|
||||
topics = ?suggestion.topics,
|
||||
mentions = suggestion.mentions.len(),
|
||||
"LLM-forslag mottatt"
|
||||
bin = %bin,
|
||||
"Starter synops-suggest-edges"
|
||||
);
|
||||
|
||||
// 4. Opprett topic-noder og mentions-edges
|
||||
let created_by = source.created_by.unwrap_or(node_id);
|
||||
let mut created_topics = 0u32;
|
||||
let mut created_edges = 0u32;
|
||||
|
||||
// Prosesser topics
|
||||
for topic_name in &suggestion.topics {
|
||||
let topic_name = topic_name.trim();
|
||||
if topic_name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Finn eksisterende topic med case-insensitivt match
|
||||
let existing = existing_topics
|
||||
.iter()
|
||||
.find(|t| t.title.to_lowercase() == topic_name.to_lowercase());
|
||||
|
||||
let topic_id = if let Some(t) = existing {
|
||||
t.id
|
||||
} else {
|
||||
// Opprett ny topic-node
|
||||
let new_id = Uuid::now_v7();
|
||||
create_topic_node(db, stdb, new_id, topic_name, created_by).await?;
|
||||
created_topics += 1;
|
||||
new_id
|
||||
};
|
||||
|
||||
// Opprett mentions-edge: innholdsnode → topic
|
||||
if create_mentions_edge(db, stdb, node_id, topic_id, created_by).await? {
|
||||
created_edges += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Prosesser mentions (entiteter)
|
||||
for mention in &suggestion.mentions {
|
||||
let name = mention.name.trim();
|
||||
if name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Søk etter eksisterende entitet med samme tittel
|
||||
let existing_entity = sqlx::query_scalar::<_, Uuid>(
|
||||
"SELECT id FROM nodes WHERE node_kind = 'topic' AND LOWER(title) = LOWER($1) LIMIT 1",
|
||||
)
|
||||
.bind(name)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG-feil ved entitet-søk: {e}"))?;
|
||||
|
||||
let entity_id = if let Some(id) = existing_entity {
|
||||
id
|
||||
} else {
|
||||
// Opprett ny entitet som topic-node med entity_type i metadata
|
||||
let new_id = Uuid::now_v7();
|
||||
create_entity_node(db, stdb, new_id, name, &mention.entity_type, created_by).await?;
|
||||
created_topics += 1;
|
||||
new_id
|
||||
};
|
||||
|
||||
// Opprett mentions-edge: innholdsnode → entitet
|
||||
if create_mentions_edge(db, stdb, node_id, entity_id, created_by).await? {
|
||||
created_edges += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let result = serde_json::json!({
|
||||
"status": "completed",
|
||||
"topics_created": created_topics,
|
||||
"edges_created": created_edges,
|
||||
"suggestions": {
|
||||
"topics": suggestion.topics,
|
||||
"mentions": suggestion.mentions.iter().map(|m| &m.name).collect::<Vec<_>>()
|
||||
}
|
||||
});
|
||||
|
||||
// Logg AI-ressursforbruk
|
||||
let collection_id = resource_usage::find_collection_for_node(db, node_id).await;
|
||||
let (tokens_in, tokens_out) = llm_usage
|
||||
.map(|u| (u.prompt_tokens, u.completion_tokens))
|
||||
.unwrap_or((0, 0));
|
||||
|
||||
if let Err(e) = resource_usage::log(
|
||||
db,
|
||||
node_id,
|
||||
source.created_by,
|
||||
collection_id,
|
||||
"ai",
|
||||
serde_json::json!({
|
||||
"model_level": "fast",
|
||||
"model_id": llm_model.unwrap_or_else(|| "unknown".to_string()),
|
||||
"tokens_in": tokens_in,
|
||||
"tokens_out": tokens_out,
|
||||
"job_type": "suggest_edges"
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Kunne ikke logge AI-ressursforbruk for edge-forslag");
|
||||
}
|
||||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||||
|
||||
tracing::info!(
|
||||
node_id = %node_id,
|
||||
topics_created = created_topics,
|
||||
edges_created = created_edges,
|
||||
"AI edge-forslag fullført"
|
||||
topics_created = result["topics_created"].as_u64().unwrap_or(0),
|
||||
edges_created = result["edges_created"].as_u64().unwrap_or(0),
|
||||
status = result["status"].as_str().unwrap_or("unknown"),
|
||||
"synops-suggest-edges fullført"
|
||||
);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Kall LiteLLM (OpenAI-kompatibelt API) for å analysere innhold.
|
||||
/// Returnerer (forslag, usage, model).
|
||||
async fn call_llm(user_content: &str) -> Result<(AiSuggestion, Option<UsageInfo>, Option<String>), String> {
|
||||
let gateway_url = std::env::var("AI_GATEWAY_URL")
|
||||
.unwrap_or_else(|_| "http://localhost:4000".to_string());
|
||||
let api_key = std::env::var("LITELLM_MASTER_KEY")
|
||||
.unwrap_or_default();
|
||||
|
||||
// Bruk sidelinja/rutine (billig modell) for edge-forslag
|
||||
let model = std::env::var("AI_EDGES_MODEL")
|
||||
.unwrap_or_else(|_| "sidelinja/rutine".to_string());
|
||||
|
||||
let request = ChatRequest {
|
||||
model,
|
||||
messages: vec![
|
||||
ChatMessage {
|
||||
role: "system".to_string(),
|
||||
content: SYSTEM_PROMPT.to_string(),
|
||||
},
|
||||
ChatMessage {
|
||||
role: "user".to_string(),
|
||||
content: user_content.to_string(),
|
||||
},
|
||||
],
|
||||
temperature: 0.2,
|
||||
response_format: ResponseFormat {
|
||||
r#type: "json_object".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{gateway_url}/v1/chat/completions");
|
||||
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.header("Authorization", format!("Bearer {api_key}"))
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&request)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("LiteLLM-kall feilet: {e}"))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let status = resp.status();
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
return Err(format!("LiteLLM returnerte {status}: {body}"));
|
||||
}
|
||||
|
||||
let chat_resp: ChatResponse = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke parse LiteLLM-respons: {e}"))?;
|
||||
|
||||
let content = chat_resp
|
||||
.choices
|
||||
.first()
|
||||
.and_then(|c| c.message.content.as_deref())
|
||||
.ok_or("LiteLLM returnerte ingen content")?;
|
||||
|
||||
// Parse JSON fra LLM-output
|
||||
let suggestion: AiSuggestion = serde_json::from_str(content)
|
||||
.map_err(|e| format!("Kunne ikke parse LLM JSON: {e}. Rå output: {content}"))?;
|
||||
|
||||
Ok((suggestion, chat_resp.usage, chat_resp.model))
|
||||
}
|
||||
|
||||
/// Opprett en topic-node i PG og STDB.
|
||||
async fn create_topic_node(
|
||||
db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
id: Uuid,
|
||||
title: &str,
|
||||
created_by: Uuid,
|
||||
) -> Result<(), String> {
|
||||
let metadata = serde_json::json!({"ai_generated": true});
|
||||
|
||||
// STDB først
|
||||
stdb.create_node(
|
||||
&id.to_string(),
|
||||
"topic",
|
||||
title,
|
||||
"",
|
||||
"discoverable",
|
||||
&metadata.to_string(),
|
||||
&created_by.to_string(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("STDB create_node (topic) feilet: {e}"))?;
|
||||
|
||||
// PG
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(title)
|
||||
.bind(&metadata)
|
||||
.bind(created_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert topic feilet: {e}"))?;
|
||||
|
||||
tracing::info!(topic_id = %id, title = %title, "Ny topic-node opprettet (AI)");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Opprett en entitet-node (person, org, sted) i PG og STDB.
|
||||
async fn create_entity_node(
|
||||
db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
id: Uuid,
|
||||
name: &str,
|
||||
entity_type: &str,
|
||||
created_by: Uuid,
|
||||
) -> Result<(), String> {
|
||||
let metadata = serde_json::json!({
|
||||
"ai_generated": true,
|
||||
"entity_type": entity_type
|
||||
});
|
||||
|
||||
stdb.create_node(
|
||||
&id.to_string(),
|
||||
"topic",
|
||||
name,
|
||||
"",
|
||||
"discoverable",
|
||||
&metadata.to_string(),
|
||||
&created_by.to_string(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("STDB create_node (entity) feilet: {e}"))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'topic', $2, '', 'discoverable', $3, $4)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(name)
|
||||
.bind(&metadata)
|
||||
.bind(created_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert entity feilet: {e}"))?;
|
||||
|
||||
tracing::info!(entity_id = %id, name = %name, entity_type = %entity_type, "Ny entitet-node opprettet (AI)");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Opprett en mentions-edge fra innholdsnode til target.
|
||||
/// Returnerer true hvis ny edge ble opprettet, false hvis den allerede eksisterte.
|
||||
async fn create_mentions_edge(
|
||||
db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
source_id: Uuid,
|
||||
target_id: Uuid,
|
||||
created_by: Uuid,
|
||||
) -> Result<bool, String> {
|
||||
// Sjekk om edge allerede finnes
|
||||
let exists = sqlx::query_scalar::<_, bool>(
|
||||
"SELECT EXISTS(SELECT 1 FROM edges WHERE source_id = $1 AND target_id = $2 AND edge_type = 'mentions')",
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(target_id)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG-feil ved edge-sjekk: {e}"))?;
|
||||
|
||||
if exists {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let edge_id = Uuid::now_v7();
|
||||
let metadata = serde_json::json!({"origin": "ai"});
|
||||
|
||||
// STDB først
|
||||
stdb.create_edge(
|
||||
&edge_id.to_string(),
|
||||
&source_id.to_string(),
|
||||
&target_id.to_string(),
|
||||
"mentions",
|
||||
&metadata.to_string(),
|
||||
false,
|
||||
&created_by.to_string(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("STDB create_edge (mentions) feilet: {e}"))?;
|
||||
|
||||
// PG
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
|
||||
VALUES ($1, $2, $3, 'mentions', $4, false, $5)
|
||||
ON CONFLICT (source_id, target_id, edge_type) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(edge_id)
|
||||
.bind(source_id)
|
||||
.bind(target_id)
|
||||
.bind(&metadata)
|
||||
.bind(created_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert mentions-edge feilet: {e}"))?;
|
||||
|
||||
tracing::info!(
|
||||
edge_id = %edge_id,
|
||||
source = %source_id,
|
||||
target = %target_id,
|
||||
"Mentions-edge opprettet (AI)"
|
||||
);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -757,10 +757,23 @@ async fn resolve_silence_cuts(
|
|||
Ok(cuts)
|
||||
}
|
||||
|
||||
// ─── Jobbhåndterer ───────────────────────────────────────────────
|
||||
// ─── Jobbhåndterer — delegerer til synops-audio CLI ────────────────
|
||||
//
|
||||
// Maskinrommet beholder: STDB-synk for sanntidsvisning.
|
||||
// CLI gjør: FFmpeg-prosessering, CAS-lagring, PG-skriving, ressurslogging.
|
||||
|
||||
/// Synops-audio binary path.
|
||||
fn audio_bin() -> String {
|
||||
std::env::var("SYNOPS_AUDIO_BIN")
|
||||
.unwrap_or_else(|_| "synops-audio".to_string())
|
||||
}
|
||||
|
||||
/// Håndterer `audio_process`-jobber fra jobbkøen.
|
||||
///
|
||||
/// Spawner synops-audio med --write for å gjøre alt arbeidet:
|
||||
/// FFmpeg-prosessering, CAS-lagring, PG-skriving, ressurslogging.
|
||||
/// Maskinrommet gjør etterpå STDB-synk for sanntidsvisning.
|
||||
///
|
||||
/// Payload:
|
||||
/// ```json
|
||||
/// {
|
||||
|
|
@ -772,7 +785,7 @@ async fn resolve_silence_cuts(
|
|||
/// ```
|
||||
pub async fn handle_audio_process_job(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
_db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
cas: &CasStore,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
|
|
@ -781,8 +794,14 @@ pub async fn handle_audio_process_job(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler media_node_id i payload")?;
|
||||
|
||||
let edl: EdlDocument = serde_json::from_value(job.payload["edl"].clone())
|
||||
.map_err(|e| format!("Ugyldig EDL i payload: {e}"))?;
|
||||
let edl_value = &job.payload["edl"];
|
||||
let edl_json = serde_json::to_string(edl_value)
|
||||
.map_err(|e| format!("Kunne ikke serialisere EDL: {e}"))?;
|
||||
|
||||
// Verifiser at source_hash finnes i EDL
|
||||
let cas_hash = edl_value["source_hash"]
|
||||
.as_str()
|
||||
.ok_or("Mangler source_hash i EDL")?;
|
||||
|
||||
let output_format = job.payload["output_format"]
|
||||
.as_str()
|
||||
|
|
@ -793,111 +812,75 @@ pub async fn handle_audio_process_job(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler requested_by i payload")?;
|
||||
|
||||
// Kjør prosessering
|
||||
let (result_hash, result_size) = process_audio(cas, &edl, output_format).await?;
|
||||
// Bygg kommando
|
||||
let bin = audio_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
// Bestem MIME-type
|
||||
let mime = match output_format {
|
||||
"mp3" => "audio/mpeg",
|
||||
"wav" => "audio/wav",
|
||||
"flac" => "audio/flac",
|
||||
"ogg" => "audio/ogg",
|
||||
_ => "audio/mpeg",
|
||||
};
|
||||
cmd.arg("--cas-hash").arg(cas_hash)
|
||||
.arg("--edl").arg(&edl_json)
|
||||
.arg("--output-format").arg(output_format)
|
||||
.arg("--node-id").arg(media_node_id.to_string())
|
||||
.arg("--requested-by").arg(requested_by.to_string())
|
||||
.arg("--write");
|
||||
|
||||
// Opprett ny medienode for den prosesserte filen
|
||||
let processed_node_id = Uuid::now_v7();
|
||||
let metadata = serde_json::json!({
|
||||
"cas_hash": result_hash,
|
||||
"mime": mime,
|
||||
"size_bytes": result_size,
|
||||
"source_hash": edl.source_hash,
|
||||
"edl": edl,
|
||||
});
|
||||
// Sett miljøvariabler
|
||||
crate::cli_dispatch::set_database_url(&mut cmd)?;
|
||||
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
|
||||
|
||||
// Hent tittel fra original node
|
||||
let original_title: Option<String> = sqlx::query_scalar(
|
||||
"SELECT title FROM nodes WHERE id = $1"
|
||||
)
|
||||
.bind(media_node_id)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
.map_err(|e| format!("DB-feil: {e}"))?
|
||||
.flatten();
|
||||
tracing::info!(
|
||||
media_node_id = %media_node_id,
|
||||
cas_hash = %cas_hash,
|
||||
operations = edl_value["operations"].as_array().map(|a| a.len()).unwrap_or(0),
|
||||
bin = %bin,
|
||||
"Starter synops-audio"
|
||||
);
|
||||
|
||||
let title = original_title
|
||||
.map(|t| format!("{t} (prosessert)"))
|
||||
.unwrap_or_else(|| "Prosessert lyd".to_string());
|
||||
let result = crate::cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||||
|
||||
// Insert processed media node
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, visibility, metadata, created_by)
|
||||
VALUES ($1, 'media', $2, 'hidden', $3, $4)
|
||||
"#,
|
||||
)
|
||||
.bind(processed_node_id)
|
||||
.bind(&title)
|
||||
.bind(&metadata)
|
||||
.bind(requested_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke opprette prosessert node: {e}"))?;
|
||||
// --- STDB-synk for sanntidsvisning ---
|
||||
if let Some(processed_node_id) = result["processed_node_id"].as_str() {
|
||||
let result_hash = result["cas_hash"].as_str().unwrap_or("");
|
||||
let result_size = result["size_bytes"].as_u64().unwrap_or(0);
|
||||
let mime = match output_format {
|
||||
"mp3" => "audio/mpeg",
|
||||
"wav" => "audio/wav",
|
||||
"flac" => "audio/flac",
|
||||
"ogg" => "audio/ogg",
|
||||
_ => "audio/mpeg",
|
||||
};
|
||||
|
||||
// Opprett derived_from edge: processed → original
|
||||
let edge_id = Uuid::now_v7();
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO edges (id, source_id, target_id, edge_type, system, created_by)
|
||||
VALUES ($1, $2, $3, 'derived_from', true, $4)
|
||||
"#,
|
||||
)
|
||||
.bind(edge_id)
|
||||
.bind(processed_node_id)
|
||||
.bind(media_node_id)
|
||||
.bind(requested_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("Kunne ikke opprette derived_from edge: {e}"))?;
|
||||
let metadata = serde_json::json!({
|
||||
"cas_hash": result_hash,
|
||||
"mime": mime,
|
||||
"size_bytes": result_size,
|
||||
"source_hash": cas_hash,
|
||||
});
|
||||
|
||||
// Synk til SpacetimeDB
|
||||
let metadata_str = serde_json::to_string(&metadata).unwrap_or_default();
|
||||
let _ = stdb
|
||||
.create_node(
|
||||
&processed_node_id.to_string(),
|
||||
"media",
|
||||
&title,
|
||||
"",
|
||||
"hidden",
|
||||
&metadata_str,
|
||||
&requested_by.to_string(),
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = stdb.create_node(
|
||||
processed_node_id, "media", "Prosessert lyd", "", "hidden",
|
||||
&metadata.to_string(), &requested_by.to_string(),
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_node (audio) feilet (PG er allerede skrevet)");
|
||||
}
|
||||
|
||||
let _ = stdb
|
||||
.create_edge(
|
||||
&edge_id.to_string(),
|
||||
&processed_node_id.to_string(),
|
||||
&media_node_id.to_string(),
|
||||
"derived_from",
|
||||
"{}",
|
||||
true,
|
||||
&requested_by.to_string(),
|
||||
)
|
||||
.await;
|
||||
// derived_from edge: processed → original
|
||||
let edge_id = Uuid::now_v7().to_string();
|
||||
if let Err(e) = stdb.create_edge(
|
||||
&edge_id, processed_node_id, &media_node_id.to_string(),
|
||||
"derived_from", "{}", true, &requested_by.to_string(),
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_edge (derived_from) feilet (PG er allerede skrevet)");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
original = %media_node_id,
|
||||
processed = %processed_node_id,
|
||||
hash = %result_hash,
|
||||
"Audio process-jobb fullført"
|
||||
processed = result["processed_node_id"].as_str().unwrap_or("n/a"),
|
||||
hash = result["cas_hash"].as_str().unwrap_or("n/a"),
|
||||
"synops-audio fullført"
|
||||
);
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"processed_node_id": processed_node_id.to_string(),
|
||||
"cas_hash": result_hash,
|
||||
"size_bytes": result_size,
|
||||
}))
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// ─── Tester ──────────────────────────────────────────────────────
|
||||
|
|
|
|||
62
maskinrommet/src/cli_dispatch.rs
Normal file
62
maskinrommet/src/cli_dispatch.rs
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
// Felles hjelpefunksjoner for å spawne CLI-verktøy fra jobbkø-handlere.
|
||||
//
|
||||
// Mønsteret: maskinrommet orkestrerer (payload-parsing, sikkerhetskontroller,
|
||||
// STDB-synk), CLI-verktøyet gjør jobben (API-kall, DB-skriving, prosessering).
|
||||
// Stdout → jobbresultat (JSON), stderr → feillogg, exitkode → status.
|
||||
//
|
||||
// Ref: docs/retninger/unix_filosofi.md
|
||||
|
||||
use std::process::Stdio;
|
||||
|
||||
/// Kjør et CLI-verktøy og returner stdout som JSON.
|
||||
///
|
||||
/// - `bin_name`: Verktøynavn for logging (f.eks. "synops-tts")
|
||||
/// - `cmd`: Ferdig konfigurert Command med args og env
|
||||
///
|
||||
/// Stderr logges. Non-zero exit code gir Err med stderr-innhold.
|
||||
pub async fn run_cli_tool(
|
||||
bin_name: &str,
|
||||
cmd: &mut tokio::process::Command,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
|
||||
let child = cmd
|
||||
.spawn()
|
||||
.map_err(|e| format!("Kunne ikke starte {bin_name}: {e}"))?;
|
||||
|
||||
let output = child
|
||||
.wait_with_output()
|
||||
.await
|
||||
.map_err(|e| format!("Feil ved kjøring av {bin_name}: {e}"))?;
|
||||
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
if !stderr.is_empty() {
|
||||
tracing::info!(stderr = %stderr, "{bin_name} stderr");
|
||||
}
|
||||
|
||||
if !output.status.success() {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
return Err(format!(
|
||||
"{bin_name} feilet (exit {code}): {stderr}"
|
||||
));
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
serde_json::from_str(&stdout)
|
||||
.map_err(|e| format!("Kunne ikke parse {bin_name} output: {e}"))
|
||||
}
|
||||
|
||||
/// Videresend en miljøvariabel til en Command hvis den er satt.
|
||||
pub fn forward_env(cmd: &mut tokio::process::Command, key: &str) {
|
||||
if let Ok(v) = std::env::var(key) {
|
||||
cmd.env(key, v);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sett DATABASE_URL på en Command. Feiler hvis variabelen mangler.
|
||||
pub fn set_database_url(cmd: &mut tokio::process::Command) -> Result<(), String> {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.map_err(|_| "DATABASE_URL ikke satt".to_string())?;
|
||||
cmd.env("DATABASE_URL", &db_url);
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -19,8 +19,8 @@ use crate::ai_edges;
|
|||
use crate::ai_process;
|
||||
use crate::audio;
|
||||
use crate::cas::CasStore;
|
||||
use crate::cli_dispatch;
|
||||
use crate::maintenance::MaintenanceState;
|
||||
use crate::publishing;
|
||||
use crate::resources::{self, PriorityRules};
|
||||
use crate::stdb::StdbClient;
|
||||
use crate::summarize;
|
||||
|
|
@ -159,6 +159,12 @@ async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx
|
|||
}
|
||||
|
||||
/// Dispatcher — kjører riktig handler basert på job_type.
|
||||
///
|
||||
/// Alle handlere delegerer til CLI-verktøy (Command::new("synops-X"))
|
||||
/// i tråd med Unix-filosofien. Unntaket er ai_process som fortsatt
|
||||
/// kjører inline (mangler CLI-verktøy, planlagt i fremtidig oppgave).
|
||||
///
|
||||
/// Ref: docs/retninger/unix_filosofi.md
|
||||
async fn dispatch(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
|
|
@ -180,32 +186,38 @@ async fn dispatch(
|
|||
summarize::handle_summarize_communication(job, db, stdb).await
|
||||
}
|
||||
"tts_generate" => {
|
||||
tts::handle_tts_job(job, db, stdb, cas).await
|
||||
tts::handle_tts_job(job, db, stdb).await
|
||||
}
|
||||
"audio_process" => {
|
||||
audio::handle_audio_process_job(job, db, stdb, cas).await
|
||||
}
|
||||
"ai_process" => {
|
||||
// Fortsatt inline — ingen CLI-verktøy ennå.
|
||||
// TODO: Lag synops-ai-process og migrer hit.
|
||||
ai_process::handle_ai_process(job, db, stdb).await
|
||||
}
|
||||
"render_article" => {
|
||||
handle_render_article(job, db, cas).await
|
||||
handle_render_article(job, cas).await
|
||||
}
|
||||
"render_index" => {
|
||||
handle_render_index(job, db, cas).await
|
||||
handle_render_index(job, cas).await
|
||||
}
|
||||
other => Err(format!("Ukjent jobbtype: {other}")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler for `render_article`-jobb.
|
||||
/// Synops-render binary path.
|
||||
fn render_bin() -> String {
|
||||
std::env::var("SYNOPS_RENDER_BIN")
|
||||
.unwrap_or_else(|_| "synops-render".to_string())
|
||||
}
|
||||
|
||||
/// Handler for `render_article`-jobb — delegerer til synops-render CLI.
|
||||
///
|
||||
/// Payload: `{ "node_id": "...", "collection_id": "..." }`
|
||||
/// Rendrer artikkelens metadata.document til HTML via Tera, lagrer i CAS,
|
||||
/// oppdaterer nodens metadata.rendered.
|
||||
/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering.
|
||||
async fn handle_render_article(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
cas: &CasStore,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let node_id: Uuid = job
|
||||
|
|
@ -222,17 +234,41 @@ async fn handle_render_article(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler collection_id i payload")?;
|
||||
|
||||
publishing::render_article_to_cas(db, cas, node_id, collection_id).await
|
||||
let bin = render_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
cmd.arg("--node-id").arg(node_id.to_string())
|
||||
.arg("--collection-id").arg(collection_id.to_string())
|
||||
.arg("--render-type").arg("article")
|
||||
.arg("--write");
|
||||
|
||||
cli_dispatch::set_database_url(&mut cmd)?;
|
||||
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
|
||||
|
||||
tracing::info!(
|
||||
node_id = %node_id,
|
||||
collection_id = %collection_id,
|
||||
bin = %bin,
|
||||
"Starter synops-render (article)"
|
||||
);
|
||||
|
||||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||||
|
||||
tracing::info!(
|
||||
node_id = %node_id,
|
||||
html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
|
||||
"synops-render (article) fullført"
|
||||
);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Handler for `render_index`-jobb.
|
||||
/// Handler for `render_index`-jobb — delegerer til synops-render CLI.
|
||||
///
|
||||
/// Payload: `{ "collection_id": "..." }`
|
||||
/// Rendrer forsiden til HTML via Tera, lagrer i CAS,
|
||||
/// oppdaterer samlingens metadata.rendered_index.
|
||||
/// CLI-verktøyet gjør Tera-rendering, CAS-lagring og metadata-oppdatering.
|
||||
async fn handle_render_index(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
cas: &CasStore,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let collection_id: Uuid = job
|
||||
|
|
@ -242,7 +278,31 @@ async fn handle_render_index(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler collection_id i payload")?;
|
||||
|
||||
publishing::render_index_to_cas(db, cas, collection_id).await
|
||||
let bin = render_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
cmd.arg("--node-id").arg(collection_id.to_string())
|
||||
.arg("--render-type").arg("index")
|
||||
.arg("--write");
|
||||
|
||||
cli_dispatch::set_database_url(&mut cmd)?;
|
||||
cmd.env("CAS_ROOT", cas.root().to_string_lossy().to_string());
|
||||
|
||||
tracing::info!(
|
||||
collection_id = %collection_id,
|
||||
bin = %bin,
|
||||
"Starter synops-render (index)"
|
||||
);
|
||||
|
||||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||||
|
||||
tracing::info!(
|
||||
collection_id = %collection_id,
|
||||
html_hash = result["html_hash"].as_str().unwrap_or("n/a"),
|
||||
"synops-render (index) fullført"
|
||||
);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ pub mod audio;
|
|||
pub mod bandwidth;
|
||||
mod auth;
|
||||
pub mod cas;
|
||||
pub mod cli_dispatch;
|
||||
mod custom_domain;
|
||||
mod intentions;
|
||||
pub mod jobs;
|
||||
|
|
|
|||
|
|
@ -1,57 +1,40 @@
|
|||
// TTS-pipeline — tekst til lyd via ElevenLabs.
|
||||
// TTS-dispatcher — delegerer til synops-tts CLI.
|
||||
//
|
||||
// Maskinrommet beholder: voice_id-oppslag (payload > node metadata > env),
|
||||
// og STDB-skriving (sanntidsvisning). Alt annet (ElevenLabs-kall, CAS-lagring,
|
||||
// PG-skriving) gjøres av synops-tts.
|
||||
//
|
||||
// Jobbtype: "tts_generate"
|
||||
// Payload: {
|
||||
// "text": "<tekst som skal leses opp>",
|
||||
// "voice_id": "<elevenlabs voice_id>", (valgfritt, bruker default)
|
||||
// "language": "no", (valgfritt)
|
||||
// "source_node_id": "<uuid>", (noden teksten tilhører)
|
||||
// "requested_by": "<uuid>"
|
||||
// }
|
||||
// Payload: { "text", "voice_id"?, "language"?, "source_node_id"?, "requested_by" }
|
||||
//
|
||||
// Flyten:
|
||||
// 1. Hent tekst og voice-preferanse fra payload
|
||||
// 2. Sjekk mottaker-preferanse i source-nodens metadata (voice_preference)
|
||||
// 3. Kall ElevenLabs API
|
||||
// 4. Lagre lyd i CAS
|
||||
// 5. Opprett media-node med has_media-edge til kilde
|
||||
// 6. Logg ressursforbruk
|
||||
//
|
||||
// Ref: docs/features/ressursforbruk.md, docs/proposals/ghost_host_tts.md
|
||||
// Ref: docs/retninger/unix_filosofi.md, docs/proposals/ghost_host_tts.md
|
||||
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::cas::CasStore;
|
||||
use crate::cli_dispatch;
|
||||
use crate::jobs::JobRow;
|
||||
use crate::resource_usage;
|
||||
use crate::stdb::StdbClient;
|
||||
|
||||
/// Maks tekst-lengde for TTS (ElevenLabs grense er 5000 tegn per kall).
|
||||
const MAX_TEXT_LENGTH: usize = 5000;
|
||||
/// Synops-tts binary path.
|
||||
fn tts_bin() -> String {
|
||||
std::env::var("SYNOPS_TTS_BIN")
|
||||
.unwrap_or_else(|_| "synops-tts".to_string())
|
||||
}
|
||||
|
||||
/// Håndterer tts_generate-jobb.
|
||||
///
|
||||
/// Spawner synops-tts med --write for å gjøre alt arbeidet:
|
||||
/// ElevenLabs-kall, CAS-lagring, PG-skriving, ressurslogging.
|
||||
/// Maskinrommet gjør etterpå STDB-synk for sanntidsvisning.
|
||||
pub async fn handle_tts_job(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
stdb: &StdbClient,
|
||||
cas: &CasStore,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let text = job.payload["text"]
|
||||
.as_str()
|
||||
.ok_or("Mangler 'text' i payload")?
|
||||
.to_string();
|
||||
|
||||
if text.is_empty() {
|
||||
return Err("Tom tekst — ingenting å generere".to_string());
|
||||
}
|
||||
if text.len() > MAX_TEXT_LENGTH {
|
||||
return Err(format!(
|
||||
"Tekst for lang: {} tegn (maks {})",
|
||||
text.len(),
|
||||
MAX_TEXT_LENGTH
|
||||
));
|
||||
}
|
||||
.ok_or("Mangler 'text' i payload")?;
|
||||
|
||||
let source_node_id: Option<Uuid> = job.payload["source_node_id"]
|
||||
.as_str()
|
||||
|
|
@ -62,168 +45,102 @@ pub async fn handle_tts_job(
|
|||
.and_then(|s| s.parse().ok())
|
||||
.ok_or("Mangler gyldig 'requested_by' i payload")?;
|
||||
|
||||
let language = job.payload["language"]
|
||||
.as_str()
|
||||
.unwrap_or("no");
|
||||
|
||||
// Bestem voice_id: payload > source-node metadata > env default
|
||||
let voice_id = resolve_voice_id(job, db, source_node_id).await?;
|
||||
|
||||
let language = job.payload["language"]
|
||||
.as_str()
|
||||
.unwrap_or("no")
|
||||
.to_string();
|
||||
// Bygg kommando
|
||||
let bin = tts_bin();
|
||||
let mut cmd = tokio::process::Command::new(&bin);
|
||||
|
||||
cmd.arg("--text").arg(text)
|
||||
.arg("--voice").arg(&voice_id)
|
||||
.arg("--language").arg(language)
|
||||
.arg("--requested-by").arg(requested_by.to_string())
|
||||
.arg("--write");
|
||||
|
||||
if let Some(source_id) = source_node_id {
|
||||
cmd.arg("--source-node-id").arg(source_id.to_string());
|
||||
}
|
||||
|
||||
// Sett miljøvariabler CLI-verktøyet trenger
|
||||
cli_dispatch::set_database_url(&mut cmd)?;
|
||||
cli_dispatch::forward_env(&mut cmd, "CAS_ROOT");
|
||||
cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_API_KEY");
|
||||
cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_MODEL");
|
||||
cli_dispatch::forward_env(&mut cmd, "ELEVENLABS_DEFAULT_VOICE");
|
||||
|
||||
tracing::info!(
|
||||
text_len = text.len(),
|
||||
voice_id = %voice_id,
|
||||
language = %language,
|
||||
"Starter TTS-generering"
|
||||
bin = %bin,
|
||||
"Starter synops-tts"
|
||||
);
|
||||
|
||||
// 1. Kall ElevenLabs API
|
||||
let audio_bytes = call_elevenlabs(&text, &voice_id).await?;
|
||||
let result = cli_dispatch::run_cli_tool(&bin, &mut cmd).await?;
|
||||
|
||||
tracing::info!(
|
||||
audio_size = audio_bytes.len(),
|
||||
"Mottok lyd fra ElevenLabs"
|
||||
);
|
||||
|
||||
// 2. Lagre i CAS
|
||||
let cas_result = cas
|
||||
.store(&audio_bytes)
|
||||
.await
|
||||
.map_err(|e| format!("CAS-lagring feilet: {e}"))?;
|
||||
|
||||
tracing::info!(
|
||||
cas_hash = %cas_result.hash,
|
||||
size = cas_result.size,
|
||||
dedup = cas_result.already_existed,
|
||||
"Lyd lagret i CAS"
|
||||
);
|
||||
|
||||
// 3. Opprett media-node for lydfilen
|
||||
let media_node_id = Uuid::now_v7();
|
||||
let title = format!("TTS: {}", truncate(&text, 60));
|
||||
let metadata = serde_json::json!({
|
||||
"cas_hash": cas_result.hash,
|
||||
"mime": "audio/mpeg",
|
||||
"size_bytes": cas_result.size,
|
||||
"tts": {
|
||||
"provider": "elevenlabs",
|
||||
"voice_id": voice_id,
|
||||
"language": language,
|
||||
"characters": text.len(),
|
||||
}
|
||||
});
|
||||
let metadata_str = metadata.to_string();
|
||||
|
||||
// STDB først (sanntid)
|
||||
stdb.create_node(
|
||||
&media_node_id.to_string(),
|
||||
"content",
|
||||
&title,
|
||||
"",
|
||||
"hidden",
|
||||
&metadata_str,
|
||||
&requested_by.to_string(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("STDB create_node feilet: {e}"))?;
|
||||
|
||||
// PG (persistering)
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
||||
VALUES ($1, 'content', $2, '', 'hidden'::visibility, $3, $4)
|
||||
"#,
|
||||
)
|
||||
.bind(media_node_id)
|
||||
.bind(&title)
|
||||
.bind(&metadata)
|
||||
.bind(requested_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert media-node feilet: {e}"))?;
|
||||
|
||||
// 4. Opprett has_media-edge fra kilde-node til TTS-noden (hvis kilde finnes)
|
||||
if let Some(source_id) = source_node_id {
|
||||
let edge_id = Uuid::now_v7();
|
||||
let edge_meta = serde_json::json!({
|
||||
"media_type": "tts_audio",
|
||||
"generated_at": chrono::Utc::now().to_rfc3339()
|
||||
// --- STDB-synk for sanntidsvisning ---
|
||||
// synops-tts skriver PG. Vi synker til STDB for umiddelbar visning.
|
||||
if let Some(media_node_id) = result["media_node_id"].as_str() {
|
||||
let cas_hash = result["cas_hash"].as_str().unwrap_or("");
|
||||
let characters = result["characters"].as_u64().unwrap_or(0);
|
||||
let title = format!("TTS: {}", truncate(text, 60));
|
||||
let metadata = serde_json::json!({
|
||||
"cas_hash": cas_hash,
|
||||
"mime": "audio/mpeg",
|
||||
"tts": { "voice_id": &voice_id, "characters": characters }
|
||||
});
|
||||
let empty_meta = edge_meta.to_string();
|
||||
|
||||
stdb.create_edge(
|
||||
&edge_id.to_string(),
|
||||
&source_id.to_string(),
|
||||
&media_node_id.to_string(),
|
||||
"has_media",
|
||||
&empty_meta,
|
||||
false,
|
||||
&requested_by.to_string(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("STDB create_edge (has_media) feilet: {e}"))?;
|
||||
if let Err(e) = stdb.create_node(
|
||||
media_node_id, "content", &title, "", "hidden",
|
||||
&metadata.to_string(), &requested_by.to_string(),
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_node (tts) feilet (PG er allerede skrevet)");
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO edges (id, source_id, target_id, edge_type, metadata, system, created_by)
|
||||
VALUES ($1, $2, $3, 'has_media', $4, false, $5)
|
||||
"#,
|
||||
)
|
||||
.bind(edge_id)
|
||||
.bind(source_id)
|
||||
.bind(media_node_id)
|
||||
.bind(&edge_meta)
|
||||
.bind(requested_by)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("PG insert has_media-edge feilet: {e}"))?;
|
||||
// has_media-edge
|
||||
if let Some(source_id) = source_node_id {
|
||||
let edge_id = Uuid::now_v7().to_string();
|
||||
let edge_meta = serde_json::json!({
|
||||
"media_type": "tts_audio",
|
||||
"generated_at": chrono::Utc::now().to_rfc3339()
|
||||
});
|
||||
|
||||
if let Err(e) = stdb.create_edge(
|
||||
&edge_id, &source_id.to_string(), media_node_id, "has_media",
|
||||
&edge_meta.to_string(), false, &requested_by.to_string(),
|
||||
).await {
|
||||
tracing::warn!(error = %e, "STDB create_edge (has_media) feilet (PG er allerede skrevet)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Logg ressursforbruk
|
||||
let collection_id = resource_usage::find_collection_for_node(db, media_node_id).await;
|
||||
if let Err(e) = resource_usage::log(
|
||||
db,
|
||||
media_node_id,
|
||||
Some(requested_by),
|
||||
collection_id,
|
||||
"tts",
|
||||
serde_json::json!({
|
||||
"provider": "elevenlabs",
|
||||
"characters": text.len(),
|
||||
"voice_id": voice_id
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Kunne ikke logge TTS-ressursforbruk");
|
||||
}
|
||||
tracing::info!(
|
||||
cas_hash = result["cas_hash"].as_str().unwrap_or("n/a"),
|
||||
media_node_id = result["media_node_id"].as_str().unwrap_or("n/a"),
|
||||
"synops-tts fullført"
|
||||
);
|
||||
|
||||
Ok(serde_json::json!({
|
||||
"status": "completed",
|
||||
"media_node_id": media_node_id.to_string(),
|
||||
"cas_hash": cas_result.hash,
|
||||
"characters": text.len(),
|
||||
"audio_size_bytes": cas_result.size,
|
||||
"voice_id": voice_id,
|
||||
}))
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Bestem voice_id: payload-verdi > source-node metadata.voice_preference > env default.
|
||||
/// "Mottaker-preferanse i metadata" betyr at en node kan ha
|
||||
/// metadata.voice_preference.voice_id som overstyrer default.
|
||||
async fn resolve_voice_id(
|
||||
job: &JobRow,
|
||||
db: &PgPool,
|
||||
source_node_id: Option<Uuid>,
|
||||
) -> Result<String, String> {
|
||||
// 1. Eksplisitt i payload — alltid høyest prioritet
|
||||
// 1. Eksplisitt i payload
|
||||
if let Some(vid) = job.payload["voice_id"].as_str() {
|
||||
if !vid.is_empty() {
|
||||
return Ok(vid.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Sjekk source-nodens metadata.voice_preference.voice_id
|
||||
// 2. Source-nodens metadata.voice_preference.voice_id
|
||||
if let Some(node_id) = source_node_id {
|
||||
let meta: Option<serde_json::Value> = sqlx::query_scalar(
|
||||
"SELECT metadata FROM nodes WHERE id = $1",
|
||||
|
|
@ -241,11 +158,7 @@ async fn resolve_voice_id(
|
|||
.and_then(|v| v.as_str())
|
||||
{
|
||||
if !vid.is_empty() {
|
||||
tracing::info!(
|
||||
node_id = %node_id,
|
||||
voice_id = %vid,
|
||||
"Bruker mottaker-preferanse fra node-metadata"
|
||||
);
|
||||
tracing::info!(node_id = %node_id, voice_id = %vid, "Bruker mottaker-preferanse");
|
||||
return Ok(vid.to_string());
|
||||
}
|
||||
}
|
||||
|
|
@ -253,57 +166,8 @@ async fn resolve_voice_id(
|
|||
}
|
||||
|
||||
// 3. Miljøvariabel-default
|
||||
let default = std::env::var("ELEVENLABS_DEFAULT_VOICE")
|
||||
.unwrap_or_else(|_| "21m00Tcm4TlvDq8ikWAM".to_string()); // Rachel (ElevenLabs premade)
|
||||
|
||||
Ok(default)
|
||||
}
|
||||
|
||||
/// Kall ElevenLabs text-to-speech API.
|
||||
/// Returnerer rå MP3-bytes.
|
||||
async fn call_elevenlabs(text: &str, voice_id: &str) -> Result<Vec<u8>, String> {
|
||||
let api_key = std::env::var("ELEVENLABS_API_KEY")
|
||||
.map_err(|_| "ELEVENLABS_API_KEY er ikke satt".to_string())?;
|
||||
|
||||
let model_id = std::env::var("ELEVENLABS_MODEL")
|
||||
.unwrap_or_else(|_| "eleven_multilingual_v2".to_string());
|
||||
|
||||
let url = format!(
|
||||
"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
|
||||
);
|
||||
|
||||
let payload = serde_json::json!({
|
||||
"text": text,
|
||||
"model_id": model_id,
|
||||
"voice_settings": {
|
||||
"stability": 0.5,
|
||||
"similarity_boost": 0.75,
|
||||
"style": 0.0,
|
||||
"use_speaker_boost": true
|
||||
}
|
||||
});
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.header("xi-api-key", &api_key)
|
||||
.header("Accept", "audio/mpeg")
|
||||
.json(&payload)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("ElevenLabs HTTP-feil: {e}"))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let status = resp.status();
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
return Err(format!("ElevenLabs returnerte {status}: {body}"));
|
||||
}
|
||||
|
||||
resp.bytes()
|
||||
.await
|
||||
.map(|b| b.to_vec())
|
||||
.map_err(|e| format!("Kunne ikke lese ElevenLabs-respons: {e}"))
|
||||
Ok(std::env::var("ELEVENLABS_DEFAULT_VOICE")
|
||||
.unwrap_or_else(|_| "21m00Tcm4TlvDq8ikWAM".to_string()))
|
||||
}
|
||||
|
||||
/// Forkorter en streng til maks `max_len` tegn med "..." suffix.
|
||||
|
|
|
|||
3
tasks.md
3
tasks.md
|
|
@ -261,8 +261,7 @@ kaller dem direkte. Samme verktøy, to brukere.
|
|||
|
||||
### Infrastruktur
|
||||
|
||||
- [~] 21.15 Jobbkø-dispatcher: endre maskinrommets jobbkø-handlere til å spawne CLI-verktøy (`Command::new("synops-X")`) i stedet for inline-kode. Stdout → jobbresultat, stderr → feillogg, exitkode → status.
|
||||
> Påbegynt: 2026-03-18T10:25
|
||||
- [x] 21.15 Jobbkø-dispatcher: endre maskinrommets jobbkø-handlere til å spawne CLI-verktøy (`Command::new("synops-X")`) i stedet for inline-kode. Stdout → jobbresultat, stderr → feillogg, exitkode → status.
|
||||
- [ ] 21.16 Felles lib: `synops-common` crate med PG-tilkobling, CAS-helpers, og node/edge-typer. Deles mellom alle CLI-verktøy for å unngå duplisering.
|
||||
|
||||
## Fase 12: Herding
|
||||
|
|
|
|||
|
|
@ -112,8 +112,17 @@ async fn run(cli: Cli) -> Result<(), String> {
|
|||
|
||||
tracing::info!(cas_hash = %cas_hash, "Lyd lagret i CAS");
|
||||
|
||||
// 3. Bygg resultat-JSON
|
||||
let result = serde_json::json!({
|
||||
// 3. Skriv til database hvis --write, og hent media_node_id
|
||||
let media_node_id = if cli.write {
|
||||
let requested_by = cli.requested_by.unwrap(); // Allerede validert
|
||||
Some(write_to_db(&cas_hash, &cli.text, &voice_id, &cli.language, audio_bytes.len(),
|
||||
cli.source_node_id, requested_by).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// 4. Bygg resultat-JSON
|
||||
let mut result = serde_json::json!({
|
||||
"cas_hash": cas_hash,
|
||||
"size_bytes": audio_bytes.len(),
|
||||
"voice_id": voice_id,
|
||||
|
|
@ -122,11 +131,8 @@ async fn run(cli: Cli) -> Result<(), String> {
|
|||
"mime": "audio/mpeg",
|
||||
});
|
||||
|
||||
// 4. Skriv til database hvis --write
|
||||
if cli.write {
|
||||
let requested_by = cli.requested_by.unwrap(); // Allerede validert
|
||||
write_to_db(&cas_hash, &cli.text, &voice_id, &cli.language, audio_bytes.len(),
|
||||
cli.source_node_id, requested_by).await?;
|
||||
if let Some(id) = media_node_id {
|
||||
result["media_node_id"] = serde_json::Value::String(id.to_string());
|
||||
}
|
||||
|
||||
// 5. Output JSON til stdout
|
||||
|
|
@ -223,6 +229,7 @@ async fn store_in_cas(cas_root: &str, data: &[u8]) -> Result<String, String> {
|
|||
}
|
||||
|
||||
/// Opprett media-node og has_media-edge i PostgreSQL.
|
||||
/// Returnerer media_node_id for STDB-synk.
|
||||
async fn write_to_db(
|
||||
cas_hash: &str,
|
||||
text: &str,
|
||||
|
|
@ -231,7 +238,7 @@ async fn write_to_db(
|
|||
size_bytes: usize,
|
||||
source_node_id: Option<Uuid>,
|
||||
requested_by: Uuid,
|
||||
) -> Result<(), String> {
|
||||
) -> Result<Uuid, String> {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.map_err(|_| "DATABASE_URL må settes med --write".to_string())?;
|
||||
|
||||
|
|
@ -325,7 +332,7 @@ async fn write_to_db(
|
|||
tracing::warn!(error = %e, "Kunne ikke logge TTS-ressursforbruk");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(media_node_id)
|
||||
}
|
||||
|
||||
/// Forkorter en streng til maks `max_len` tegn med "..." suffix.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue