synops/docs/infra/jobbkø.md
vegard bfc88b9a80 Jobbkø-dispatcher: spawn CLI-verktøy i stedet for inline-kode (oppgave 21.15)
Alle jobbkø-handlere i maskinrommet delegerer nå til CLI-verktøy
(Command::new("synops-X")) i stedet for å kjøre logikk inline.
Stdout → jobbresultat (JSON), stderr → feillogg, exitkode → status.

Konverterte handlere:
- tts_generate → synops-tts (beholder voice-oppslag + STDB-synk)
- suggest_edges → synops-suggest-edges (STDB-synk utelatt, topics er bakgrunnsdata)
- render_article → synops-render --render-type article
- render_index → synops-render --render-type index
- audio_process → synops-audio (beholder STDB-synk)

Allerede CLI-delegerende (uendret):
- whisper_transcribe → synops-transcribe
- agent_respond → synops-respond
- summarize_communication → synops-summarize

Fortsatt inline (mangler CLI-verktøy):
- ai_process — planlagt i fremtidig oppgave

Ny felles modul: cli_dispatch.rs med run_cli_tool() og env-helpers.
synops-tts modifisert til å inkludere media_node_id i output.
Ref: docs/retninger/unix_filosofi.md
2026-03-18 10:36:38 +00:00

