# Infrastruktur: Jobbkø (PostgreSQL-basert) **Filsti:** `docs/infra/jobbkø.md` ## 1. Konsept Et felles, sentralisert køsystem for alle asynkrone bakgrunnsjobber i Sidelinja. Bygget som en enkel tabell i PostgreSQL med Rust-workers som konsumerer jobber. Ingen ekstern message broker — PostgreSQL er køen. ## 2. Hvorfor PostgreSQL? - Allerede i stacken, ingen ny infrastruktur å drifte - Transaksjonell garanti: jobben og resultatet kan committes sammen med dataendringer - Lavt volum (titalls jobber/time) gjør polling neglisjerbart - Enkel feilsøking via SQL (`SELECT * FROM job_queue WHERE status = 'error'`) - `SELECT ... FOR UPDATE SKIP LOCKED` gir trygg concurrent polling uten låsekonflikt ## 3. Datastruktur ```sql CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'error', 'retry'); CREATE TABLE job_queue ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), collection_node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, -- Samlings-node jobben tilhører job_type TEXT NOT NULL, -- 'whisper_transcribe', 'openrouter_analyze', 'stats_parse', 'research_clip' payload JSONB NOT NULL, -- Inputdata (filsti, tekst, tema_id, etc.) status job_status NOT NULL DEFAULT 'pending', priority SMALLINT NOT NULL DEFAULT 0, -- Høyere = viktigere result JSONB, -- Resultatet ved fullført jobb error_msg TEXT, -- Feilmelding ved error attempts SMALLINT NOT NULL DEFAULT 0, max_attempts SMALLINT NOT NULL DEFAULT 3, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, scheduled_for TIMESTAMPTZ NOT NULL DEFAULT now() -- For utsatte jobber / retry med backoff ); CREATE INDEX idx_job_queue_pending ON job_queue (priority DESC, scheduled_for ASC) WHERE status IN ('pending', 'retry'); ``` ## 4. Worker-arkitektur (Rust) ### 4.1 Designprinsipp: Orkestrator, ikke prosesseringsmotor Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som koordinerer eksterne tjenester: | Jobbtype | Hva workeren gjør | Tung logikk i workeren? | |---|---|---| | `whisper_transcribe` | HTTP-kall til faster-whisper-server (SRT), parse → `transcription_segments` | Lett SRT-parsing | | `openrouter_analyze` | HTTP-kall til AI Gateway | Nei — venter på svar | | `stats_parse` | Parser Caddy-loggfiler, skriver til PG | Lett I/O | | `research_clip` | HTTP-kall til AI Gateway | Nei — venter på svar | | `generate_embeddings` | HTTP-kall til AI Gateway | Nei — venter på svar | Ny jobbtype = ny handler-funksjon (bygg request, håndter respons, feilhåndtering). Tynt glue-code. Rekompilering er triviell og inkrementell. ### 4.2 Én worker, prioritetsstyrt Én enkelt worker-prosess håndterer **alle jobbtyper**. Prioritering skjer via `priority`-kolonnen i køen — SQL-spørringen plukker alltid viktigste jobb først. Ingen behov for separate prosesser per jobbtype. ``` ┌──────────────────────────────────────────────────┐ │ Rust Worker (sidelinja-worker) │ │ │ │ Konfigurasjon: │ │ --max-concurrent 3 (samtidige jobber) │ │ --poll-interval 1s │ │ │ │ Loop (per ledig slot): │ │ 1. SELECT ... FOR UPDATE SKIP LOCKED │ │ WHERE status IN ('pending','retry') │ │ AND scheduled_for <= now() │ │ ORDER BY priority DESC, scheduled_for │ │ LIMIT 1 │ │ │ │ 2. UPDATE status = 'running' │ │ 3. Dispatch til handler basert på job_type │ │ 4a. OK: UPDATE status = 'completed' │ │ 4b. Feil: attempts += 1 │ │ Hvis attempts < max_attempts: │ │ status = 'retry' │ │ scheduled_for = now() │ │ + backoff(attempts) │ │ Ellers: status = 'error' │ │ │ └──────────────────────────────────────────────────┘ ``` ### 4.3 Prioritetsmodell | Prioritet | Kategori | Eksempler | |---|---|---| | 10 | Brukerrettet / sanntid | `dictation_cleanup`, `research_clip` | | 5 | Normal | `whisper_transcribe`, `openrouter_analyze`, `srt_parse` | | 1 | Bakgrunn | `stats_parse`, `generate_embeddings`, `prompt_eval` | Verdiene er veiledende — SvelteKit setter prioritet ved opprettelse basert på kontekst. En manuelt trigget transkripsjon kan få høyere prioritet enn en automatisk nattjobb. ### 4.4 Ressursstyring * **Concurrency:** `--max-concurrent` begrenser antall samtidige jobber. Default 3 — passer for 8 vCPU der noen slots er Whisper (CPU-tung) og resten er HTTP-kall (ventetid). * **Resource Governor (Whisper):** Når et LiveKit-rom er aktivt, reduserer workeren Whisper-tråder (`--threads 2` i HTTP-kall til faster-whisper) for å beskytte lydkvaliteten. Sjekkes via LiveKit room-status før Whisper-kall. * **Skalering senere:** To nivåer: 1. **Worker-splitting:** Workeren splittes til to binærer fra samme crate (`worker-heavy`, `worker-light`) via CLI-argument (`--types whisper_transcribe,openrouter_analyze`). Ingen kodeendring nødvendig — kun deploy-konfigurasjon. 2. **Compute-separasjon:** Flytt Rust-worker + faster-whisper til en separat Hetzner-node (evt. ARM/Ampere for pris/ytelse). LiveKit er ekstremt sensitivt for CPU-stotring — ved samtidig WebRTC og Whisper på samme maskin risikerer vi audio glitches uansett cgroups. Worker-noden poller jobbkøen i PostgreSQL over internt nettverk — arkitekturen støtter dette uten kodeendring. **Backoff-strategi:** Eksponentiell: `30s × 2^(attempts-1)` (30s, 60s, 120s). ## 5. Jobbtyper | `job_type` | Konsument | Beskrivelse | |---|---|---| | `whisper_transcribe` | Universell lyd-tjeneste | Transkriber lydfil via faster-whisper (SRT) → `transcription_segments` | | `openrouter_analyze` | Podcastfabrikken | Metadata-uttrekk fra transkripsjon | | `ai_text_process` | Editor (AI-knapp) | Rens, oppsummer, trekk ut fakta, skriv om (se `docs/proposals/editor.md`) | | `stats_parse` | Podcast-Statistikk | Batch-prosesser Caddy-logger | | `meeting_summarize` | Møterommet | Generer møtereferat og action points fra transkripsjon | | `valgomat_generate_profile` | Valgomat | Generer syntetiske kandidatprofiler fra partiprogrammer | | `valgomat_moderation` | Valgomat | Semantisk deduplisering og nøytralitetsvask av brukerspørsmål | | `dictation_cleanup` | Lydmeldinger | AI-opprydding av diktert transkripsjon til strukturert notat | | `generate_embeddings` | Kunnskaps-Bridge | Generer vector embeddings for noder (pgvector) | | `prompt_eval` | Prompt-Laboratorium | Batch-evaluering av testsett mot valgte modeller | | `suggest_edges` | Kunnskapsgraf (AI) | Analyser innhold via LLM, foreslå topics og mentions-edges. Trigges automatisk ved opprettelse av content-noder med tilstrekkelig tekst | | `summarize_communication` | Oppsummering (AI) | Generer AI-sammendrag av kommunikasjonsnode (chat/møte). Oppretter content-node med summary-edge tilbake. Trigges via `/intentions/summarize` | | `url_ingest` | Web Clipper (proposal) | Hent URL, oppsummer via AI, opprett research-klipp med graf-koblinger | | `generate_waveform` | Waveforms (proposal) | Generer audio-peaks fra lydfil for visuell bølgeform | ## 6. Tilgangsisolasjon Alle jobber merkes med `collection_node_id`. Rust-workers kjører som superuser (bypasser RLS) og sikrer isolasjon i applikasjonskode: * Worker leser `collection_node_id` fra jobben og bruker det til å lagre resultater tilbake i riktig samlings-node * Per samlings-node config (AI-prompts, navnelister) hentes fra samlings-nodens JSONB-metadata * Feilede jobber vises kun for brukere med tilgang til samlings-noden (via node_access) i admin-visningen ## 7. Observabilitet og admin-API ### Implementert (oppgave 15.3) Admin-oversikt over jobbkøen med filtrering og handlinger. **API-endepunkter:** - `GET /admin/jobs?status=&type=&collection_id=&limit=&offset=` — paginert jobbliste med filtre - `POST /intentions/retry_job` `{ job_id }` — sett feilet/retry-jobb tilbake til pending - `POST /intentions/cancel_job` `{ job_id }` — avbryt ventende/retry-jobb **Frontend:** `/admin/jobs` — statusoppsummering (antall per status), filter på type/status, tabell med alle felter, retry/avbryt-knapper. Poller hvert 5. sekund. **Kode:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`), `intentions.rs` (API-handlers), `frontend/src/routes/admin/jobs/+page.svelte` - Valgfritt: SpacetimeDB-event ved statusendring slik at UI kan vise fremdrift i sanntid (f.eks. "Transkriberer... 2/3 forsøk") ## 8. Instruks for Claude Code - Én binær: `sidelinja-worker`. Én Rust-crate med polling-loop + handler-dispatch - Hver jobbtype implementeres som en handler-funksjon som registreres i en `HashMap` - Bruk `tokio` med semaphore for concurrency-kontroll (`--max-concurrent`) - Aldri lagre lydfiler i `payload` — bruk filstier - Opprett alltid jobber med riktig `collection_node_id` — hent fra konteksten (innlogget bruker, webhook, etc.) - Ved `stats_parse`: denne erstatter den frittstående cronjobben beskrevet i podcast_statistikk.md — bruk jobbkøen med `scheduled_for` for periodisk kjøring - Splitt til flere binærer kun hvis det blir eksplisitt bedt om — start med én