synops/maskinrommet/src/transcribe.rs
vegard eb81055ef4 Fullfører oppgave 15.7: Ressursforbruk-logging
Sentralisert logging av alle ressurskrevende operasjoner til
resource_usage_log-tabellen (opprettet i migrasjon 009).

Ny kode:
- resource_usage.rs: hjelpemodul med log() og find_collection_for_node()
- bandwidth.rs: Caddy JSON-logg-parser med nattlig batch-jobb (kl 03:00)

Logging lagt til i handlere:
- AI: summarize, ai_edges (token-telling via LiteLLM usage-felt),
  agent (placeholder — claude CLI gir ikke token-info)
- Whisper: duration_seconds, model, language, mode
- TTS: refaktorert til sentralisert modul, lagt til collection_id
- CAS: logger nye filer ved upload (ikke dedup)
- LiveKit: logger join-hendelser (faktisk deltaker-minutter
  krever webhook-integrasjon i fremtiden)

Caddy-config: JSON access logging aktivert for sidelinja.org og
synops.no i /srv/synops/config/caddy/Caddyfile (utenfor repo).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 04:24:54 +00:00

461 lines
13 KiB
Rust

// Transkripsjons-pipeline — faster-whisper integrasjon.
//
// Henter lydfil fra CAS, sender til faster-whisper HTTP API (SRT-format),
// parser SRT og skriver segmenter til transcription_segments-tabellen.
// Universell tjeneste for all lyd: podcast, møter, voice memos.
//
// Ref: docs/concepts/podcastfabrikken.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::cas::CasStore;
use crate::jobs::JobRow;
use crate::resource_usage;
use crate::stdb::StdbClient;
/// A single parsed SRT cue (one numbered subtitle entry).
#[derive(Debug)]
struct SrtSegment {
/// Sequence number taken from the cue's first line.
seq: i32,
/// Cue start time in milliseconds.
start_ms: i32,
/// Cue end time in milliseconds.
end_ms: i32,
/// Cue text; when a cue spans several lines they are joined with `\n`.
content: String,
}
/// Handler for whisper_transcribe-jobber.
///
/// Payload forventer:
/// - media_node_id: UUID — noden som skal oppdateres
/// - cas_hash: String — CAS-nøkkel til lydfilen
/// - mime: String — MIME-type (brukes for filnavn-hint)
/// - language: String (valgfritt, default "no")
/// - initial_prompt: String (valgfritt — navneliste for bedre egennavn)
pub async fn handle_whisper_job(
job: &JobRow,
db: &PgPool,
stdb: &StdbClient,
cas: &CasStore,
whisper_url: &str,
) -> Result<serde_json::Value, String> {
let media_node_id: Uuid = job.payload["media_node_id"]
.as_str()
.ok_or("Mangler media_node_id i payload")?
.parse()
.map_err(|e| format!("Ugyldig media_node_id: {e}"))?;
let cas_hash = job.payload["cas_hash"]
.as_str()
.ok_or("Mangler cas_hash i payload")?;
let mime = job.payload["mime"]
.as_str()
.unwrap_or("audio/mpeg");
let language = job.payload["language"]
.as_str()
.unwrap_or("no");
// Hent initial_prompt: payload > miljøvariabel > ingen
let initial_prompt = match job.payload["initial_prompt"].as_str() {
Some(p) => Some(p.to_string()),
None => std::env::var("WHISPER_INITIAL_PROMPT").ok(),
};
// Modell: sentral serverinnstilling
let model = std::env::var("WHISPER_MODEL")
.unwrap_or_else(|_| "medium".to_string());
// 1. Les lydfil fra CAS
let file_path = cas.path_for(cas_hash);
let file_data = tokio::fs::read(&file_path)
.await
.map_err(|e| format!("Kunne ikke lese CAS-fil {cas_hash}: {e}"))?;
tracing::info!(
media_node_id = %media_node_id,
cas_hash = %cas_hash,
size = file_data.len(),
model = %model,
"Sender lydfil til Whisper"
);
// 2. Send til faster-whisper API (SRT-format)
let file_ext = mime_to_extension(mime);
let file_name = format!("audio.{file_ext}");
let file_part = reqwest::multipart::Part::bytes(file_data)
.file_name(file_name)
.mime_str(mime)
.map_err(|e| format!("Kunne ikke bygge multipart: {e}"))?;
let mut form = reqwest::multipart::Form::new()
.part("file", file_part)
.text("model", model.clone())
.text("language", language.to_string())
.text("response_format", "srt");
if let Some(ref prompt) = initial_prompt {
form = form.text("initial_prompt", prompt.clone());
}
let client = reqwest::Client::new();
let url = format!("{whisper_url}/v1/audio/transcriptions");
let response = client
.post(&url)
.multipart(form)
.timeout(std::time::Duration::from_secs(600)) // 10 min timeout for lange filer
.send()
.await
.map_err(|e| format!("HTTP-feil mot Whisper: {e}"))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("Whisper returnerte {status}: {body}"));
}
let srt_text = response
.text()
.await
.map_err(|e| format!("Kunne ikke lese Whisper-respons: {e}"))?;
// 3. Parse SRT til segmenter
let segments = parse_srt(&srt_text)?;
tracing::info!(
media_node_id = %media_node_id,
segments = segments.len(),
"SRT parset"
);
if segments.is_empty() {
return Err("Whisper returnerte tom SRT — ingen segmenter".to_string());
}
// 4. Skriv segmenter til transcription_segments-tabellen
let transcribed_at = chrono::Utc::now();
insert_segments(db, media_node_id, transcribed_at, &segments).await?;
// 5. Bygg sammenhengende tekst og oppdater node
let transcript_text: String = segments
.iter()
.map(|s| s.content.trim())
.collect::<Vec<_>>()
.join(" ");
let duration_ms = segments.last().map(|s| s.end_ms).unwrap_or(0);
update_node_with_transcript(
db,
stdb,
media_node_id,
&transcript_text,
transcribed_at,
segments.len(),
duration_ms,
)
.await?;
// Logg ressursforbruk (Whisper)
let triggered_by: Option<Uuid> = job.payload["requested_by"]
.as_str()
.and_then(|s| s.parse().ok());
let collection_id = resource_usage::find_collection_for_node(db, media_node_id).await;
let duration_seconds = (duration_ms as f64) / 1000.0;
if let Err(e) = resource_usage::log(
db,
media_node_id,
triggered_by,
collection_id,
"whisper",
serde_json::json!({
"model": model,
"duration_seconds": duration_seconds,
"language": language,
"mode": "batch"
}),
)
.await
{
tracing::warn!(error = %e, "Kunne ikke logge Whisper-ressursforbruk");
}
Ok(serde_json::json!({
"segments": segments.len(),
"transcript_length": transcript_text.len(),
"duration_ms": duration_ms,
"model": model,
"transcribed_at": transcribed_at.to_rfc3339(),
}))
}
/// Parses SRT text into a list of segments.
///
/// Expected layout, with cues separated by blank lines:
/// ```text
/// 1
/// 00:00:00,000 --> 00:00:05,230
/// Hei og velkommen til Sidelinja.
///
/// 2
/// 00:00:05,230 --> 00:00:10,500
/// I dag snakker vi om...
/// ```
///
/// Cues without any text lines are dropped. Multi-line cue text is joined
/// with `\n`. Returns an error for a non-numeric sequence line or a missing
/// or malformed time line.
fn parse_srt(srt: &str) -> Result<Vec<SrtSegment>, String> {
    let mut cues = Vec::new();
    let mut rows = srt.lines().peekable();

    loop {
        // Consume blank separator lines in front of the next cue.
        while rows.next_if(|row| row.trim().is_empty()).is_some() {}

        // Sequence number; running out of input here ends parsing normally.
        let header = match rows.next() {
            Some(row) => row.trim(),
            None => break,
        };
        let seq = match header.parse::<i32>() {
            Ok(number) => number,
            Err(_) => return Err(format!("Ugyldig SRT-sekvensnummer: '{header}'")),
        };

        // Time line: 00:00:00,000 --> 00:00:05,230
        let timing = match rows.next() {
            Some(row) => row,
            None => return Err(format!("Mangler tidslinje etter sekvens {seq}")),
        };
        let (start_ms, end_ms) = match parse_srt_time_line(timing) {
            Ok(range) => range,
            Err(e) => return Err(format!("Ugyldig tidslinje for sekvens {seq}: {e}")),
        };

        // Text lines run until the next blank line or end of input.
        let mut body: Vec<&str> = Vec::new();
        while let Some(row) = rows.next_if(|row| !row.trim().is_empty()) {
            body.push(row);
        }
        if body.is_empty() {
            continue;
        }

        cues.push(SrtSegment {
            seq,
            start_ms,
            end_ms,
            content: body.join("\n"),
        });
    }

    Ok(cues)
}
/// Parses an SRT time line such as `00:01:23,456 --> 00:01:30,789`.
///
/// Returns `(start_ms, end_ms)`. The line must contain exactly one `-->`
/// separator; both sides are trimmed before being parsed as timestamps.
fn parse_srt_time_line(line: &str) -> Result<(i32, i32), String> {
    let mut stamps = line.split("-->");
    match (stamps.next(), stamps.next(), stamps.next()) {
        // Exactly two pieces: one on each side of the arrow.
        (Some(begin), Some(finish), None) => {
            let start_ms = parse_srt_timestamp(begin.trim())?;
            let end_ms = parse_srt_timestamp(finish.trim())?;
            Ok((start_ms, end_ms))
        }
        _ => Err(format!("Forventet 'start --> end', fikk: '{line}'")),
    }
}
/// Parses an SRT timestamp (`HH:MM:SS,mmm`) into whole milliseconds.
///
/// Both `,` and `.` are accepted as the decimal separator. The fractional
/// part is optional; it is right-padded to three digits (`,5` means 500 ms)
/// and any digits beyond the third are truncated.
///
/// Parsing is done with integers only. Going through `f64` loses a
/// millisecond on some inputs (`1.001 * 1000.0` is `1000.9999999999999`),
/// and a float-to-int `as` cast silently turns `NaN` into 0 and clamps
/// out-of-range values.
///
/// # Errors
/// Returns a message when the timestamp does not have three `:`-separated
/// fields, a field is not a non-negative integer, or the total does not fit
/// in an `i32`.
fn parse_srt_timestamp(ts: &str) -> Result<i32, String> {
    // Format: HH:MM:SS,mmm — normalise the decimal separator first.
    let ts = ts.replace(',', ".");
    let mut fields = ts.split(':');
    let (hours_raw, minutes_raw, seconds_raw) =
        match (fields.next(), fields.next(), fields.next(), fields.next()) {
            (Some(h), Some(m), Some(s), None) => (h, m, s),
            _ => return Err(format!("Ugyldig tidsstempel: '{ts}'")),
        };

    let hours: u32 = hours_raw
        .parse()
        .map_err(|_| format!("Ugyldig timer: '{}'", hours_raw))?;
    let minutes: u32 = minutes_raw
        .parse()
        .map_err(|_| format!("Ugyldig minutter: '{}'", minutes_raw))?;

    let invalid_seconds = || format!("Ugyldig sekunder: '{}'", seconds_raw);
    let (whole, fraction) = seconds_raw.split_once('.').unwrap_or((seconds_raw, ""));
    let seconds: u32 = whole.parse().map_err(|_| invalid_seconds())?;
    if !fraction.bytes().all(|b| b.is_ascii_digit()) {
        return Err(invalid_seconds());
    }
    // Take exactly three fractional digits, padding with zeros on the right.
    let millis = fraction
        .bytes()
        .chain(std::iter::repeat(b'0'))
        .take(3)
        .fold(0_i64, |acc, digit| acc * 10 + i64::from(digit - b'0'));

    // u32 inputs cannot overflow an i64 here.
    let total_ms = i64::from(hours) * 3_600_000
        + i64::from(minutes) * 60_000
        + i64::from(seconds) * 1_000
        + millis;
    if total_ms > i64::from(i32::MAX) {
        return Err(format!("Tidsstempel utenfor gyldig område: '{ts}'"));
    }
    // Lossless: the value is non-negative and was range-checked above.
    Ok(total_ms as i32)
}
/// Setter inn segmenter i transcription_segments-tabellen.
async fn insert_segments(
db: &PgPool,
node_id: Uuid,
transcribed_at: chrono::DateTime<chrono::Utc>,
segments: &[SrtSegment],
) -> Result<(), String> {
let mut tx = db.begin().await.map_err(|e| format!("Transaksjon feilet: {e}"))?;
for seg in segments {
sqlx::query(
r#"
INSERT INTO transcription_segments (node_id, transcribed_at, seq, start_ms, end_ms, content)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
)
.bind(node_id)
.bind(transcribed_at)
.bind(seg.seq)
.bind(seg.start_ms)
.bind(seg.end_ms)
.bind(&seg.content)
.execute(&mut *tx)
.await
.map_err(|e| format!("Kunne ikke sette inn segment {}: {e}", seg.seq))?;
}
tx.commit().await.map_err(|e| format!("Commit feilet: {e}"))?;
tracing::info!(
node_id = %node_id,
segments = segments.len(),
transcribed_at = %transcribed_at,
"Segmenter skrevet til transcription_segments"
);
Ok(())
}
/// Oppdaterer nodens content-felt med sammenhengende tekst og
/// lagrer transkripsjonsmetadata.
async fn update_node_with_transcript(
db: &PgPool,
stdb: &StdbClient,
node_id: Uuid,
transcript: &str,
transcribed_at: chrono::DateTime<chrono::Utc>,
segment_count: usize,
duration_ms: i32,
) -> Result<(), String> {
// Hent eksisterende node fra PG for å merge metadata
let existing = sqlx::query_as::<_, NodeMetadataRow>(
"SELECT metadata, node_kind, title, visibility::text as visibility FROM nodes WHERE id = $1",
)
.bind(node_id)
.fetch_optional(db)
.await
.map_err(|e| format!("PG-feil ved henting av node: {e}"))?
.ok_or_else(|| format!("Node {node_id} finnes ikke"))?;
// Merge transcription-data inn i eksisterende metadata
let mut metadata = existing.metadata.clone();
metadata["transcription"] = serde_json::json!({
"duration_ms": duration_ms,
"segment_count": segment_count,
"transcribed_at": transcribed_at.to_rfc3339(),
});
let metadata_str = metadata.to_string();
let node_id_str = node_id.to_string();
let title = existing.title.clone().unwrap_or_default();
// Oppdater STDB (instant feedback) — best-effort, PG er autoritativ
if let Err(e) = stdb.update_node(
&node_id_str,
&existing.node_kind,
&title,
transcript,
&existing.visibility,
&metadata_str,
)
.await
{
tracing::warn!(
node_id = %node_id,
error = %e,
"Kunne ikke oppdatere STDB med transkripsjon (fortsetter med PG)"
);
}
// Oppdater PG (persistent)
sqlx::query(
r#"
UPDATE nodes
SET content = $2, metadata = $3
WHERE id = $1
"#,
)
.bind(node_id)
.bind(transcript)
.bind(&metadata)
.execute(db)
.await
.map_err(|e| format!("PG update feilet: {e}"))?;
tracing::info!(
node_id = %node_id,
transcript_len = transcript.len(),
"Node oppdatert med transkripsjon"
);
Ok(())
}
/// Row shape for the `nodes` lookup done before a transcript is written back:
/// the columns needed to merge metadata and to mirror the node to STDB.
#[derive(sqlx::FromRow)]
struct NodeMetadataRow {
/// Current JSON metadata stored on the node.
metadata: serde_json::Value,
/// The node's `node_kind` column.
node_kind: String,
/// Node title; optional, an empty string is used when it is absent.
title: Option<String>,
/// The node's visibility, selected as text (`visibility::text`).
visibility: String,
}
/// Maps a MIME type to the file extension used as a format hint for Whisper.
///
/// The input is normalised before matching: parameters after `;` are dropped
/// (`audio/webm;codecs=opus` is treated as `audio/webm`), surrounding
/// whitespace is trimmed and the comparison ignores ASCII case. Unknown or
/// empty types fall back to `wav`.
fn mime_to_extension(mime: &str) -> &str {
    // Keep only the `type/subtype` part, lower-cased.
    let essence = mime
        .split(';')
        .next()
        .unwrap_or(mime)
        .trim()
        .to_ascii_lowercase();

    match essence.as_str() {
        "audio/mpeg" | "audio/mp3" => "mp3",
        "audio/wav" | "audio/x-wav" => "wav",
        "audio/ogg" => "ogg",
        "audio/flac" | "audio/x-flac" => "flac",
        "audio/mp4" | "audio/m4a" | "audio/x-m4a" => "m4a",
        "audio/webm" => "webm",
        _ => "wav",
    }
}
/// Unit tests for the SRT parsing helpers and the MIME mapping.
#[cfg(test)]
mod tests {
    use super::*;