230 lines
13 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Infrastruktur: Jobbkø (PostgreSQL-basert)
**Filsti:** `docs/infra/jobbkø.md`
## 1. Konsept
Et felles, sentralisert køsystem for alle asynkrone bakgrunnsjobber i Sidelinja. Bygget som en enkel tabell i PostgreSQL med Rust-workers som konsumerer jobber. Ingen ekstern message broker — PostgreSQL er køen.
## 2. Hvorfor PostgreSQL?
- Allerede i stacken, ingen ny infrastruktur å drifte
- Transaksjonell garanti: jobben og resultatet kan committes sammen med dataendringer
- Lavt volum (titalls jobber/time) gjør polling neglisjerbart
- Enkel feilsøking via SQL (`SELECT * FROM job_queue WHERE status = 'error'`)
- `SELECT ... FOR UPDATE SKIP LOCKED` gir trygg concurrent polling uten låsekonflikt
## 3. Datastruktur
```sql
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'error', 'retry');
CREATE TABLE job_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
collection_node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, -- Samlings-node jobben tilhører
job_type TEXT NOT NULL, -- 'whisper_transcribe', 'openrouter_analyze', 'stats_parse', 'research_clip'
payload JSONB NOT NULL, -- Inputdata (filsti, tekst, tema_id, etc.)
status job_status NOT NULL DEFAULT 'pending',
priority SMALLINT NOT NULL DEFAULT 0, -- Høyere = viktigere
result JSONB, -- Resultatet ved fullført jobb
error_msg TEXT, -- Feilmelding ved error
attempts SMALLINT NOT NULL DEFAULT 0,
max_attempts SMALLINT NOT NULL DEFAULT 3,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT now() -- For utsatte jobber / retry med backoff
);
CREATE INDEX idx_job_queue_pending ON job_queue (priority DESC, scheduled_for ASC)
WHERE status IN ('pending', 'retry');
```
## 4. Worker-arkitektur (Rust)
### 4.1 Designprinsipp: Orkestrator, ikke prosesseringsmotor
Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som spawner
CLI-verktøy (`Command::new("synops-X")`) som gjør selve jobben.
**Kontrakt mellom maskinrommet og CLI-verktøy:**
- **Stdout** → jobbresultat (JSON). Parses og lagres i `job_queue.result`.
- **Stderr** → feillogg. Logges av maskinrommet.
- **Exitkode** → status. 0 = suksess, != 0 = feil.
- **`--write` flag** → CLI-verktøyet skriver til PG selv. Uten flagget: kun stdout.
**Hva maskinrommet beholder inline:**
- Payload-parsing og validering
- Sikkerhetskontroller (kill switch, rate limiting, loop-prevensjon for agent)
- Voice/model-oppslag fra node metadata (orchestrator-logikk)
- STDB-synk etter CLI fullført (sanntidsvisning)
- Jobbstatus-håndtering (complete/fail/retry)
| Jobbtype | CLI-verktøy | Maskinrommet beholder |
|---|---|---|
| `whisper_transcribe` | `synops-transcribe` | — |
| `agent_respond` | `synops-respond` | Kill switch, rate limit, loop-prevensjon, STDB-synk |
| `suggest_edges` | `synops-suggest-edges` | — |
| `summarize_communication` | `synops-summarize` | — |
| `tts_generate` | `synops-tts` | Voice-oppslag fra node metadata, STDB-synk |
| `audio_process` | `synops-audio` | STDB-synk |
| `render_article` | `synops-render` | — |
| `render_index` | `synops-render` | — |
| `ai_process` | *(inline — mangler CLI)* | Alt (planlagt migrasjon) |
**Felles dispatcher-kode:** `cli_dispatch.rs``run_cli_tool()` og env-helpers.
Ny jobbtype = nytt CLI-verktøy + tynn dispatcher-handler. Ref: `docs/retninger/unix_filosofi.md`.
### 4.2 Én worker, prioritetsstyrt
Én enkelt worker-prosess håndterer **alle jobbtyper**. Prioritering skjer via `priority`-kolonnen i køen — SQL-spørringen plukker alltid viktigste jobb først. Ingen behov for separate prosesser per jobbtype.
```
┌──────────────────────────────────────────────────┐
│ Rust Worker (sidelinja-worker) │
│ │
│ Konfigurasjon: │
│ --max-concurrent 3 (samtidige jobber) │
│ --poll-interval 1s │
│ │
│ Loop (per ledig slot): │
│ 1. SELECT ... FOR UPDATE SKIP LOCKED │
│ WHERE status IN ('pending','retry') │
│ AND scheduled_for <= now() │
│ ORDER BY priority DESC, scheduled_for │
│ LIMIT 1 │
│ │
│ 2. UPDATE status = 'running' │
│ 3. Dispatch til handler basert på job_type │
│ 4a. OK: UPDATE status = 'completed' │
│ 4b. Feil: attempts += 1 │
│ Hvis attempts < max_attempts: │
│ status = 'retry' │
│ scheduled_for = now() │
│ + backoff(attempts) │
│ Ellers: status = 'error' │
│ │
└──────────────────────────────────────────────────┘
```
### 4.3 Prioritetsmodell
| Prioritet | Kategori | Eksempler |
|---|---|---|
| 10 | Brukerrettet / sanntid | `dictation_cleanup`, `research_clip` |
| 5 | Normal | `whisper_transcribe`, `openrouter_analyze`, `srt_parse` |
| 1 | Bakgrunn | `stats_parse`, `generate_embeddings`, `prompt_eval` |
Verdiene er veiledende — SvelteKit setter prioritet ved opprettelse basert på kontekst. En manuelt trigget transkripsjon kan få høyere prioritet enn en automatisk nattjobb.
### 4.4 Ressursstyring (implementert, oppgave 15.5)
#### Concurrency-kontroll
* **Semaphore:** Maks 3 samtidige jobber (tokio semaphore). Forhindrer at workeren overbelaster serveren.
* **CPU-vektgrense:** Hver jobbtype har en `cpu_weight` (15). Totalt maks 8 (passer 8 vCPU). En Whisper-jobb (vekt 5) blokkerer andre tunge jobber men tillater lette HTTP-kall (vekt 1).
* **Per-type max_concurrent:** Begrenser hvor mange jobber av samme type som kan kjøre samtidig (f.eks. maks 1 Whisper, maks 2 render).
#### Prioritetsregler (`job_priority_rules`-tabell)
Konfigurerbare regler per jobbtype, lagret i PG og cachet i minnet:
| Felt | Beskrivelse |
|------|-------------|
| `base_priority` | Standard prioritet (010, høyere = viktigere) |
| `livekit_priority_adj` | Prioritetsjustering under aktive LiveKit-sesjoner (typisk negativ) |
| `cpu_weight` | Ressursvekt 15 (brukes mot MAX_TOTAL_WEIGHT=8) |
| `max_concurrent` | Maks samtidige jobber av denne typen (0=ubegrenset) |
| `timeout_seconds` | Timeout per jobb (0=default 600s) |
| `block_during_livekit` | Om jobben skal utsettes helt under LiveKit-sesjoner |
Regler kan endres via `POST /admin/resources/update_rule` uten restart.
#### Resource Governor (LiveKit-bevisst)
Workeren sjekker om det finnes aktive LiveKit-rom (kommunikasjonsnoder med `metadata.livekit_active = true`). Status caches i 10 sekunder.
Når LiveKit er aktivt:
* Jobbtyper med `block_during_livekit = true` utsettes 30 sekunder
* Andre jobber nedprioriteres med `livekit_priority_adj` (f.eks. Whisper: -3)
* Totalt CPU-vektbudsjett begrenser tunge jobber (Whisper vekt=5 + LiveKit → kun lette jobber i parallell)
#### Timeout
Hver jobb har individuell timeout basert på `timeout_seconds` i prioritetsreglene. Default 600s (10 min). Ved timeout: jobben feiler og kan retryes.
#### Disk-overvåking
Bakgrunnsloop som sjekker diskbruk hvert 60. sekund:
| Terskel | Alert-nivå | Handling |
|---------|-----------|----------|
| <85% | OK | Ingen normal drift |
| 85% | `warning` | Loggmelding, vurder pruning |
| 90% | `critical` | Logges som warning, aggressiv pruning anbefalt |
| 95% | `emergency` | Logges som error, umiddelbar handling påkrevd |
Status lagres i `disk_status_log`-tabellen (siste 1000 målinger beholdes). Admin-API: `GET /admin/resources/disk`.
* **Skalering senere:** To nivåer:
1. **Worker-splitting:** Workeren splittes til to binærer fra samme crate (`worker-heavy`, `worker-light`) via CLI-argument (`--types whisper_transcribe,openrouter_analyze`). Ingen kodeendring nødvendig kun deploy-konfigurasjon.
2. **Compute-separasjon:** Flytt Rust-worker + faster-whisper til en separat Hetzner-node (evt. ARM/Ampere for pris/ytelse). LiveKit er ekstremt sensitivt for CPU-stotring ved samtidig WebRTC og Whisper samme maskin risikerer vi audio glitches uansett cgroups. Worker-noden poller jobbkøen i PostgreSQL over internt nettverk arkitekturen støtter dette uten kodeendring.
**Backoff-strategi:** Eksponentiell: `30s × 2^(attempts-1)` (30s, 60s, 120s).
## 5. Jobbtyper
| `job_type` | Konsument | Beskrivelse |
|---|---|---|
| `whisper_transcribe` | Universell lyd-tjeneste | Transkriber lydfil via faster-whisper (SRT) `transcription_segments` |
| `openrouter_analyze` | Podcastfabrikken | Metadata-uttrekk fra transkripsjon |
| `ai_text_process` | Editor (AI-knapp) | Rens, oppsummer, trekk ut fakta, skriv om (se `docs/proposals/editor.md`) |
| `stats_parse` | Podcast-Statistikk | Batch-prosesser Caddy-logger |
| `meeting_summarize` | Møterommet | Generer møtereferat og action points fra transkripsjon |
| `valgomat_generate_profile` | Valgomat | Generer syntetiske kandidatprofiler fra partiprogrammer |
| `valgomat_moderation` | Valgomat | Semantisk deduplisering og nøytralitetsvask av brukerspørsmål |
| `dictation_cleanup` | Lydmeldinger | AI-opprydding av diktert transkripsjon til strukturert notat |
| `generate_embeddings` | Kunnskaps-Bridge | Generer vector embeddings for noder (pgvector) |
| `prompt_eval` | Prompt-Laboratorium | Batch-evaluering av testsett mot valgte modeller |
| `suggest_edges` | Kunnskapsgraf (AI) | Analyser innhold via LLM, foreslå topics og mentions-edges. Trigges automatisk ved opprettelse av content-noder med tilstrekkelig tekst |
| `summarize_communication` | Oppsummering (AI) | Generer AI-sammendrag av kommunikasjonsnode (chat/møte). Oppretter content-node med summary-edge tilbake. Trigges via `/intentions/summarize` |
| `url_ingest` | Web Clipper (proposal) | Hent URL, oppsummer via AI, opprett research-klipp med graf-koblinger |
| `generate_waveform` | Waveforms (proposal) | Generer audio-peaks fra lydfil for visuell bølgeform |
## 6. Tilgangsisolasjon
Alle jobber merkes med `collection_node_id`. Rust-workers kjører som superuser (bypasser RLS) og sikrer isolasjon i applikasjonskode:
* Worker leser `collection_node_id` fra jobben og bruker det til å lagre resultater tilbake i riktig samlings-node
* Per samlings-node config (AI-prompts, navnelister) hentes fra samlings-nodens JSONB-metadata
* Feilede jobber vises kun for brukere med tilgang til samlings-noden (via node_access) i admin-visningen
## 7. Observabilitet og admin-API
### Implementert (oppgave 15.3)
Admin-oversikt over jobbkøen med filtrering og handlinger.
**API-endepunkter:**
- `GET /admin/jobs?status=&type=&collection_id=&limit=&offset=` paginert jobbliste med filtre
- `POST /intentions/retry_job` `{ job_id }` sett feilet/retry-jobb tilbake til pending
- `POST /intentions/cancel_job` `{ job_id }` avbryt ventende/retry-jobb
**Frontend:** `/admin/jobs` statusoppsummering (antall per status), filter type/status,
tabell med alle felter, retry/avbryt-knapper. Poller hvert 5. sekund.
**Kode:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`),
`intentions.rs` (API-handlers), `frontend/src/routes/admin/jobs/+page.svelte`
### Implementert (oppgave 15.5): Ressursstyring
**API-endepunkter:**
- `GET /admin/resources` samlet ressursstatus (disk, LiveKit, vekt, regler, kjørende jobber)
- `GET /admin/resources/disk` disk-status med siste 60 historiske målinger
- `POST /admin/resources/update_rule` oppdater/opprett prioritetsregel
**Kode:** `resources.rs` (prioritetsregler, governor, disk-overvåking),
`jobs.rs` (semaphore, vektbasert concurrency, LiveKit-sjekk i worker-loop),
`intentions.rs` (admin-handlers for `/admin/resources/*`)
**Migrasjon:** `014_resource_governor.sql` `job_priority_rules` + `disk_status_log`
- Valgfritt: SpacetimeDB-event ved statusendring slik at UI kan vise fremdrift i sanntid (f.eks. "Transkriberer... 2/3 forsøk")
## 8. Instruks for Claude Code
- Én binær: `sidelinja-worker`. Én Rust-crate med polling-loop + handler-dispatch
- Hver jobbtype implementeres som en handler-funksjon som registreres i en `HashMap<String, Handler>`
- Bruk `tokio` med semaphore for concurrency-kontroll (`--max-concurrent`)
- Aldri lagre lydfiler i `payload` bruk filstier
- Opprett alltid jobber med riktig `collection_node_id` hent fra konteksten (innlogget bruker, webhook, etc.)
- Ved `stats_parse`: denne erstatter den frittstående cronjobben beskrevet i podcast_statistikk.md bruk jobbkøen med `scheduled_for` for periodisk kjøring
- Splitt til flere binærer kun hvis det blir eksplisitt bedt om start med én