Complete task 7.2: Transcription pipeline (CAS → Whisper → content)

Implements the complete pipeline for automatic transcription of audio files:

- PostgreSQL job queue (job_queue table with status, retry, backoff)
- Worker loop in maskinrommet that polls every 2 seconds
- Whisper integration: reads the CAS file, sends multipart to the faster-whisper API
- Post-processing: filters hallucinated segments (no_speech_prob > 0.6)
- Updates the media node's content field with the transcript and metadata
- Automatic trigger: upload_media enqueues a job for audio/* files

Tested end-to-end on the server: the job is picked up, Whisper processes it,
and the node is updated. Retry with exponential backoff on failure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
vegard 2026-03-17 17:44:54 +01:00
parent f6d1c5f563
commit 9768a24693
8 changed files with 590 additions and 3 deletions


@@ -92,6 +92,19 @@ Response (verbose_json):
4. **Hallucination:** Whisper hallucinates text on silent/tone-only files (high `no_speech_prob`).
Filter out segments with `no_speech_prob > 0.6` during post-processing.
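The filtering rule above can be sketched as a standalone function. This is an illustrative sketch, not code from the repository; the `Segment` struct and `filter_hallucinated` name are assumptions mirroring the relevant fields of a `verbose_json` segment:

```rust
/// Minimal sketch of the hallucination filter described above.
/// `Segment` mirrors only the fields needed for filtering.
#[derive(Debug)]
struct Segment {
    text: String,
    no_speech_prob: f64,
}

/// Keep only segments Whisper is reasonably confident contain speech.
fn filter_hallucinated(segments: &[Segment]) -> Vec<&Segment> {
    segments.iter().filter(|s| s.no_speech_prob <= 0.6).collect()
}

fn main() {
    let segments = vec![
        Segment { text: "hei og velkommen".into(), no_speech_prob: 0.02 },
        // A classic Whisper hallucination on silence: a sign-off phrase
        Segment { text: "Takk for at du så på".into(), no_speech_prob: 0.94 },
    ];
    let kept = filter_hallucinated(&segments);
    println!("kept {} of {} segments", kept.len(), segments.len());
}
```

The 0.6 threshold is the one the document itself uses; tune it per model and audio source.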
## Integration with maskinrommet (task 7.2)
The transcription pipeline is implemented in maskinrommet via the job queue:
1. **Trigger:** When `upload_media` receives an audio file (MIME `audio/*`), a `whisper_transcribe` job is created in `job_queue`.
2. **Worker:** maskinrommet runs an internal worker loop that polls `job_queue` every 2 seconds.
3. **Processing:** The worker reads the audio file from CAS and sends it as multipart to the faster-whisper API (`verbose_json` format).
4. **Post-processing:** Segments with `no_speech_prob > 0.6` are filtered out (hallucination).
5. **Storage:** The transcript text is written to the node's `content` field. Metadata (`duration`, `language`, `segment_count`, `transcribed_at`) goes into `metadata.transcription`.
6. **Retry:** On failure, exponential backoff (30s × 2^n), max 3 attempts.
Code: `maskinrommet/src/transcribe.rs`, `maskinrommet/src/jobs.rs`
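The retry schedule from step 6 can be sketched as follows. The helper name `backoff_secs` is illustrative, not from the codebase; the exact exponent offset follows `fail_job` in `jobs.rs`, where `attempts` has already been incremented by the time the backoff is computed:

```rust
/// Sketch of the retry backoff from step 6: 30s × 2^attempts,
/// with max_attempts = 3 bounding how often it is applied.
/// The function name is illustrative only.
fn backoff_secs(attempts: u32) -> u64 {
    30 * 2u64.pow(attempts)
}

fn main() {
    // `attempts` is the number of tries already made when the job fails
    for attempts in 1..=3 {
        println!("retry after failure #{attempts}: {}s", backoff_secs(attempts));
    }
}
```

So a job that keeps failing is retried after 60s, 120s, and 240s before being marked as a permanent error.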
## On GPU upgrade
Switch to the CUDA image and float16:
```yaml


@@ -441,6 +441,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@@ -461,6 +472,7 @@ checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -1015,6 +1027,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "mio"
version = "1.1.1"
@@ -1441,6 +1463,7 @@ dependencies = [
"base64",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-body-util",
@@ -1449,6 +1472,7 @@ dependencies = [
"hyper-util",
"js-sys",
"log",
"mime_guess",
"percent-encoding",
"pin-project-lite",
"quinn",
@@ -1460,12 +1484,14 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 1.0.6",
]
@@ -2281,6 +2307,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-bidi"
version = "0.3.18"
@@ -2488,6 +2520,19 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wasmparser"
version = "0.244.0"


@@ -15,7 +15,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tower-http = { version = "0.6", features = ["cors", "trace"] }
jsonwebtoken = "9"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "multipart", "stream"] }
sha2 = "0.10"
hex = "0.4"
tokio-util = { version = "0.7", features = ["io"] }


@@ -997,6 +997,41 @@ pub async fn upload_media(
None
};
// -- Enqueue a transcription job for audio files --
if is_audio_mime(&mime) {
let payload = serde_json::json!({
"media_node_id": media_node_id,
"cas_hash": cas_result.hash,
"mime": mime,
"language": "no",
});
// Find the collection_node_id from source_id's ownership chain (optional)
let collection_id = if let Some(src_id) = source_id {
find_collection_for_node(&state.db, src_id).await.ok().flatten()
} else {
None
};
match crate::jobs::enqueue(&state.db, "whisper_transcribe", payload, collection_id, 5).await {
Ok(job_id) => {
tracing::info!(
job_id = %job_id,
media_node_id = %media_node_id,
"Transcription job created"
);
}
Err(e) => {
// Don't fail the whole upload; log and continue
tracing::error!(
media_node_id = %media_node_id,
error = %e,
"Failed to create transcription job"
);
}
}
}
Ok(Json(UploadMediaResponse {
media_node_id,
cas_hash: cas_result.hash,
@@ -1006,6 +1041,31 @@ pub async fn upload_media(
}))
}
/// Checks whether a MIME type is an audio type Whisper can transcribe.
fn is_audio_mime(mime: &str) -> bool {
mime.starts_with("audio/")
}
/// Tries to find the collection_node_id for a node via belongs_to edges.
async fn find_collection_for_node(db: &PgPool, node_id: Uuid) -> Result<Option<Uuid>, sqlx::Error> {
let row = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT e.target_id
FROM edges e
JOIN nodes n ON n.id = e.target_id
WHERE e.source_id = $1
AND e.edge_type = 'belongs_to'
AND n.node_kind = 'collection'
LIMIT 1
"#,
)
.bind(node_id)
.fetch_optional(db)
.await?;
Ok(row)
}
// =============================================================================
// Background PG operations
// =============================================================================

maskinrommet/src/jobs.rs (new file, 204 lines)

@@ -0,0 +1,204 @@
// Job queue: PostgreSQL-based asynchronous job processing.
//
// A simple polling loop using SELECT ... FOR UPDATE SKIP LOCKED.
// Dispatches to handler functions based on job_type.
//
// Ref: docs/infra/jobbkø.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::cas::CasStore;
use crate::stdb::StdbClient;
use crate::transcribe;
/// A row from the job_queue table.
#[derive(sqlx::FromRow, Debug)]
pub struct JobRow {
pub id: Uuid,
pub collection_node_id: Option<Uuid>,
pub job_type: String,
pub payload: serde_json::Value,
pub attempts: i16,
pub max_attempts: i16,
}
/// Enqueues a new job.
pub async fn enqueue(
db: &PgPool,
job_type: &str,
payload: serde_json::Value,
collection_node_id: Option<Uuid>,
priority: i16,
) -> Result<Uuid, sqlx::Error> {
let row = sqlx::query_scalar::<_, Uuid>(
r#"
INSERT INTO job_queue (job_type, payload, collection_node_id, priority)
VALUES ($1, $2, $3, $4)
RETURNING id
"#,
)
.bind(job_type)
.bind(&payload)
.bind(collection_node_id)
.bind(priority)
.fetch_one(db)
.await?;
tracing::info!(job_id = %row, job_type = %job_type, "Job enqueued");
Ok(row)
}
/// Fetches the next pending job (atomically, with FOR UPDATE SKIP LOCKED).
/// Sets status to 'running' and updates started_at.
async fn dequeue(db: &PgPool) -> Result<Option<JobRow>, sqlx::Error> {
let mut tx = db.begin().await?;
let job = sqlx::query_as::<_, JobRow>(
r#"
SELECT id, collection_node_id, job_type, payload, attempts, max_attempts
FROM job_queue
WHERE status IN ('pending', 'retry')
AND scheduled_for <= now()
ORDER BY priority DESC, scheduled_for ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
"#,
)
.fetch_optional(&mut *tx)
.await?;
if let Some(ref job) = job {
sqlx::query(
"UPDATE job_queue SET status = 'running', started_at = now(), attempts = attempts + 1 WHERE id = $1",
)
.bind(job.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(job)
}
/// Marks a job as completed.
async fn complete_job(db: &PgPool, job_id: Uuid, result: serde_json::Value) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE job_queue SET status = 'completed', result = $2, completed_at = now() WHERE id = $1",
)
.bind(job_id)
.bind(&result)
.execute(db)
.await?;
Ok(())
}
/// Marks a job as failed. Set to 'retry' with backoff if attempts remain,
/// otherwise 'error'.
async fn fail_job(db: &PgPool, job: &JobRow, error_msg: &str) -> Result<(), sqlx::Error> {
if job.attempts < job.max_attempts {
// Exponential backoff: 30s × 2^attempts (attempts was already incremented on dequeue)
let backoff_secs = 30i64 * 2i64.pow((job.attempts).max(0) as u32);
sqlx::query(
r#"
UPDATE job_queue
SET status = 'retry',
error_msg = $2,
scheduled_for = now() + ($3 || ' seconds')::interval
WHERE id = $1
"#,
)
.bind(job.id)
.bind(error_msg)
.bind(backoff_secs.to_string())
.execute(db)
.await?;
tracing::warn!(
job_id = %job.id,
attempt = job.attempts,
max = job.max_attempts,
backoff_secs = backoff_secs,
"Job failed, retry scheduled"
);
} else {
sqlx::query(
"UPDATE job_queue SET status = 'error', error_msg = $2, completed_at = now() WHERE id = $1",
)
.bind(job.id)
.bind(error_msg)
.execute(db)
.await?;
tracing::error!(
job_id = %job.id,
attempts = job.attempts,
"Job failed permanently"
);
}
Ok(())
}
/// Dispatcher: runs the matching handler based on job_type.
async fn dispatch(
job: &JobRow,
db: &PgPool,
stdb: &StdbClient,
cas: &CasStore,
whisper_url: &str,
) -> Result<serde_json::Value, String> {
match job.job_type.as_str() {
"whisper_transcribe" => {
transcribe::handle_whisper_job(job, db, stdb, cas, whisper_url).await
}
other => Err(format!("Unknown job type: {other}")),
}
}
/// Starts the worker loop that polls job_queue.
/// Runs as a background task in tokio.
pub fn start_worker(db: PgPool, stdb: StdbClient, cas: CasStore) {
let whisper_url = std::env::var("WHISPER_URL")
.unwrap_or_else(|_| "http://faster-whisper:8000".to_string());
tokio::spawn(async move {
tracing::info!("Job queue worker started (poll interval: 2s)");
loop {
match dequeue(&db).await {
Ok(Some(job)) => {
tracing::info!(
job_id = %job.id,
job_type = %job.job_type,
attempt = job.attempts,
"Processing job"
);
match dispatch(&job, &db, &stdb, &cas, &whisper_url).await {
Ok(result) => {
if let Err(e) = complete_job(&db, job.id, result).await {
tracing::error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
} else {
tracing::info!(job_id = %job.id, "Job completed");
}
}
Err(err) => {
tracing::error!(job_id = %job.id, error = %err, "Job failed");
if let Err(e) = fail_job(&db, &job, &err).await {
tracing::error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
}
}
}
}
Ok(None) => {
// No pending jobs; sleep before the next poll
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
Err(e) => {
tracing::error!(error = %e, "Error while polling the job queue");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
});
}


@@ -1,9 +1,11 @@
mod auth;
pub mod cas;
mod intentions;
pub mod jobs;
mod queries;
mod serving;
mod stdb;
pub mod transcribe;
mod warmup;
use axum::{extract::State, http::StatusCode, routing::{get, post}, Json, Router};
@@ -120,6 +122,9 @@ async fn main() {
.expect("Failed to create CAS directory");
tracing::info!(root = %cas_root, "CAS initialized");
// Start the job queue worker in the background
jobs::start_worker(db.clone(), stdb.clone(), cas.clone());
let state = AppState { db, jwks, stdb, cas };
// Routes: /health is public, /me requires a valid JWT


@@ -0,0 +1,259 @@
// Transcription pipeline: faster-whisper integration.
//
// Fetches the audio file from CAS, sends it to the faster-whisper HTTP API,
// and updates the media node's content field with the transcript.
//
// Ref: docs/erfaringer/faster_whisper_oppsett.md
use sqlx::PgPool;
use uuid::Uuid;
use crate::cas::CasStore;
use crate::jobs::JobRow;
use crate::stdb::StdbClient;
/// Whisper API response (verbose_json format).
#[derive(serde::Deserialize, Debug)]
struct WhisperResponse {
text: String,
#[serde(default)]
segments: Vec<WhisperSegment>,
#[serde(default)]
duration: f64,
#[serde(default)]
language: String,
}
#[derive(serde::Deserialize, Debug, serde::Serialize)]
struct WhisperSegment {
#[serde(default)]
id: i64,
start: f64,
end: f64,
text: String,
#[serde(default)]
no_speech_prob: f64,
}
/// Handler for whisper_transcribe jobs.
///
/// The payload is expected to contain:
/// - media_node_id: UUID, the node to update
/// - cas_hash: String, the CAS key of the audio file
/// - mime: String, the MIME type (used as a filename hint)
/// - language: String (optional, default "no")
pub async fn handle_whisper_job(
job: &JobRow,
db: &PgPool,
stdb: &StdbClient,
cas: &CasStore,
whisper_url: &str,
) -> Result<serde_json::Value, String> {
let media_node_id: Uuid = job.payload["media_node_id"]
.as_str()
.ok_or("Missing media_node_id in payload")?
.parse()
.map_err(|e| format!("Invalid media_node_id: {e}"))?;
let cas_hash = job.payload["cas_hash"]
.as_str()
.ok_or("Missing cas_hash in payload")?;
let mime = job.payload["mime"]
.as_str()
.unwrap_or("audio/mpeg");
let language = job.payload["language"]
.as_str()
.unwrap_or("no");
// 1. Read the audio file from CAS
let file_path = cas.path_for(cas_hash);
let file_data = tokio::fs::read(&file_path)
.await
.map_err(|e| format!("Failed to read CAS file {cas_hash}: {e}"))?;
tracing::info!(
media_node_id = %media_node_id,
cas_hash = %cas_hash,
size = file_data.len(),
"Sending audio file to Whisper"
);
// 2. Send to the faster-whisper API
let file_ext = mime_to_extension(mime);
let file_name = format!("audio.{file_ext}");
let file_part = reqwest::multipart::Part::bytes(file_data)
.file_name(file_name)
.mime_str(mime)
.map_err(|e| format!("Failed to build multipart part: {e}"))?;
let form = reqwest::multipart::Form::new()
.part("file", file_part)
.text("model", "large-v3")
.text("language", language.to_string())
.text("response_format", "verbose_json");
let client = reqwest::Client::new();
let url = format!("{whisper_url}/v1/audio/transcriptions");
let response = client
.post(&url)
.multipart(form)
.timeout(std::time::Duration::from_secs(600)) // 10-minute timeout for long files
.send()
.await
.map_err(|e| format!("HTTP error calling Whisper: {e}"))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("Whisper returned {status}: {body}"));
}
let whisper_result: WhisperResponse = response
.json()
.await
.map_err(|e| format!("Failed to parse Whisper response: {e}"))?;
tracing::info!(
media_node_id = %media_node_id,
duration = whisper_result.duration,
segments = whisper_result.segments.len(),
language = %whisper_result.language,
"Transcription complete"
);
// 3. Filter out segments with high no_speech_prob (hallucination)
let filtered_segments: Vec<&WhisperSegment> = whisper_result
.segments
.iter()
.filter(|s| s.no_speech_prob <= 0.6)
.collect();
// Build the filtered text from the valid segments
let transcript_text = if filtered_segments.len() < whisper_result.segments.len() {
filtered_segments
.iter()
.map(|s| s.text.trim())
.collect::<Vec<_>>()
.join(" ")
} else {
whisper_result.text.clone()
};
let filtered_count = whisper_result.segments.len() - filtered_segments.len();
if filtered_count > 0 {
tracing::info!(
filtered = filtered_count,
"Filtered out segments with high no_speech_prob"
);
}
// 4. Update the media node's content field and metadata
update_node_with_transcript(db, stdb, media_node_id, &transcript_text, &whisper_result).await?;
Ok(serde_json::json!({
"duration": whisper_result.duration,
"language": whisper_result.language,
"segments": whisper_result.segments.len(),
"filtered_segments": filtered_count,
"transcript_length": transcript_text.len(),
}))
}
/// Updates the node's content field with the transcript and stores
/// transcription metadata in metadata.transcription.
async fn update_node_with_transcript(
db: &PgPool,
stdb: &StdbClient,
node_id: Uuid,
transcript: &str,
whisper: &WhisperResponse,
) -> Result<(), String> {
// Fetch the existing node from PG to merge metadata
let existing = sqlx::query_as::<_, NodeMetadataRow>(
"SELECT metadata, node_kind, title, visibility::text as visibility FROM nodes WHERE id = $1",
)
.bind(node_id)
.fetch_optional(db)
.await
.map_err(|e| format!("PG error fetching node: {e}"))?
.ok_or_else(|| format!("Node {node_id} does not exist"))?;
// Merge the transcription data into the existing metadata
let mut metadata = existing.metadata.clone();
metadata["transcription"] = serde_json::json!({
"duration": whisper.duration,
"language": whisper.language,
"segment_count": whisper.segments.len(),
"transcribed_at": chrono::Utc::now().to_rfc3339(),
});
let metadata_str = metadata.to_string();
let node_id_str = node_id.to_string();
let title = existing.title.clone().unwrap_or_default();
// Update STDB (instant feedback); best-effort, PG is authoritative
if let Err(e) = stdb.update_node(
&node_id_str,
&existing.node_kind,
&title,
transcript,
&existing.visibility,
&metadata_str,
)
.await
{
tracing::warn!(
node_id = %node_id,
error = %e,
"Failed to update STDB with the transcript (continuing with PG)"
);
}
// Update PG (persistent)
sqlx::query(
r#"
UPDATE nodes
SET content = $2, metadata = $3
WHERE id = $1
"#,
)
.bind(node_id)
.bind(transcript)
.bind(&metadata)
.execute(db)
.await
.map_err(|e| format!("PG update failed: {e}"))?;
tracing::info!(
node_id = %node_id,
transcript_len = transcript.len(),
"Node updated with transcript"
);
Ok(())
}
#[derive(sqlx::FromRow)]
struct NodeMetadataRow {
metadata: serde_json::Value,
node_kind: String,
title: Option<String>,
visibility: String,
}
/// Converts a MIME type to a file extension for the Whisper filename hint.
fn mime_to_extension(mime: &str) -> &str {
match mime {
"audio/mpeg" | "audio/mp3" => "mp3",
"audio/wav" | "audio/x-wav" => "wav",
"audio/ogg" => "ogg",
"audio/flac" | "audio/x-flac" => "flac",
"audio/mp4" | "audio/m4a" | "audio/x-m4a" => "m4a",
"audio/webm" => "webm",
_ => "wav",
}
}


@@ -94,8 +94,7 @@ Independent phases can still be picked up.
## Phase 7: Audio pipeline
- [x] 7.1 faster-whisper setup: Docker container, GPU if available, Norwegian model. Ref: `docs/erfaringer/`.
- [~] 7.2 Transcription pipeline: audio file in CAS → maskinrommet triggers Whisper → result in the `content` field.
> Started: 2026-03-17T17:25
- [x] 7.2 Transcription pipeline: audio file in CAS → maskinrommet triggers Whisper → result in the `content` field.
- [ ] 7.3 Voice memo in the frontend: record button in the input component → upload → CAS → transcription.
- [ ] 7.4 Audio playback: plays the original audio from the CAS node. Waveform view.
@@ -149,6 +148,8 @@ Independent phases can still be picked up.
- [ ] 14.13 Editorial conversation: on submission, the editor can create a communication node linked to the article + author for discussion/feedback beyond a short note in edge metadata.
- [ ] 14.14 Bulk re-rendering: batch job via the job queue on theme changes. Paginated (100 articles at a time), updates `renderer_version`. Articles are served with the old theme until re-rendered.
- [ ] 14.15 Dynamic pages: category pages (filtered on tag edges), archive (chronological with monthly grouping), search (PG full-text). All paginated, cached in maskinrommet. About page as a static CAS node.
- [ ] 14.16 Presentation elements as nodes: published title, lede, OG image, and subtitle are separate nodes with `title`/`summary`/`og_image` edges to the article. Frontend for creating/editing variants. Ref: `docs/concepts/publisering.md` § "Presentasjonselementer".
- [ ] 14.17 A/B testing: maskinrommet rotates variants when rendering the front page, logs impressions/clicks per variant, and normalizes CTR against a time-of-day baseline. Once statistically significant, a winner is marked. The editor can override. Edge metadata: `ab_status`, `impressions`, `clicks`, `ctr`.
## Phase 15: Admin panel