    /// `HH:MM:SS,mmm` timestamps convert to whole milliseconds.
    #[test]
    fn test_parse_srt_timestamp() {
        assert_eq!(parse_srt_timestamp("00:00:00,000").unwrap(), 0);
        assert_eq!(parse_srt_timestamp("00:01:23,456").unwrap(), 83456);
        assert_eq!(parse_srt_timestamp("01:00:00,000").unwrap(), 3_600_000);
        assert_eq!(parse_srt_timestamp("00:00:05,230").unwrap(), 5230);
    }

    /// Two cues separated by a blank line produce two segments.
    #[test]
    fn test_parse_srt() {
        // The blank line between cues is what ends a cue's text, so it is
        // spelled out with explicit `\n\n` rather than relying on the layout
        // of a multi-line literal.
        let srt = "1\n00:00:00,000 --> 00:00:05,230\nHei og velkommen til Sidelinja.\n\n2\n00:00:05,230 --> 00:00:10,500\nI dag snakker vi om fotball.\n";

        let segments = parse_srt(srt).unwrap();
        assert_eq!(segments.len(), 2);

        assert_eq!(segments[0].seq, 1);
        assert_eq!(segments[0].start_ms, 0);
        assert_eq!(segments[0].end_ms, 5230);
        assert_eq!(segments[0].content, "Hei og velkommen til Sidelinja.");

        assert_eq!(segments[1].seq, 2);
        assert_eq!(segments[1].start_ms, 5230);
        assert_eq!(segments[1].end_ms, 10500);
        assert_eq!(segments[1].content, "I dag snakker vi om fotball.");
    }

