Implementer synops-transcribe CLI-verktøy (oppgave 21.1)

Bryter ut Whisper-transkribering fra maskinrommet til selvstendig
CLI-verktøy i tools/synops-transcribe/, i tråd med unix-filosofien.

Verktøyet:
- Leser lydfil fra CAS, sender til faster-whisper API (SRT-format)
- Parser SRT til segmenter, skriver JSON til stdout
- Med --write: skriver segmenter til PG, oppdaterer node metadata,
  logger ressursforbruk
- Støtter --cas-hash, --model, --initial-prompt, --language, --mime,
  --node-id, --requested-by

Maskinrommet sin transcribe.rs er nå en tynn dispatcher som spawner
synops-transcribe som subprosess med riktige env-variabler.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
vegard 2026-03-18 09:01:06 +00:00
parent cdd82e135d
commit bd12bed77e
7 changed files with 3504 additions and 402 deletions

View file

@ -168,7 +168,7 @@ async fn dispatch(
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
match job.job_type.as_str() { match job.job_type.as_str() {
"whisper_transcribe" => { "whisper_transcribe" => {
transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await transcribe::handle_whisper_job(job, cas, whisper_url).await
} }
"agent_respond" => { "agent_respond" => {
agent::handle_agent_respond(job, db, stdb).await agent::handle_agent_respond(job, db, stdb).await

View file

@ -1,40 +1,34 @@
// Transkripsjons-pipeline — faster-whisper integrasjon. // Transkripsjons-dispatcher — delegerer til synops-transcribe CLI.
// //
// Henter lydfil fra CAS, sender til faster-whisper HTTP API (SRT-format), // Maskinrommet orkestrerer, CLI-verktøyet gjør jobben.
// parser SRT og skriver segmenter til transcription_segments-tabellen. // Ref: docs/retninger/unix_filosofi.md
// Universell tjeneste for all lyd: podcast, møter, voice memos.
//
// Ref: docs/concepts/podcastfabrikken.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::cas::CasStore; use crate::cas::CasStore;
use crate::jobs::JobRow; use crate::jobs::JobRow;
use crate::resource_usage; use std::process::Stdio;
use crate::stdb::StdbClient; use uuid::Uuid;
/// Et parset SRT-segment. /// Synops-transcribe binary path.
#[derive(Debug)] /// Søker i PATH, men kan overrides med SYNOPS_TRANSCRIBE_BIN.
struct SrtSegment { fn transcribe_bin() -> String {
seq: i32, std::env::var("SYNOPS_TRANSCRIBE_BIN")
start_ms: i32, .unwrap_or_else(|_| "synops-transcribe".to_string())
end_ms: i32,
content: String,
} }
/// Handler for whisper_transcribe-jobber. /// Handler for whisper_transcribe-jobber.
/// ///
/// Spawner synops-transcribe med --write for å gjøre alt arbeidet:
/// Whisper-kall, SRT-parsing, DB-skriving, ressurslogging.
///
/// Payload forventer: /// Payload forventer:
/// - media_node_id: UUID — noden som skal oppdateres /// - media_node_id: UUID — noden som skal oppdateres
/// - cas_hash: String — CAS-nøkkel til lydfilen /// - cas_hash: String — CAS-nøkkel til lydfilen
/// - mime: String — MIME-type (brukes for filnavn-hint) /// - mime: String — MIME-type (brukes for filnavn-hint)
/// - language: String (valgfritt, default "no") /// - language: String (valgfritt, default "no")
/// - initial_prompt: String (valgfritt — navneliste for bedre egennavn) /// - initial_prompt: String (valgfritt — navneliste for bedre egennavn)
/// - requested_by: UUID (valgfritt — brukeren som utløste jobben)
pub async fn handle_whisper_job( pub async fn handle_whisper_job(
job: &JobRow, job: &JobRow,
db: &PgPool,
stdb: &StdbClient,
cas: &CasStore, cas: &CasStore,
whisper_url: &str, whisper_url: &str,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
@ -56,406 +50,83 @@ pub async fn handle_whisper_job(
.as_str() .as_str()
.unwrap_or("no"); .unwrap_or("no");
let model = std::env::var("WHISPER_MODEL")
.unwrap_or_else(|_| "medium".to_string());
// Hent initial_prompt: payload > miljøvariabel > ingen // Hent initial_prompt: payload > miljøvariabel > ingen
let initial_prompt = match job.payload["initial_prompt"].as_str() { let initial_prompt = match job.payload["initial_prompt"].as_str() {
Some(p) => Some(p.to_string()), Some(p) => Some(p.to_string()),
None => std::env::var("WHISPER_INITIAL_PROMPT").ok(), None => std::env::var("WHISPER_INITIAL_PROMPT").ok(),
}; };
// Modell: sentral serverinnstilling // Bygg kommando
let model = std::env::var("WHISPER_MODEL") let bin = transcribe_bin();
.unwrap_or_else(|_| "medium".to_string()); let mut cmd = tokio::process::Command::new(&bin);
// 1. Les lydfil fra CAS cmd.arg("--cas-hash").arg(cas_hash)
let file_path = cas.path_for(cas_hash); .arg("--model").arg(&model)
let file_data = tokio::fs::read(&file_path) .arg("--language").arg(language)
.await .arg("--mime").arg(mime)
.map_err(|e| format!("Kunne ikke lese CAS-fil {cas_hash}: {e}"))?; .arg("--node-id").arg(media_node_id.to_string())
.arg("--write");
if let Some(ref prompt) = initial_prompt {
cmd.arg("--initial-prompt").arg(prompt);
}
if let Some(requested_by) = job.payload["requested_by"].as_str() {
cmd.arg("--requested-by").arg(requested_by);
}
// Sett miljøvariabler CLI-verktøyet trenger
let db_url = std::env::var("DATABASE_URL")
.map_err(|_| "DATABASE_URL ikke satt".to_string())?;
let cas_root = cas.root().to_string_lossy().to_string();
cmd.env("DATABASE_URL", &db_url)
.env("CAS_ROOT", &cas_root)
.env("WHISPER_URL", whisper_url);
cmd.stdout(Stdio::piped())
.stderr(Stdio::piped());
tracing::info!( tracing::info!(
media_node_id = %media_node_id, media_node_id = %media_node_id,
cas_hash = %cas_hash, cas_hash = %cas_hash,
size = file_data.len(),
model = %model, model = %model,
"Sender lydfil til Whisper" bin = %bin,
"Starter synops-transcribe"
); );
// 2. Send til faster-whisper API (SRT-format) // Spawn og vent
let file_ext = mime_to_extension(mime); let child = cmd.spawn().map_err(|e| format!("Kunne ikke starte {bin}: {e}"))?;
let file_name = format!("audio.{file_ext}"); let output = child
.wait_with_output()
.await
.map_err(|e| format!("Feil ved kjøring av {bin}: {e}"))?;
let file_part = reqwest::multipart::Part::bytes(file_data) let stderr = String::from_utf8_lossy(&output.stderr);
.file_name(file_name) if !stderr.is_empty() {
.mime_str(mime) tracing::info!(stderr = %stderr, "synops-transcribe stderr");
.map_err(|e| format!("Kunne ikke bygge multipart: {e}"))?;
let mut form = reqwest::multipart::Form::new()
.part("file", file_part)
.text("model", model.clone())
.text("language", language.to_string())
.text("response_format", "srt");
if let Some(ref prompt) = initial_prompt {
form = form.text("initial_prompt", prompt.clone());
} }
let client = reqwest::Client::new(); if !output.status.success() {
let url = format!("{whisper_url}/v1/audio/transcriptions"); let code = output.status.code().unwrap_or(-1);
return Err(format!(
let response = client "synops-transcribe feilet (exit {code}): {stderr}"
.post(&url) ));
.multipart(form)
.timeout(std::time::Duration::from_secs(600)) // 10 min timeout for lange filer
.send()
.await
.map_err(|e| format!("HTTP-feil mot Whisper: {e}"))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("Whisper returnerte {status}: {body}"));
} }
let srt_text = response // Parse stdout som JSON — det er resultatet
.text() let stdout = String::from_utf8_lossy(&output.stdout);
.await let result: serde_json::Value = serde_json::from_str(&stdout)
.map_err(|e| format!("Kunne ikke lese Whisper-respons: {e}"))?; .map_err(|e| format!("Kunne ikke parse synops-transcribe output: {e}"))?;
// 3. Parse SRT til segmenter
let segments = parse_srt(&srt_text)?;
tracing::info!( tracing::info!(
media_node_id = %media_node_id, media_node_id = %media_node_id,
segments = segments.len(), segments = result["segment_count"].as_u64().unwrap_or(0),
"SRT parset" "synops-transcribe fullført"
); );
if segments.is_empty() { Ok(result)
return Err("Whisper returnerte tom SRT — ingen segmenter".to_string());
}
// 4. Skriv segmenter til transcription_segments-tabellen
let transcribed_at = chrono::Utc::now();
insert_segments(db, media_node_id, transcribed_at, &segments).await?;
// 5. Bygg sammenhengende tekst og oppdater node
let transcript_text: String = segments
.iter()
.map(|s| s.content.trim())
.collect::<Vec<_>>()
.join(" ");
let duration_ms = segments.last().map(|s| s.end_ms).unwrap_or(0);
update_node_with_transcript(
db,
stdb,
media_node_id,
&transcript_text,
transcribed_at,
segments.len(),
duration_ms,
)
.await?;
// Logg ressursforbruk (Whisper)
let triggered_by: Option<Uuid> = job.payload["requested_by"]
.as_str()
.and_then(|s| s.parse().ok());
let collection_id = resource_usage::find_collection_for_node(db, media_node_id).await;
let duration_seconds = (duration_ms as f64) / 1000.0;
if let Err(e) = resource_usage::log(
db,
media_node_id,
triggered_by,
collection_id,
"whisper",
serde_json::json!({
"model": model,
"duration_seconds": duration_seconds,
"language": language,
"mode": "batch"
}),
)
.await
{
tracing::warn!(error = %e, "Kunne ikke logge Whisper-ressursforbruk");
}
Ok(serde_json::json!({
"segments": segments.len(),
"transcript_length": transcript_text.len(),
"duration_ms": duration_ms,
"model": model,
"transcribed_at": transcribed_at.to_rfc3339(),
}))
}
/// Parser SRT-tekst til en liste med segmenter.
///
/// SRT-format:
/// ```text
/// 1
/// 00:00:00,000 --> 00:00:05,230
/// Hei og velkommen til Sidelinja.
///
/// 2
/// 00:00:05,230 --> 00:00:10,500
/// I dag snakker vi om...
/// ```
fn parse_srt(srt: &str) -> Result<Vec<SrtSegment>, String> {
let mut segments = Vec::new();
let mut lines = srt.lines().peekable();
while lines.peek().is_some() {
// Hopp over tomme linjer
while lines.peek().map_or(false, |l| l.trim().is_empty()) {
lines.next();
}
// Sekvensnummer
let seq_line = match lines.next() {
Some(l) if !l.trim().is_empty() => l.trim().to_string(),
_ => break,
};
let seq: i32 = seq_line
.parse()
.map_err(|_| format!("Ugyldig SRT-sekvensnummer: '{seq_line}'"))?;
// Tidslinje: 00:00:00,000 --> 00:00:05,230
let time_line = lines
.next()
.ok_or_else(|| format!("Mangler tidslinje etter sekvens {seq}"))?;
let (start_ms, end_ms) = parse_srt_time_line(time_line)
.map_err(|e| format!("Ugyldig tidslinje for sekvens {seq}: {e}"))?;
// Tekstlinjer (frem til tom linje eller slutt)
let mut text_parts = Vec::new();
while lines.peek().map_or(false, |l| !l.trim().is_empty()) {
text_parts.push(lines.next().unwrap().to_string());
}
let content = text_parts.join("\n");
if !content.is_empty() {
segments.push(SrtSegment {
seq,
start_ms,
end_ms,
content,
});
}
}
Ok(segments)
}
/// Parser en SRT-tidslinje: "00:01:23,456 --> 00:01:30,789"
/// Returnerer (start_ms, end_ms).
fn parse_srt_time_line(line: &str) -> Result<(i32, i32), String> {
let parts: Vec<&str> = line.split("-->").collect();
if parts.len() != 2 {
return Err(format!("Forventet 'start --> end', fikk: '{line}'"));
}
let start = parse_srt_timestamp(parts[0].trim())?;
let end = parse_srt_timestamp(parts[1].trim())?;
Ok((start, end))
}
/// Parser et SRT-tidsstempel: "00:01:23,456" → millisekunder.
fn parse_srt_timestamp(ts: &str) -> Result<i32, String> {
// Format: HH:MM:SS,mmm
let ts = ts.replace(',', ".");
let parts: Vec<&str> = ts.split(':').collect();
if parts.len() != 3 {
return Err(format!("Ugyldig tidsstempel: '{ts}'"));
}
let hours: f64 = parts[0].parse().map_err(|_| format!("Ugyldig timer: '{}'", parts[0]))?;
let minutes: f64 = parts[1].parse().map_err(|_| format!("Ugyldig minutter: '{}'", parts[1]))?;
let seconds: f64 = parts[2].parse().map_err(|_| format!("Ugyldig sekunder: '{}'", parts[2]))?;
Ok(((hours * 3_600.0 + minutes * 60.0 + seconds) * 1000.0) as i32)
}
/// Setter inn segmenter i transcription_segments-tabellen.
async fn insert_segments(
db: &PgPool,
node_id: Uuid,
transcribed_at: chrono::DateTime<chrono::Utc>,
segments: &[SrtSegment],
) -> Result<(), String> {
let mut tx = db.begin().await.map_err(|e| format!("Transaksjon feilet: {e}"))?;
for seg in segments {
sqlx::query(
r#"
INSERT INTO transcription_segments (node_id, transcribed_at, seq, start_ms, end_ms, content)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
)
.bind(node_id)
.bind(transcribed_at)
.bind(seg.seq)
.bind(seg.start_ms)
.bind(seg.end_ms)
.bind(&seg.content)
.execute(&mut *tx)
.await
.map_err(|e| format!("Kunne ikke sette inn segment {}: {e}", seg.seq))?;
}
tx.commit().await.map_err(|e| format!("Commit feilet: {e}"))?;
tracing::info!(
node_id = %node_id,
segments = segments.len(),
transcribed_at = %transcribed_at,
"Segmenter skrevet til transcription_segments"
);
Ok(())
}
/// Oppdaterer nodens content-felt med sammenhengende tekst og
/// lagrer transkripsjonsmetadata.
async fn update_node_with_transcript(
db: &PgPool,
stdb: &StdbClient,
node_id: Uuid,
transcript: &str,
transcribed_at: chrono::DateTime<chrono::Utc>,
segment_count: usize,
duration_ms: i32,
) -> Result<(), String> {
// Hent eksisterende node fra PG for å merge metadata
let existing = sqlx::query_as::<_, NodeMetadataRow>(
"SELECT metadata, node_kind, title, visibility::text as visibility FROM nodes WHERE id = $1",
)
.bind(node_id)
.fetch_optional(db)
.await
.map_err(|e| format!("PG-feil ved henting av node: {e}"))?
.ok_or_else(|| format!("Node {node_id} finnes ikke"))?;
// Merge transcription-data inn i eksisterende metadata
let mut metadata = existing.metadata.clone();
metadata["transcription"] = serde_json::json!({
"duration_ms": duration_ms,
"segment_count": segment_count,
"transcribed_at": transcribed_at.to_rfc3339(),
});
let metadata_str = metadata.to_string();
let node_id_str = node_id.to_string();
let title = existing.title.clone().unwrap_or_default();
// Oppdater STDB (instant feedback) — best-effort, PG er autoritativ
if let Err(e) = stdb.update_node(
&node_id_str,
&existing.node_kind,
&title,
transcript,
&existing.visibility,
&metadata_str,
)
.await
{
tracing::warn!(
node_id = %node_id,
error = %e,
"Kunne ikke oppdatere STDB med transkripsjon (fortsetter med PG)"
);
}
// Oppdater PG (persistent)
sqlx::query(
r#"
UPDATE nodes
SET content = $2, metadata = $3
WHERE id = $1
"#,
)
.bind(node_id)
.bind(transcript)
.bind(&metadata)
.execute(db)
.await
.map_err(|e| format!("PG update feilet: {e}"))?;
tracing::info!(
node_id = %node_id,
transcript_len = transcript.len(),
"Node oppdatert med transkripsjon"
);
Ok(())
}
#[derive(sqlx::FromRow)]
struct NodeMetadataRow {
metadata: serde_json::Value,
node_kind: String,
title: Option<String>,
visibility: String,
}
/// Konverterer MIME-type til filendelse for Whisper-hinting.
fn mime_to_extension(mime: &str) -> &str {
match mime {
"audio/mpeg" | "audio/mp3" => "mp3",
"audio/wav" | "audio/x-wav" => "wav",
"audio/ogg" => "ogg",
"audio/flac" | "audio/x-flac" => "flac",
"audio/mp4" | "audio/m4a" | "audio/x-m4a" => "m4a",
"audio/webm" => "webm",
_ => "wav",
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_srt_timestamp() {
assert_eq!(parse_srt_timestamp("00:00:00,000").unwrap(), 0);
assert_eq!(parse_srt_timestamp("00:01:23,456").unwrap(), 83456);
assert_eq!(parse_srt_timestamp("01:00:00,000").unwrap(), 3_600_000);
assert_eq!(parse_srt_timestamp("00:00:05,230").unwrap(), 5230);
}
#[test]
fn test_parse_srt() {
let srt = "\
1
00:00:00,000 --> 00:00:05,230
Hei og velkommen til Sidelinja.
2
00:00:05,230 --> 00:00:10,500
I dag snakker vi om fotball.
";
let segments = parse_srt(srt).unwrap();
assert_eq!(segments.len(), 2);
assert_eq!(segments[0].seq, 1);
assert_eq!(segments[0].start_ms, 0);
assert_eq!(segments[0].end_ms, 5230);
assert_eq!(segments[0].content, "Hei og velkommen til Sidelinja.");
assert_eq!(segments[1].seq, 2);
assert_eq!(segments[1].start_ms, 5230);
assert_eq!(segments[1].end_ms, 10500);
}
#[test]
fn test_parse_srt_time_line() {
let (start, end) = parse_srt_time_line("00:01:23,456 --> 00:01:30,789").unwrap();
assert_eq!(start, 83456);
assert_eq!(end, 90789);
}
#[test]
fn test_parse_srt_empty() {
let segments = parse_srt("").unwrap();
assert!(segments.is_empty());
}
} }

View file

@ -241,8 +241,7 @@ kaller dem direkte. Samme verktøy, to brukere.
### Prosessering (erstatter jobbkø-handlere) ### Prosessering (erstatter jobbkø-handlere)
- [~] 21.1 `synops-transcribe`: Whisper-transkribering. Input: `--cas-hash <hash> --model <model> [--initial-prompt <tekst>]`. Output: JSON med segmenter. Skriver segmenter til PG, oppdaterer node metadata. Erstatter `transcribe.rs`. - [x] 21.1 `synops-transcribe`: Whisper-transkribering. Input: `--cas-hash <hash> --model <model> [--initial-prompt <tekst>]`. Output: JSON med segmenter. Skriver segmenter til PG, oppdaterer node metadata. Erstatter `transcribe.rs`.
> Påbegynt: 2026-03-18T08:53
- [ ] 21.2 `synops-audio`: FFmpeg-prosessering. Input: `--cas-hash <hash> --edl <json>`. Output: ny CAS-hash. Erstatter `audio.rs`. Inkluder parametervalidering (fase 17.217.3). - [ ] 21.2 `synops-audio`: FFmpeg-prosessering. Input: `--cas-hash <hash> --edl <json>`. Output: ny CAS-hash. Erstatter `audio.rs`. Inkluder parametervalidering (fase 17.217.3).
- [ ] 21.3 `synops-render`: Tera HTML-rendering. Input: `--node-id <uuid> --theme <tema>`. Output: CAS-hash for rendret HTML. Erstatter `publishing.rs`. - [ ] 21.3 `synops-render`: Tera HTML-rendering. Input: `--node-id <uuid> --theme <tema>`. Output: CAS-hash for rendret HTML. Erstatter `publishing.rs`.
- [ ] 21.4 `synops-rss`: RSS/Atom-generering. Input: `--collection-id <uuid>`. Output: XML til stdout. Erstatter `rss.rs`. - [ ] 21.4 `synops-rss`: RSS/Atom-generering. Input: `--collection-id <uuid>`. Output: XML til stdout. Erstatter `rss.rs`.

View file

@ -7,7 +7,7 @@ eller maskinrommet-API. Ligger i PATH via symlink eller direkte kall.
| Verktøy | Beskrivelse | Status | | Verktøy | Beskrivelse | Status |
|---------|-------------|--------| |---------|-------------|--------|
| (kommer) | | | | `synops-transcribe` | Whisper-transkribering av lydfil fra CAS | Ferdig |
## Konvensjoner ## Konvensjoner
- Navnekonvensjon: `synops-<verb>` (f.eks. `synops-context`) - Navnekonvensjon: `synops-<verb>` (f.eks. `synops-context`)

2936
tools/synops-transcribe/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,20 @@
# synops-transcribe — standalone Whisper transcription CLI.
# Built as its own crate so the machine-room can spawn it as a subprocess.
[package]
name = "synops-transcribe"
version = "0.1.0"
edition = "2024"

[[bin]]
name = "synops-transcribe"
path = "src/main.rs"

[dependencies]
# CLI argument parsing (derive API).
clap = { version = "4", features = ["derive"] }
# Async runtime for HTTP, filesystem and DB work.
tokio = { version = "1", features = ["full"] }
# PostgreSQL access, used only in --write mode.
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v7", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
# HTTP client for the faster-whisper API (multipart upload, no default TLS).
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "multipart"] }
# Structured logging; output goes to stderr so stdout stays pure JSON.
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -0,0 +1,476 @@
// synops-transcribe — Whisper-transkribering via faster-whisper API.
//
// Input: CAS-hash til lydfil, modellnavn, valgfri initial prompt.
// Output: JSON med segmenter til stdout.
// Med --write: skriver segmenter til PG og oppdaterer node metadata.
//
// Miljøvariabler:
// DATABASE_URL — PostgreSQL-tilkobling (påkrevd med --write)
// CAS_ROOT — Rot for content-addressable store (default: /srv/synops/media/cas)
// WHISPER_URL — faster-whisper API URL (default: http://localhost:8000)
//
// Ref: docs/retninger/unix_filosofi.md, docs/concepts/podcastfabrikken.md
use chrono::Utc;
use clap::Parser;
use serde::Serialize;
use std::path::PathBuf;
use std::process;
use uuid::Uuid;
/// Transkriber lydfil fra CAS via faster-whisper.
// NOTE: the `///` doc comments on the fields below double as the user-facing
// `--help` text (clap derives help from them), so they are deliberately left
// in Norwegian to match the rest of the tool's CLI surface.
#[derive(Parser)]
#[command(name = "synops-transcribe", about = "Whisper-transkribering av lydfil fra CAS")]
struct Cli {
    // Required: content address of the input audio in the CAS store.
    /// SHA-256 CAS-hash til lydfilen
    #[arg(long)]
    cas_hash: String,

    // Whisper model name, forwarded verbatim to the faster-whisper API.
    /// Whisper-modell (tiny, base, small, medium, large-v3)
    #[arg(long, default_value = "medium")]
    model: String,

    // Optional priming text (e.g. a list of proper names) passed to Whisper.
    /// Initial prompt for bedre egennavn-gjenkjenning
    #[arg(long)]
    initial_prompt: Option<String>,

    // Transcription language; defaults to Norwegian.
    /// Språkkode (ISO 639-1)
    #[arg(long, default_value = "no")]
    language: String,

    // Only used to derive a file-extension hint for the multipart upload.
    /// MIME-type for lydfilen (brukes som filnavn-hint til Whisper)
    #[arg(long, default_value = "audio/mpeg")]
    mime: String,

    // Target node row; main() enforces presence when --write is set.
    /// Node-ID å oppdatere i databasen (påkrevd med --write)
    #[arg(long)]
    node_id: Option<Uuid>,

    // Attributed user for the resource_usage_log entry (optional).
    /// Bruker-ID som utløste transkripsjonen (for ressurslogging)
    #[arg(long)]
    requested_by: Option<Uuid>,

    // Dry-run by default: without --write, results go to stdout only.
    /// Skriv resultater til database (uten dette flagget: kun stdout)
    #[arg(long)]
    write: bool,
}
/// One parsed SRT cue, serialized into the JSON output and
/// (with --write) inserted into `transcription_segments`.
#[derive(Debug, Serialize)]
struct Segment {
    // 1-based sequence number taken from the SRT file.
    seq: i32,
    // Cue start offset in milliseconds from the start of the audio.
    start_ms: i32,
    // Cue end offset in milliseconds.
    end_ms: i32,
    // Cue text; multi-line cues are joined with '\n' by the parser.
    content: String,
}
/// JSON payload printed to stdout: the full segment list plus summary
/// fields for downstream consumers (the machine-room dispatcher reads
/// `segment_count` from this object).
#[derive(Serialize)]
struct TranscribeResult {
    segments: Vec<Segment>,
    // Whole transcript: trimmed cue texts joined with single spaces.
    transcript: String,
    // End timestamp of the last segment; 0 only if there are no segments
    // (unreachable in practice — empty SRT is rejected earlier).
    duration_ms: i32,
    segment_count: usize,
    model: String,
    language: String,
    // RFC 3339 timestamp of when the transcription completed.
    transcribed_at: String,
}
/// Entry point: set up logging, validate flag combinations, run the
/// pipeline, and exit non-zero with a message on stderr on failure.
#[tokio::main]
async fn main() {
    // Log to stderr so stdout stays reserved for the JSON result.
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "synops_transcribe=info".parse().unwrap());
    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(false)
        .with_writer(std::io::stderr)
        .init();

    let args = Cli::parse();

    // --write without --node-id is a usage error: there is no row to update.
    if args.write && args.node_id.is_none() {
        eprintln!("Feil: --node-id er påkrevd sammen med --write");
        process::exit(1);
    }

    match run(args).await {
        Ok(()) => {}
        Err(e) => {
            eprintln!("Feil: {e}");
            process::exit(1);
        }
    }
}
/// Executes the full transcription pipeline for one CAS object.
///
/// Steps: read audio from CAS → POST to faster-whisper (SRT format) →
/// parse SRT into segments → print the JSON result to stdout →
/// optionally (--write) persist segments, node metadata and resource
/// usage to PostgreSQL.
///
/// Environment: CAS_ROOT and WHISPER_URL have defaults; DATABASE_URL is
/// required only when --write is set.
async fn run(cli: Cli) -> Result<(), String> {
    let cas_root = std::env::var("CAS_ROOT").unwrap_or_else(|_| "/srv/synops/media/cas".into());
    let whisper_url = std::env::var("WHISPER_URL").unwrap_or_else(|_| "http://localhost:8000".into());

    // 1. Read the audio file from the content-addressable store.
    let file_path = cas_path(&cas_root, &cli.cas_hash);
    let file_data = tokio::fs::read(&file_path)
        .await
        .map_err(|e| format!("Kunne ikke lese CAS-fil {}: {e}", cli.cas_hash))?;

    tracing::info!(
        cas_hash = %cli.cas_hash,
        size = file_data.len(),
        model = %cli.model,
        "Sender lydfil til Whisper"
    );

    // 2. Upload to the faster-whisper API, requesting SRT output.
    //    The filename is synthetic — only its extension matters, as a
    //    format hint to the server.
    let file_ext = mime_to_extension(&cli.mime);
    let file_name = format!("audio.{file_ext}");

    let file_part = reqwest::multipart::Part::bytes(file_data)
        .file_name(file_name)
        .mime_str(&cli.mime)
        .map_err(|e| format!("Kunne ikke bygge multipart: {e}"))?;

    let mut form = reqwest::multipart::Form::new()
        .part("file", file_part)
        .text("model", cli.model.clone())
        .text("language", cli.language.clone())
        .text("response_format", "srt");
    if let Some(ref prompt) = cli.initial_prompt {
        form = form.text("initial_prompt", prompt.clone());
    }

    let url = format!("{whisper_url}/v1/audio/transcriptions");

    let response = reqwest::Client::new()
        .post(&url)
        .multipart(form)
        // Generous timeout: long recordings can take minutes to transcribe.
        .timeout(std::time::Duration::from_secs(600))
        .send()
        .await
        .map_err(|e| format!("HTTP-feil mot Whisper: {e}"))?;

    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        return Err(format!("Whisper returnerte {status}: {body}"));
    }

    let srt_text = response
        .text()
        .await
        .map_err(|e| format!("Kunne ikke lese Whisper-respons: {e}"))?;

    // 3. Parse the SRT body into timed segments.
    let segments = parse_srt(&srt_text)?;
    tracing::info!(segments = segments.len(), "SRT parset");
    if segments.is_empty() {
        return Err("Whisper returnerte tom SRT — ingen segmenter".into());
    }

    // 4. Build the result: trimmed cue texts joined into one transcript;
    //    duration taken from the last cue's end time.
    let transcript: String = segments.iter().map(|s| s.content.trim()).collect::<Vec<_>>().join(" ");
    let duration_ms = segments.last().map(|s| s.end_ms).unwrap_or(0);
    let transcribed_at = Utc::now();

    let result = TranscribeResult {
        segment_count: segments.len(),
        transcript,
        duration_ms,
        model: cli.model.clone(),
        language: cli.language.clone(),
        transcribed_at: transcribed_at.to_rfc3339(),
        segments,
    };

    // 5. Emit the JSON result on stdout (logs go to stderr).
    println!(
        "{}",
        serde_json::to_string_pretty(&result).map_err(|e| format!("JSON-serialisering feilet: {e}"))?
    );

    // 6. Persist to PostgreSQL only when --write was given.
    if cli.write {
        let node_id = cli.node_id.unwrap(); // Validated in main(): --write implies --node-id.
        let db_url = std::env::var("DATABASE_URL")
            .map_err(|_| "DATABASE_URL må settes med --write".to_string())?;
        let db = sqlx::postgres::PgPoolOptions::new()
            .max_connections(2)
            .connect(&db_url)
            .await
            .map_err(|e| format!("Kunne ikke koble til database: {e}"))?;

        // Delete any previous segments for this node first, so reruns are
        // idempotent rather than duplicating rows.
        sqlx::query("DELETE FROM transcription_segments WHERE node_id = $1")
            .bind(node_id)
            .execute(&db)
            .await
            .map_err(|e| format!("Kunne ikke slette gamle segmenter: {e}"))?;

        // Insert the fresh segments in a single transaction.
        insert_segments(&db, node_id, transcribed_at, &result.segments).await?;

        // Update the node's content and transcription metadata.
        update_node(&db, node_id, &result.transcript, transcribed_at, result.segment_count, duration_ms).await?;

        // Best-effort resource accounting — never fails the job.
        log_resource_usage(&db, node_id, cli.requested_by, &cli.model, duration_ms, &cli.language).await;

        tracing::info!(node_id = %node_id, "Database oppdatert");
    }

    Ok(())
}
/// Resolves the on-disk CAS path: {root}/{hash[0..2]}/{hash[2..4]}/{hash}.
///
/// The two-level shard directories keep any single directory from
/// accumulating millions of entries. Panics (byte-slice out of range) if
/// `hash` is shorter than four bytes — CAS keys are hex SHA-256 in practice.
fn cas_path(root: &str, hash: &str) -> PathBuf {
    let (shard_a, rest) = hash.split_at(2);
    let (shard_b, _) = rest.split_at(2);
    [root, shard_a, shard_b, hash].into_iter().collect()
}
/// Maps an audio MIME type to the file extension used as a format hint
/// in the multipart upload. Unknown types fall back to "wav".
fn mime_to_extension(mime: &str) -> &str {
    match mime {
        "audio/webm" => "webm",
        "audio/ogg" => "ogg",
        "audio/mpeg" | "audio/mp3" => "mp3",
        "audio/flac" | "audio/x-flac" => "flac",
        "audio/mp4" | "audio/m4a" | "audio/x-m4a" => "m4a",
        "audio/wav" | "audio/x-wav" => "wav",
        // Default hint for anything unrecognized.
        _ => "wav",
    }
}
// --- SRT-parsing ---
/// Parses SRT text into a list of segments.
///
/// Format per cue: a numeric sequence line, a time line
/// ("HH:MM:SS,mmm --> HH:MM:SS,mmm"), then one or more text lines,
/// terminated by a blank line or end of input. Cues with empty text
/// are skipped. CRLF input is handled by `str::lines`.
///
/// Robustness fix: a leading UTF-8 BOM (which HTTP text bodies may
/// carry) is stripped — otherwise the very first sequence number
/// ("\u{feff}1") would fail to parse and the whole file would error.
fn parse_srt(srt: &str) -> Result<Vec<Segment>, String> {
    // Strip a leading BOM before line iteration.
    let srt = srt.strip_prefix('\u{feff}').unwrap_or(srt);
    let mut segments = Vec::new();
    let mut lines = srt.lines().peekable();
    while lines.peek().is_some() {
        // Skip blank separator lines between cues.
        while lines.peek().is_some_and(|l| l.trim().is_empty()) {
            lines.next();
        }
        // Sequence number line; a trailing run of blanks ends the input.
        let seq_line = match lines.next() {
            Some(l) if !l.trim().is_empty() => l.trim().to_string(),
            _ => break,
        };
        let seq: i32 = seq_line
            .parse()
            .map_err(|_| format!("Ugyldig SRT-sekvensnummer: '{seq_line}'"))?;
        // Time line: 00:00:00,000 --> 00:00:05,230
        let time_line = lines
            .next()
            .ok_or_else(|| format!("Mangler tidslinje etter sekvens {seq}"))?;
        let (start_ms, end_ms) = parse_srt_time_line(time_line)
            .map_err(|e| format!("Ugyldig tidslinje for sekvens {seq}: {e}"))?;
        // Text lines run until a blank line or end of input.
        let mut text_parts = Vec::new();
        while lines.peek().is_some_and(|l| !l.trim().is_empty()) {
            text_parts.push(lines.next().unwrap().to_string());
        }
        let content = text_parts.join("\n");
        // Drop cues with no text at all.
        if !content.is_empty() {
            segments.push(Segment {
                seq,
                start_ms,
                end_ms,
                content,
            });
        }
    }
    Ok(segments)
}
/// Parses an SRT time line ("00:01:23,456 --> 00:01:30,789") into
/// (start_ms, end_ms).
///
/// Errors if the line does not contain exactly one "-->" separator or
/// if either timestamp is malformed.
fn parse_srt_time_line(line: &str) -> Result<(i32, i32), String> {
    let parts: Vec<&str> = line.split("-->").collect();
    // Exactly two halves: rejects both missing and repeated arrows.
    let [start_raw, end_raw] = parts[..] else {
        return Err(format!("Forventet 'start --> end', fikk: '{line}'"));
    };
    let start = parse_srt_timestamp(start_raw.trim())?;
    let end = parse_srt_timestamp(end_raw.trim())?;
    Ok((start, end))
}

/// Parses an SRT timestamp ("HH:MM:SS,mmm") into milliseconds.
///
/// Bug fix: the total is now rounded instead of truncated. With the old
/// `as i32` truncation, values like "00:00:08,775" came out as 8774,
/// because 8.775 is not exactly representable in f64 and
/// 8.775 * 1000.0 evaluates to 8774.999999…
fn parse_srt_timestamp(ts: &str) -> Result<i32, String> {
    // SRT uses a comma as decimal separator; normalize for f64 parsing.
    let ts = ts.replace(',', ".");
    let parts: Vec<&str> = ts.split(':').collect();
    if parts.len() != 3 {
        return Err(format!("Ugyldig tidsstempel: '{ts}'"));
    }
    let hours: f64 = parts[0].parse().map_err(|_| format!("Ugyldig timer: '{}'", parts[0]))?;
    let minutes: f64 = parts[1].parse().map_err(|_| format!("Ugyldig minutter: '{}'", parts[1]))?;
    let seconds: f64 = parts[2].parse().map_err(|_| format!("Ugyldig sekunder: '{}'", parts[2]))?;
    // round() corrects for binary floating-point representation error
    // before the integer conversion.
    Ok(((hours * 3_600.0 + minutes * 60.0 + seconds) * 1000.0).round() as i32)
}
// --- Database-operasjoner (kun med --write) ---
/// Inserts all parsed segments for `node_id` inside one transaction,
/// so a mid-stream failure leaves no half-written transcript behind.
async fn insert_segments(
    db: &sqlx::PgPool,
    node_id: Uuid,
    transcribed_at: chrono::DateTime<Utc>,
    segments: &[Segment],
) -> Result<(), String> {
    let mut txn = db
        .begin()
        .await
        .map_err(|e| format!("Transaksjon feilet: {e}"))?;

    let insert_sql =
        "INSERT INTO transcription_segments (node_id, transcribed_at, seq, start_ms, end_ms, content)
         VALUES ($1, $2, $3, $4, $5, $6)";

    for segment in segments {
        sqlx::query(insert_sql)
            .bind(node_id)
            .bind(transcribed_at)
            .bind(segment.seq)
            .bind(segment.start_ms)
            .bind(segment.end_ms)
            .bind(&segment.content)
            .execute(&mut *txn)
            .await
            .map_err(|e| format!("Kunne ikke sette inn segment {}: {e}", segment.seq))?;
    }

    txn.commit().await.map_err(|e| format!("Commit feilet: {e}"))?;
    tracing::info!(node_id = %node_id, segments = segments.len(), "Segmenter skrevet til DB");
    Ok(())
}
/// Writes the joined transcript into the node's `content` column and
/// merges a `transcription` object into its JSON metadata.
async fn update_node(
    db: &sqlx::PgPool,
    node_id: Uuid,
    transcript: &str,
    transcribed_at: chrono::DateTime<Utc>,
    segment_count: usize,
    duration_ms: i32,
) -> Result<(), String> {
    // Fetch the current metadata so the transcription block is merged in
    // instead of replacing unrelated keys.
    let fetched: Option<serde_json::Value> =
        sqlx::query_scalar("SELECT metadata FROM nodes WHERE id = $1")
            .bind(node_id)
            .fetch_optional(db)
            .await
            .map_err(|e| format!("Kunne ikke hente node: {e}"))?;

    let Some(mut metadata) = fetched else {
        return Err(format!("Node {node_id} finnes ikke"));
    };

    metadata["transcription"] = serde_json::json!({
        "duration_ms": duration_ms,
        "segment_count": segment_count,
        "transcribed_at": transcribed_at.to_rfc3339(),
    });

    sqlx::query("UPDATE nodes SET content = $2, metadata = $3 WHERE id = $1")
        .bind(node_id)
        .bind(transcript)
        .bind(&metadata)
        .execute(db)
        .await
        .map_err(|e| format!("Kunne ikke oppdatere node: {e}"))?;

    tracing::info!(node_id = %node_id, transcript_len = transcript.len(), "Node oppdatert med transkripsjon");
    Ok(())
}
/// Best-effort accounting: records one `whisper` entry in
/// `resource_usage_log`. Failures are logged and swallowed — usage
/// accounting must never fail the transcription itself, hence no
/// Result return.
async fn log_resource_usage(
    db: &sqlx::PgPool,
    node_id: Uuid,
    requested_by: Option<Uuid>,
    model: &str,
    duration_ms: i32,
    language: &str,
) {
    // Resolve the owning collection (if any) via the belongs_to edge;
    // lookup errors simply produce None.
    let collection_id: Option<Uuid> = sqlx::query_scalar(
        "SELECT e.target_id FROM edges e
         JOIN nodes n ON n.id = e.target_id
         WHERE e.source_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'collection'
         LIMIT 1",
    )
    .bind(node_id)
    .fetch_optional(db)
    .await
    .ok()
    .flatten();

    let detail = serde_json::json!({
        "model": model,
        "duration_seconds": (duration_ms as f64) / 1000.0,
        "language": language,
        "mode": "batch",
    });

    let outcome = sqlx::query(
        "INSERT INTO resource_usage_log (target_node_id, triggered_by, collection_id, resource_type, detail)
         VALUES ($1, $2, $3, $4, $5)",
    )
    .bind(node_id)
    .bind(requested_by)
    .bind(collection_id)
    .bind("whisper")
    .bind(&detail)
    .execute(db)
    .await;

    if let Err(e) = outcome {
        tracing::warn!(error = %e, "Kunne ikke logge ressursforbruk");
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // HH:MM:SS,mmm timestamps convert to milliseconds.
    #[test]
    fn test_parse_srt_timestamp() {
        assert_eq!(parse_srt_timestamp("00:00:00,000").unwrap(), 0);
        assert_eq!(parse_srt_timestamp("00:01:23,456").unwrap(), 83456);
        assert_eq!(parse_srt_timestamp("01:00:00,000").unwrap(), 3_600_000);
        assert_eq!(parse_srt_timestamp("00:00:05,230").unwrap(), 5230);
    }

    // Two well-formed cues yield two segments with correct bounds and text.
    #[test]
    fn test_parse_srt() {
        // The literal is left-aligned on purpose: SRT cue lines must start
        // at column 0; the trailing backslash strips the first newline.
        let srt = "\
1
00:00:00,000 --> 00:00:05,230
Hei og velkommen til Sidelinja.

2
00:00:05,230 --> 00:00:10,500
I dag snakker vi om fotball.
";
        let segments = parse_srt(srt).unwrap();
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].seq, 1);
        assert_eq!(segments[0].start_ms, 0);
        assert_eq!(segments[0].end_ms, 5230);
        assert_eq!(segments[0].content, "Hei og velkommen til Sidelinja.");
        assert_eq!(segments[1].seq, 2);
        assert_eq!(segments[1].start_ms, 5230);
        assert_eq!(segments[1].end_ms, 10500);
    }

    // The "-->" separator splits into start/end milliseconds.
    #[test]
    fn test_parse_srt_time_line() {
        let (start, end) = parse_srt_time_line("00:01:23,456 --> 00:01:30,789").unwrap();
        assert_eq!(start, 83456);
        assert_eq!(end, 90789);
    }

    // Empty input is valid and yields no segments.
    #[test]
    fn test_parse_srt_empty() {
        let segments = parse_srt("").unwrap();
        assert!(segments.is_empty());
    }

    // CAS paths shard on the first four hex characters of the hash.
    #[test]
    fn test_cas_path() {
        let p = cas_path("/srv/synops/media/cas", "b94d27b9934d3e08");
        assert_eq!(
            p,
            PathBuf::from("/srv/synops/media/cas/b9/4d/b94d27b9934d3e08")
        );
    }
}