Erstatter fire-and-forget tokio::spawn() i skrivestien med jobbkø-basert persistering. Alle PG-skriveoperasjoner (insert/update/delete for noder og edges) går nå gjennom den eksisterende jobbkøen som allerede har: - Eksponentiell backoff (30s × 2^n) ved feil - Dead letter queue (status='error' etter max_attempts=3) - Admin-API for overvåking, manuell retry og avbryt - Ressursstyring og prioritetsregler Ny modul pg_writes.rs med: - 5 enqueue-funksjoner (erstatter spawn_pg_*) - 5 job-handlere for dispatch i worker-loopen - Full paritet med gammel logikk: tilgangsgivende edges kjører recompute_access i transaksjon, synker til STDB, trigger rendering Før: PG-skrivefeil logget og glemt → data kun i STDB, tapt fra PG. Nå: automatisk retry → admin-synlig dead letter → manuell recovery.
235 lines
14 KiB
Markdown
235 lines
14 KiB
Markdown
# Infrastruktur: Jobbkø (PostgreSQL-basert)
|
||
**Filsti:** `docs/infra/jobbkø.md`
|
||
|
||
## 1. Konsept
|
||
Et felles, sentralisert køsystem for alle asynkrone bakgrunnsjobber i Sidelinja. Bygget som en enkel tabell i PostgreSQL med Rust-workers som konsumerer jobber. Ingen ekstern message broker — PostgreSQL er køen.
|
||
|
||
## 2. Hvorfor PostgreSQL?
|
||
- Allerede i stacken, ingen ny infrastruktur å drifte
|
||
- Transaksjonell garanti: jobben og resultatet kan committes sammen med dataendringer
|
||
- Lavt volum (titalls jobber/time) gjør polling neglisjerbart
|
||
- Enkel feilsøking via SQL (`SELECT * FROM job_queue WHERE status = 'error'`)
|
||
- `SELECT ... FOR UPDATE SKIP LOCKED` gir trygg concurrent polling uten låsekonflikt
|
||
|
||
## 3. Datastruktur
|
||
|
||
```sql
|
||
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'error', 'retry');
|
||
|
||
CREATE TABLE job_queue (
|
||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||
collection_node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, -- Samlings-node jobben tilhører
|
||
job_type TEXT NOT NULL, -- 'whisper_transcribe', 'openrouter_analyze', 'stats_parse', 'research_clip'
|
||
payload JSONB NOT NULL, -- Inputdata (filsti, tekst, tema_id, etc.)
|
||
status job_status NOT NULL DEFAULT 'pending',
|
||
priority SMALLINT NOT NULL DEFAULT 0, -- Høyere = viktigere
|
||
result JSONB, -- Resultatet ved fullført jobb
|
||
error_msg TEXT, -- Feilmelding ved error
|
||
attempts SMALLINT NOT NULL DEFAULT 0,
|
||
max_attempts SMALLINT NOT NULL DEFAULT 3,
|
||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||
started_at TIMESTAMPTZ,
|
||
completed_at TIMESTAMPTZ,
|
||
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT now() -- For utsatte jobber / retry med backoff
|
||
);
|
||
|
||
CREATE INDEX idx_job_queue_pending ON job_queue (priority DESC, scheduled_for ASC)
|
||
WHERE status IN ('pending', 'retry');
|
||
```
|
||
|
||
## 4. Worker-arkitektur (Rust)
|
||
|
||
### 4.1 Designprinsipp: Orkestrator, ikke prosesseringsmotor
|
||
Workeren gjør lite tung prosessering selv. Den er en **orkestrator** som spawner
|
||
CLI-verktøy (`Command::new("synops-X")`) som gjør selve jobben.
|
||
|
||
**Kontrakt mellom maskinrommet og CLI-verktøy:**
|
||
- **Stdout** → jobbresultat (JSON). Parses og lagres i `job_queue.result`.
|
||
- **Stderr** → feillogg. Logges av maskinrommet.
|
||
- **Exitkode** → status. 0 = suksess, != 0 = feil.
|
||
- **`--write` flag** → CLI-verktøyet skriver til PG selv. Uten flagget: kun stdout.
|
||
|
||
**Hva maskinrommet beholder inline:**
|
||
- Payload-parsing og validering
|
||
- Sikkerhetskontroller (kill switch, rate limiting, loop-prevensjon for agent)
|
||
- Voice/model-oppslag fra node metadata (orchestrator-logikk)
|
||
- STDB-synk etter CLI fullført (sanntidsvisning)
|
||
- Jobbstatus-håndtering (complete/fail/retry)
|
||
|
||
| Jobbtype | CLI-verktøy | Maskinrommet beholder |
|
||
|---|---|---|
|
||
| `whisper_transcribe` | `synops-transcribe` | — |
|
||
| `agent_respond` | `synops-respond` | Kill switch, rate limit, loop-prevensjon, STDB-synk |
|
||
| `suggest_edges` | `synops-suggest-edges` | — |
|
||
| `summarize_communication` | `synops-summarize` | — |
|
||
| `tts_generate` | `synops-tts` | Voice-oppslag fra node metadata, STDB-synk |
|
||
| `audio_process` | `synops-audio` | STDB-synk |
|
||
| `render_article` | `synops-render` | — |
|
||
| `render_index` | `synops-render` | — |
|
||
| `ai_process` | *(inline — mangler CLI)* | Alt (planlagt migrasjon) |
|
||
|
||
**Felles dispatcher-kode:** `cli_dispatch.rs` — `run_cli_tool()` og env-helpers.
|
||
|
||
Ny jobbtype = nytt CLI-verktøy + tynn dispatcher-handler. Ref: `docs/retninger/unix_filosofi.md`.
|
||
|
||
### 4.2 Én worker, prioritetsstyrt
|
||
Én enkelt worker-prosess håndterer **alle jobbtyper**. Prioritering skjer via `priority`-kolonnen i køen — SQL-spørringen plukker alltid viktigste jobb først. Ingen behov for separate prosesser per jobbtype.
|
||
|
||
```
|
||
┌──────────────────────────────────────────────────┐
|
||
│ Rust Worker (sidelinja-worker) │
|
||
│ │
|
||
│ Konfigurasjon: │
|
||
│ --max-concurrent 3 (samtidige jobber) │
|
||
│ --poll-interval 1s │
|
||
│ │
|
||
│ Loop (per ledig slot): │
|
||
│ 1. SELECT ... FOR UPDATE SKIP LOCKED │
|
||
│ WHERE status IN ('pending','retry') │
|
||
│ AND scheduled_for <= now() │
|
||
│ ORDER BY priority DESC, scheduled_for │
|
||
│ LIMIT 1 │
|
||
│ │
|
||
│ 2. UPDATE status = 'running' │
|
||
│ 3. Dispatch til handler basert på job_type │
|
||
│ 4a. OK: UPDATE status = 'completed' │
|
||
│ 4b. Feil: attempts += 1 │
|
||
│ Hvis attempts < max_attempts: │
|
||
│ status = 'retry' │
|
||
│ scheduled_for = now() │
|
||
│ + backoff(attempts) │
|
||
│ Ellers: status = 'error' │
|
||
│ │
|
||
└──────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 4.3 Prioritetsmodell
|
||
|
||
| Prioritet | Kategori | Eksempler |
|
||
|---|---|---|
|
||
| 10 | Brukerrettet / sanntid | `dictation_cleanup`, `research_clip` |
|
||
| 5 | Normal | `whisper_transcribe`, `openrouter_analyze`, `srt_parse` |
|
||
| 1 | Bakgrunn | `stats_parse`, `generate_embeddings`, `prompt_eval` |
|
||
|
||
Verdiene er veiledende — SvelteKit setter prioritet ved opprettelse basert på kontekst. En manuelt trigget transkripsjon kan få høyere prioritet enn en automatisk nattjobb.
|
||
|
||
### 4.4 Ressursstyring (implementert, oppgave 15.5)
|
||
|
||
#### Concurrency-kontroll
|
||
* **Semaphore:** Maks 3 samtidige jobber (tokio semaphore). Forhindrer at workeren overbelaster serveren.
|
||
* **CPU-vektgrense:** Hver jobbtype har en `cpu_weight` (1–5). Totalt maks 8 (passer 8 vCPU). En Whisper-jobb (vekt 5) blokkerer andre tunge jobber men tillater lette HTTP-kall (vekt 1).
|
||
* **Per-type max_concurrent:** Begrenser hvor mange jobber av samme type som kan kjøre samtidig (f.eks. maks 1 Whisper, maks 2 render).
|
||
|
||
#### Prioritetsregler (`job_priority_rules`-tabell)
|
||
Konfigurerbare regler per jobbtype, lagret i PG og cachet i minnet:
|
||
|
||
| Felt | Beskrivelse |
|
||
|------|-------------|
|
||
| `base_priority` | Standard prioritet (0–10, høyere = viktigere) |
|
||
| `livekit_priority_adj` | Prioritetsjustering under aktive LiveKit-sesjoner (typisk negativ) |
|
||
| `cpu_weight` | Ressursvekt 1–5 (brukes mot MAX_TOTAL_WEIGHT=8) |
|
||
| `max_concurrent` | Maks samtidige jobber av denne typen (0=ubegrenset) |
|
||
| `timeout_seconds` | Timeout per jobb (0=default 600s) |
|
||
| `block_during_livekit` | Om jobben skal utsettes helt under LiveKit-sesjoner |
|
||
|
||
Regler kan endres via `POST /admin/resources/update_rule` uten restart.
|
||
|
||
#### Resource Governor (LiveKit-bevisst)
|
||
Workeren sjekker om det finnes aktive LiveKit-rom (kommunikasjonsnoder med `metadata.livekit_active = true`). Status caches i 10 sekunder.
|
||
|
||
Når LiveKit er aktivt:
|
||
* Jobbtyper med `block_during_livekit = true` utsettes 30 sekunder
|
||
* Andre jobber nedprioriteres med `livekit_priority_adj` (f.eks. Whisper: -3)
|
||
* Totalt CPU-vektbudsjett begrenser tunge jobber (Whisper vekt=5 + LiveKit → kun lette jobber i parallell)
|
||
|
||
#### Timeout
|
||
Hver jobb har individuell timeout basert på `timeout_seconds` i prioritetsreglene. Default 600s (10 min). Ved timeout: jobben feiler og kan retryes.
|
||
|
||
#### Disk-overvåking
|
||
Bakgrunnsloop som sjekker diskbruk hvert 60. sekund:
|
||
|
||
| Terskel | Alert-nivå | Handling |
|
||
|---------|-----------|----------|
|
||
| <85% | OK | Ingen — normal drift |
|
||
| ≥85% | `warning` | Loggmelding, vurder pruning |
|
||
| ≥90% | `critical` | Logges som warning, aggressiv pruning anbefalt |
|
||
| ≥95% | `emergency` | Logges som error, umiddelbar handling påkrevd |
|
||
|
||
Status lagres i `disk_status_log`-tabellen (siste 1000 målinger beholdes). Admin-API: `GET /admin/resources/disk`.
|
||
|
||
* **Skalering senere:** To nivåer:
|
||
1. **Worker-splitting:** Workeren splittes til to binærer fra samme crate (`worker-heavy`, `worker-light`) via CLI-argument (`--types whisper_transcribe,openrouter_analyze`). Ingen kodeendring nødvendig — kun deploy-konfigurasjon.
|
||
2. **Compute-separasjon:** Flytt Rust-worker + faster-whisper til en separat Hetzner-node (evt. ARM/Ampere for pris/ytelse). LiveKit er ekstremt sensitivt for CPU-stotring — ved samtidig WebRTC og Whisper på samme maskin risikerer vi audio glitches uansett cgroups. Worker-noden poller jobbkøen i PostgreSQL over internt nettverk — arkitekturen støtter dette uten kodeendring.
|
||
|
||
**Backoff-strategi:** Eksponentiell: `30s × 2^(attempts-1)` (30s, 60s, 120s).
|
||
|
||
## 5. Jobbtyper
|
||
|
||
| `job_type` | Konsument | Beskrivelse |
|
||
|---|---|---|
|
||
| `whisper_transcribe` | Universell lyd-tjeneste | Transkriber lydfil via faster-whisper (SRT) → `transcription_segments` |
|
||
| `openrouter_analyze` | Podcastfabrikken | Metadata-uttrekk fra transkripsjon |
|
||
| `ai_text_process` | Editor (AI-knapp) | Rens, oppsummer, trekk ut fakta, skriv om (se `docs/proposals/editor.md`) |
|
||
| `stats_parse` | Podcast-Statistikk | Batch-prosesser Caddy-logger |
|
||
| `meeting_summarize` | Møterommet | Generer møtereferat og action points fra transkripsjon |
|
||
| `valgomat_generate_profile` | Valgomat | Generer syntetiske kandidatprofiler fra partiprogrammer |
|
||
| `valgomat_moderation` | Valgomat | Semantisk deduplisering og nøytralitetsvask av brukerspørsmål |
|
||
| `dictation_cleanup` | Lydmeldinger | AI-opprydding av diktert transkripsjon til strukturert notat |
|
||
| `generate_embeddings` | Kunnskaps-Bridge | Generer vector embeddings for noder (pgvector) |
|
||
| `prompt_eval` | Prompt-Laboratorium | Batch-evaluering av testsett mot valgte modeller |
|
||
| `suggest_edges` | Kunnskapsgraf (AI) | Analyser innhold via LLM, foreslå topics og mentions-edges. Trigges automatisk ved opprettelse av content-noder med tilstrekkelig tekst |
|
||
| `summarize_communication` | Oppsummering (AI) | Generer AI-sammendrag av kommunikasjonsnode (chat/møte). Oppretter content-node med summary-edge tilbake. Trigges via `/intentions/summarize` |
|
||
| `url_ingest` | Web Clipper (proposal) | Hent URL, oppsummer via AI, opprett research-klipp med graf-koblinger |
|
||
| `generate_waveform` | Waveforms (proposal) | Generer audio-peaks fra lydfil for visuell bølgeform |
|
||
| `pg_insert_node` | PG-skrivestien | Persister node til PostgreSQL med retry og dead letter (oppgave 12.3) |
|
||
| `pg_insert_edge` | PG-skrivestien | Persister edge til PostgreSQL, inkl. recompute_access for tilgangsgivende edges |
|
||
| `pg_update_node` | PG-skrivestien | Oppdater node i PostgreSQL med retry |
|
||
| `pg_delete_node` | PG-skrivestien | Slett node fra PostgreSQL med retry |
|
||
| `pg_delete_edge` | PG-skrivestien | Slett edge fra PostgreSQL med retry, invalider publiserings-cache |
|
||
|
||
## 6. Tilgangsisolasjon
|
||
Alle jobber merkes med `collection_node_id`. Rust-workers kjører som superuser (bypasser RLS) og sikrer isolasjon i applikasjonskode:
|
||
* Worker leser `collection_node_id` fra jobben og bruker det til å lagre resultater tilbake i riktig samlings-node
|
||
* Per samlings-node config (AI-prompts, navnelister) hentes fra samlings-nodens JSONB-metadata
|
||
* Feilede jobber vises kun for brukere med tilgang til samlings-noden (via node_access) i admin-visningen
|
||
|
||
## 7. Observabilitet og admin-API
|
||
|
||
### Implementert (oppgave 15.3)
|
||
|
||
Admin-oversikt over jobbkøen med filtrering og handlinger.
|
||
|
||
**API-endepunkter:**
|
||
- `GET /admin/jobs?status=&type=&collection_id=&limit=&offset=` — paginert jobbliste med filtre
|
||
- `POST /intentions/retry_job` `{ job_id }` — sett feilet/retry-jobb tilbake til pending
|
||
- `POST /intentions/cancel_job` `{ job_id }` — avbryt ventende/retry-jobb
|
||
|
||
**Frontend:** `/admin/jobs` — statusoppsummering (antall per status), filter på type/status,
|
||
tabell med alle felter, retry/avbryt-knapper. Poller hvert 5. sekund.
|
||
|
||
**Kode:** `jobs.rs` (`list_jobs`, `count_by_status`, `distinct_job_types`, `retry_job`, `cancel_job`),
|
||
`intentions.rs` (API-handlers), `frontend/src/routes/admin/jobs/+page.svelte`
|
||
|
||
### Implementert (oppgave 15.5): Ressursstyring
|
||
|
||
**API-endepunkter:**
|
||
- `GET /admin/resources` — samlet ressursstatus (disk, LiveKit, vekt, regler, kjørende jobber)
|
||
- `GET /admin/resources/disk` — disk-status med siste 60 historiske målinger
|
||
- `POST /admin/resources/update_rule` — oppdater/opprett prioritetsregel
|
||
|
||
**Kode:** `resources.rs` (prioritetsregler, governor, disk-overvåking),
|
||
`jobs.rs` (semaphore, vektbasert concurrency, LiveKit-sjekk i worker-loop),
|
||
`intentions.rs` (admin-handlers for `/admin/resources/*`)
|
||
|
||
**Migrasjon:** `014_resource_governor.sql` — `job_priority_rules` + `disk_status_log`
|
||
|
||
- Valgfritt: SpacetimeDB-event ved statusendring slik at UI kan vise fremdrift i sanntid (f.eks. "Transkriberer... 2/3 forsøk")
|
||
|
||
## 8. Instruks for Claude Code
|
||
- Én binær: `sidelinja-worker`. Én Rust-crate med polling-loop + handler-dispatch
|
||
- Hver jobbtype implementeres som en handler-funksjon som registreres i en `HashMap<String, Handler>`
|
||
- Bruk `tokio` med semaphore for concurrency-kontroll (`--max-concurrent`)
|
||
- Aldri lagre lydfiler i `payload` — bruk filstier
|
||
- Opprett alltid jobber med riktig `collection_node_id` — hent fra konteksten (innlogget bruker, webhook, etc.)
|
||
- Ved `stats_parse`: denne erstatter den frittstående cronjobben beskrevet i podcast_statistikk.md — bruk jobbkøen med `scheduled_for` for periodisk kjøring
|
||
- Splitt til flere binærer kun hvis det blir eksplisitt bedt om — start med én
|