    /// Text spanning several lines stays one segment, joined with `\n`.
    #[test]
    fn test_parse_srt_multiline_text() {
        let srt = "1\n00:00:00,000 --> 00:00:02,000\nlinje en\nlinje to\n";
        let segments = parse_srt(srt).unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].content, "linje en\nlinje to");
    }

    /// A non-numeric sequence line is reported as an error.
    #[test]
    fn test_parse_srt_rejects_bad_sequence_number() {
        let srt = "x\n00:00:00,000 --> 00:00:01,000\nhei\n";
        assert!(parse_srt(srt).is_err());
    }

    /// A time line yields start and end in milliseconds.
    #[test]
    fn test_parse_srt_time_line() {
        let (start, end) = parse_srt_time_line("00:01:23,456 --> 00:01:30,789").unwrap();
        assert_eq!(start, 83456);
        assert_eq!(end, 90789);
    }

    /// A time line without the `-->` separator is rejected.
    #[test]
    fn test_parse_srt_time_line_rejects_missing_arrow() {
        assert!(parse_srt_time_line("00:01:23,456").is_err());
    }

    /// Empty input is valid and yields no segments.
    #[test]
    fn test_parse_srt_empty() {
        let segments = parse_srt("").unwrap();
        assert!(segments.is_empty());
    }

    /// Known MIME types map to their extension; unknown ones fall back to wav.
    #[test]
    fn test_mime_to_extension() {
        assert_eq!(mime_to_extension("audio/mpeg"), "mp3");
        assert_eq!(mime_to_extension("audio/x-m4a"), "m4a");
        assert_eq!(mime_to_extension("audio/webm"), "webm");
        assert_eq!(mime_to_extension("application/octet-stream"), "wav");
    }
}