synops/tools/synops-transcribe/src/main.rs
vegard 6496434bd3 synops-common: delt lib for alle CLI-verktøy (oppgave 21.16)
Ny crate `tools/synops-common` samler duplisert kode som var
spredt over 13 CLI-verktøy:

- db::connect() — PG-pool fra DATABASE_URL (erstatter 10+ identiske blokker)
- cas::path() — CAS-stioppslag med to-nivå hash-katalog
- cas::root() — CAS_ROOT env med default
- cas::hash_bytes() / hash_file() / store() — SHA-256 hashing og lagring
- cas::mime_to_extension() — MIME → filendelse
- logging::init() — tracing til stderr med env-filter
- types::{NodeRow, EdgeRow, NodeSummary} — delte FromRow-structs

Alle verktøy (unntatt synops-tasks som ikke bruker DB) er refaktorert
til å bruke synops-common. Alle kompilerer og tester passerer.
2026-03-18 10:51:40 +00:00

441 lines
13 KiB
Rust

// synops-transcribe — Whisper-transkribering via faster-whisper API.
//
// Input: CAS-hash til lydfil, modellnavn, valgfri initial prompt.
// Output: JSON med segmenter til stdout.
// Med --write: skriver segmenter til PG og oppdaterer node metadata.
//
// Miljøvariabler:
// DATABASE_URL — PostgreSQL-tilkobling (påkrevd med --write)
// CAS_ROOT — Rot for content-addressable store (default: /srv/synops/media/cas)
// WHISPER_URL — faster-whisper API URL (default: http://localhost:8000)
//
// Ref: docs/retninger/unix_filosofi.md, docs/concepts/podcastfabrikken.md
use chrono::Utc;
use clap::Parser;
use serde::Serialize;
use std::process;
use uuid::Uuid;
/// Transkriber lydfil fra CAS via faster-whisper.
// Command-line interface, parsed by clap's derive macro.
//
// NOTE: the `///` doc comments on the fields below double as the user-facing
// `--help` text (clap picks them up at runtime), so they are intentionally
// left in Norwegian and must not be edited as if they were plain comments.
#[derive(Parser)]
#[command(name = "synops-transcribe", about = "Whisper-transkribering av lydfil fra CAS")]
struct Cli {
    /// SHA-256 CAS-hash til lydfilen
    #[arg(long)]
    cas_hash: String,
    /// Whisper-modell (tiny, base, small, medium, large-v3)
    #[arg(long, default_value = "medium")]
    model: String,
    /// Initial prompt for bedre egennavn-gjenkjenning
    #[arg(long)]
    initial_prompt: Option<String>,
    /// Språkkode (ISO 639-1)
    #[arg(long, default_value = "no")]
    language: String,
    /// MIME-type for lydfilen (brukes som filnavn-hint til Whisper)
    #[arg(long, default_value = "audio/mpeg")]
    mime: String,
    /// Node-ID å oppdatere i databasen (påkrevd med --write)
    #[arg(long)]
    node_id: Option<Uuid>,
    /// Bruker-ID som utløste transkripsjonen (for ressurslogging)
    #[arg(long)]
    requested_by: Option<Uuid>,
    /// Skriv resultater til database (uten dette flagget: kun stdout)
    #[arg(long)]
    write: bool,
}
// One parsed SRT entry. Field names are part of the serialized JSON output
// (and of the DB insert order in `insert_segments`), so they must not change.
#[derive(Debug, Serialize)]
struct Segment {
    seq: i32,        // SRT sequence number as emitted by Whisper
    start_ms: i32,   // segment start, milliseconds from start of audio
    end_ms: i32,     // segment end, milliseconds from start of audio
    content: String, // segment text; multi-line entries joined with '\n'
}
// The JSON document printed to stdout: all segments plus aggregates.
#[derive(Serialize)]
struct TranscribeResult {
    segments: Vec<Segment>,  // all parsed segments, in SRT order
    transcript: String,      // trimmed segment texts joined with single spaces
    duration_ms: i32,        // end time of the last segment (0 if none)
    segment_count: usize,    // number of segments
    model: String,           // Whisper model that was requested
    language: String,        // ISO 639-1 language code that was requested
    transcribed_at: String,  // RFC 3339 timestamp of this transcription run
}
#[tokio::main]
async fn main() {
    synops_common::logging::init("synops_transcribe");
    let cli = Cli::parse();
    // Usage error caught up front: --write needs a node to attach results to.
    if cli.write && cli.node_id.is_none() {
        eprintln!("Feil: --node-id er påkrevd sammen med --write");
        process::exit(1);
    }
    // Run the pipeline; any error is printed and turned into exit code 1.
    match run(cli).await {
        Ok(()) => {}
        Err(e) => {
            eprintln!("Feil: {e}");
            process::exit(1);
        }
    }
}
/// End-to-end transcription pipeline:
/// 1. read the audio bytes from the CAS,
/// 2. POST them to the faster-whisper API (requesting SRT output),
/// 3. parse the SRT into segments,
/// 4. print the aggregated result as JSON on stdout,
/// 5. with `--write`: replace the node's segments and metadata in PostgreSQL.
///
/// Returns a human-readable error message; `main` prints it and exits 1.
async fn run(cli: Cli) -> Result<(), String> {
    let cas_root = synops_common::cas::root();
    // Whisper endpoint is configurable via env; defaults to a local instance.
    let whisper_url = std::env::var("WHISPER_URL").unwrap_or_else(|_| "http://localhost:8000".into());
    // 1. Read the audio file from the content-addressable store.
    let file_path = synops_common::cas::path(&cas_root, &cli.cas_hash);
    let file_data = tokio::fs::read(&file_path)
        .await
        .map_err(|e| format!("Kunne ikke lese CAS-fil {}: {e}", cli.cas_hash))?;
    tracing::info!(
        cas_hash = %cli.cas_hash,
        size = file_data.len(),
        model = %cli.model,
        "Sender lydfil til Whisper"
    );
    // 2. Send to the faster-whisper API (SRT response format).
    // The MIME type is mapped to a file extension because the uploaded file
    // name serves as a format hint for Whisper.
    let file_ext = synops_common::cas::mime_to_extension(&cli.mime);
    let file_name = format!("audio.{file_ext}");
    let file_part = reqwest::multipart::Part::bytes(file_data)
        .file_name(file_name)
        .mime_str(&cli.mime)
        .map_err(|e| format!("Kunne ikke bygge multipart: {e}"))?;
    let mut form = reqwest::multipart::Form::new()
        .part("file", file_part)
        .text("model", cli.model.clone())
        .text("language", cli.language.clone())
        .text("response_format", "srt");
    if let Some(ref prompt) = cli.initial_prompt {
        form = form.text("initial_prompt", prompt.clone());
    }
    let url = format!("{whisper_url}/v1/audio/transcriptions");
    // Generous 10-minute request timeout: transcribing long audio is slow.
    let response = reqwest::Client::new()
        .post(&url)
        .multipart(form)
        .timeout(std::time::Duration::from_secs(600))
        .send()
        .await
        .map_err(|e| format!("HTTP-feil mot Whisper: {e}"))?;
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        return Err(format!("Whisper returnerte {status}: {body}"));
    }
    let srt_text = response
        .text()
        .await
        .map_err(|e| format!("Kunne ikke lese Whisper-respons: {e}"))?;
    // 3. Parse the SRT into segments.
    let segments = parse_srt(&srt_text)?;
    tracing::info!(segments = segments.len(), "SRT parset");
    if segments.is_empty() {
        return Err("Whisper returnerte tom SRT — ingen segmenter".into());
    }
    // 4. Build the aggregated result: transcript is all segment texts,
    // trimmed and space-joined; duration is the end of the last segment.
    let transcript: String = segments.iter().map(|s| s.content.trim()).collect::<Vec<_>>().join(" ");
    let duration_ms = segments.last().map(|s| s.end_ms).unwrap_or(0);
    let transcribed_at = Utc::now();
    let result = TranscribeResult {
        segment_count: segments.len(),
        transcript,
        duration_ms,
        model: cli.model.clone(),
        language: cli.language.clone(),
        transcribed_at: transcribed_at.to_rfc3339(),
        segments,
    };
    // 5. Write JSON to stdout — the tool's primary output, also without --write.
    println!(
        "{}",
        serde_json::to_string_pretty(&result).map_err(|e| format!("JSON-serialisering feilet: {e}"))?
    );
    // 6. Persist to the database when --write is given.
    if cli.write {
        let node_id = cli.node_id.unwrap(); // Already validated in main()
        let db = synops_common::db::connect().await?;
        // Delete old segments for this node so re-runs are idempotent.
        // NOTE(review): this DELETE runs outside the transaction opened inside
        // insert_segments — if the insert below fails, the old segments are
        // already gone. Consider doing both steps in one transaction.
        sqlx::query("DELETE FROM transcription_segments WHERE node_id = $1")
            .bind(node_id)
            .execute(&db)
            .await
            .map_err(|e| format!("Kunne ikke slette gamle segmenter: {e}"))?;
        // Insert the new segments (atomically, in their own transaction).
        insert_segments(&db, node_id, transcribed_at, &result.segments).await?;
        // Update node content and metadata.
        update_node(&db, node_id, &result.transcript, transcribed_at, result.segment_count, duration_ms).await?;
        // Log resource usage (best-effort; never fails the run).
        log_resource_usage(&db, node_id, cli.requested_by, &cli.model, duration_ms, &cli.language).await;
        tracing::info!(node_id = %node_id, "Database oppdatert");
    }
    Ok(())
}
// --- SRT-parsing ---
/// Parse an SRT document into segments.
///
/// Expected shape per entry: a numeric sequence line, a
/// `HH:MM:SS,mmm --> HH:MM:SS,mmm` time line, then one or more text lines
/// terminated by a blank line (or end of input). Entries with empty text
/// are skipped.
///
/// Handles both LF and CRLF input: `str::lines` splits on '\n' but keeps a
/// trailing '\r', which is stripped here so it never leaks into segment
/// content (sequence and time lines were already trimmed; content lines
/// previously were not).
///
/// # Errors
/// Returns a descriptive message when a sequence number is not numeric or
/// a time line is missing or malformed.
fn parse_srt(srt: &str) -> Result<Vec<Segment>, String> {
    let mut segments = Vec::new();
    let mut lines = srt.lines().peekable();
    while lines.peek().is_some() {
        // Skip blank separator lines between entries.
        while lines.peek().map_or(false, |l| l.trim().is_empty()) {
            lines.next();
        }
        // Sequence number line; stop cleanly at end of input.
        let seq_line = match lines.next() {
            Some(l) if !l.trim().is_empty() => l.trim().to_string(),
            _ => break,
        };
        let seq: i32 = seq_line
            .parse()
            .map_err(|_| format!("Ugyldig SRT-sekvensnummer: '{seq_line}'"))?;
        // Time line: 00:00:00,000 --> 00:00:05,230
        let time_line = lines
            .next()
            .ok_or_else(|| format!("Mangler tidslinje etter sekvens {seq}"))?;
        let (start_ms, end_ms) = parse_srt_time_line(time_line)
            .map_err(|e| format!("Ugyldig tidslinje for sekvens {seq}: {e}"))?;
        // Text lines (until a blank line or end of input).
        let mut text_parts = Vec::new();
        while lines.peek().map_or(false, |l| !l.trim().is_empty()) {
            let raw = lines.next().unwrap();
            // Fix: CRLF input left a trailing '\r' on every content line.
            text_parts.push(raw.strip_suffix('\r').unwrap_or(raw).to_string());
        }
        let content = text_parts.join("\n");
        if !content.is_empty() {
            segments.push(Segment {
                seq,
                start_ms,
                end_ms,
                content,
            });
        }
    }
    Ok(segments)
}
/// Split a `start --> end` SRT time line into `(start_ms, end_ms)`.
///
/// Exactly one `-->` separator is required; anything else is an error.
fn parse_srt_time_line(line: &str) -> Result<(i32, i32), String> {
    let halves: Vec<&str> = line.split("-->").collect();
    match halves.as_slice() {
        [start_raw, end_raw] => {
            let start = parse_srt_timestamp(start_raw.trim())?;
            let end = parse_srt_timestamp(end_raw.trim())?;
            Ok((start, end))
        }
        _ => Err(format!("Forventet 'start --> end', fikk: '{line}'")),
    }
}
/// Convert one SRT timestamp (`HH:MM:SS,mmm`) to whole milliseconds.
///
/// The comma decimal separator is normalized to a dot before parsing;
/// exactly three colon-separated fields are required.
fn parse_srt_timestamp(ts: &str) -> Result<i32, String> {
    // SRT uses ',' as the decimal separator; Rust's float parser wants '.'.
    let ts = ts.replace(',', ".");
    let mut fields = ts.split(':');
    let (h, m, s) = match (fields.next(), fields.next(), fields.next(), fields.next()) {
        // Exactly three fields: the fourth `next()` must be exhausted.
        (Some(h), Some(m), Some(s), None) => (h, m, s),
        _ => return Err(format!("Ugyldig tidsstempel: '{ts}'")),
    };
    let hours: f64 = h.parse().map_err(|_| format!("Ugyldig timer: '{h}'"))?;
    let minutes: f64 = m.parse().map_err(|_| format!("Ugyldig minutter: '{m}'"))?;
    let seconds: f64 = s.parse().map_err(|_| format!("Ugyldig sekunder: '{s}'"))?;
    let total_ms = (hours * 3_600.0 + minutes * 60.0 + seconds) * 1000.0;
    Ok(total_ms as i32)
}
// --- Database-operasjoner (kun med --write) ---
/// Insert every segment for `node_id` within a single transaction so a
/// partial transcription is never left behind on failure.
async fn insert_segments(
    db: &sqlx::PgPool,
    node_id: Uuid,
    transcribed_at: chrono::DateTime<Utc>,
    segments: &[Segment],
) -> Result<(), String> {
    let mut tx = db.begin().await.map_err(|e| format!("Transaksjon feilet: {e}"))?;
    for segment in segments {
        // Binds are positional and must match $1..$6 in declaration order.
        let insert = sqlx::query(
            "INSERT INTO transcription_segments (node_id, transcribed_at, seq, start_ms, end_ms, content)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(node_id)
        .bind(transcribed_at)
        .bind(segment.seq)
        .bind(segment.start_ms)
        .bind(segment.end_ms)
        .bind(&segment.content);
        insert
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Kunne ikke sette inn segment {}: {e}", segment.seq))?;
    }
    tx.commit().await.map_err(|e| format!("Commit feilet: {e}"))?;
    tracing::info!(node_id = %node_id, segments = segments.len(), "Segmenter skrevet til DB");
    Ok(())
}
/// Store the full transcript as the node's content and record transcription
/// stats under the `transcription` key of the node's JSON metadata, merging
/// with (rather than overwriting) the existing metadata document.
async fn update_node(
    db: &sqlx::PgPool,
    node_id: Uuid,
    transcript: &str,
    transcribed_at: chrono::DateTime<Utc>,
    segment_count: usize,
    duration_ms: i32,
) -> Result<(), String> {
    // Fetch existing metadata so the transcription stats can be merged in.
    let existing_metadata: Option<serde_json::Value> =
        sqlx::query_scalar("SELECT metadata FROM nodes WHERE id = $1")
            .bind(node_id)
            .fetch_optional(db)
            .await
            .map_err(|e| format!("Kunne ikke hente node: {e}"))?;
    // `None` here means no row matched, i.e. the node does not exist.
    let mut metadata = existing_metadata.ok_or_else(|| format!("Node {node_id} finnes ikke"))?;
    // NOTE(review): serde_json's index-assignment panics if `metadata` holds a
    // non-object, non-null JSON value (array, string, number). This assumes the
    // metadata column always contains a JSON object — TODO confirm the schema
    // guarantees that.
    metadata["transcription"] = serde_json::json!({
        "duration_ms": duration_ms,
        "segment_count": segment_count,
        "transcribed_at": transcribed_at.to_rfc3339(),
    });
    // Positional binds: $1 = node_id, $2 = transcript, $3 = metadata.
    sqlx::query("UPDATE nodes SET content = $2, metadata = $3 WHERE id = $1")
        .bind(node_id)
        .bind(transcript)
        .bind(&metadata)
        .execute(db)
        .await
        .map_err(|e| format!("Kunne ikke oppdatere node: {e}"))?;
    tracing::info!(node_id = %node_id, transcript_len = transcript.len(), "Node oppdatert med transkripsjon");
    Ok(())
}
async fn log_resource_usage(
db: &sqlx::PgPool,
node_id: Uuid,
requested_by: Option<Uuid>,
model: &str,
duration_ms: i32,
language: &str,
) {
// Finn collection via belongs_to-edge
let collection_id: Option<Uuid> = sqlx::query_scalar(
"SELECT e.target_id FROM edges e
JOIN nodes n ON n.id = e.target_id
WHERE e.source_id = $1 AND e.edge_type = 'belongs_to' AND n.node_kind = 'collection'
LIMIT 1",
)
.bind(node_id)
.fetch_optional(db)
.await
.ok()
.flatten();
let detail = serde_json::json!({
"model": model,
"duration_seconds": (duration_ms as f64) / 1000.0,
"language": language,
"mode": "batch",
});
if let Err(e) = sqlx::query(
"INSERT INTO resource_usage_log (target_node_id, triggered_by, collection_id, resource_type, detail)
VALUES ($1, $2, $3, $4, $5)",
)
.bind(node_id)
.bind(requested_by)
.bind(collection_id)
.bind("whisper")
.bind(&detail)
.execute(db)
.await
{
tracing::warn!(error = %e, "Kunne ikke logge ressursforbruk");
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // HH:MM:SS,mmm timestamps convert to total milliseconds.
    #[test]
    fn test_parse_srt_timestamp() {
        assert_eq!(parse_srt_timestamp("00:00:00,000").unwrap(), 0);
        assert_eq!(parse_srt_timestamp("00:01:23,456").unwrap(), 83456);
        assert_eq!(parse_srt_timestamp("01:00:00,000").unwrap(), 3_600_000);
        assert_eq!(parse_srt_timestamp("00:00:05,230").unwrap(), 5230);
    }

    // A two-entry SRT document yields two segments with correct fields.
    // Fix: the fixture was missing the blank separator line between the two
    // entries; parse_srt only ends an entry's text at a blank line, so without
    // it entry 2 is folded into entry 1's content and the length assertion
    // fails. The literal's lines are intentionally at column 0 — the string's
    // content is the test input.
    #[test]
    fn test_parse_srt() {
        let srt = "\
1
00:00:00,000 --> 00:00:05,230
Hei og velkommen til Sidelinja.

2
00:00:05,230 --> 00:00:10,500
I dag snakker vi om fotball.
";
        let segments = parse_srt(srt).unwrap();
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].seq, 1);
        assert_eq!(segments[0].start_ms, 0);
        assert_eq!(segments[0].end_ms, 5230);
        assert_eq!(segments[0].content, "Hei og velkommen til Sidelinja.");
        assert_eq!(segments[1].seq, 2);
        assert_eq!(segments[1].start_ms, 5230);
        assert_eq!(segments[1].end_ms, 10500);
    }

    // Time line splits into (start_ms, end_ms).
    #[test]
    fn test_parse_srt_time_line() {
        let (start, end) = parse_srt_time_line("00:01:23,456 --> 00:01:30,789").unwrap();
        assert_eq!(start, 83456);
        assert_eq!(end, 90789);
    }

    // Empty input is valid and produces no segments.
    #[test]
    fn test_parse_srt_empty() {
        let segments = parse_srt("").unwrap();
        assert!(segments.is_empty());
    }

    // CAS paths use a two-level hash-prefix directory layout.
    #[test]
    fn test_cas_path() {
        let p = synops_common::cas::path("/srv/synops/media/cas", "b94d27b9934d3e08");
        assert_eq!(
            p,
            std::path::PathBuf::from("/srv/synops/media/cas/b9/4d/b94d27b9934d3e08")
        );
    }
}