synops/maskinrommet/src/maintenance.rs
vegard aee6adc425 Fjern STDB-skrivestien: all skriving går kun til PG (oppgave 22.3)
SpacetimeDB var brukt som «instant feedback»-lag mellom portvokteren
og frontend. Nå som PG NOTIFY-triggere og WebSocket er på plass
(oppgave 22.1–22.2), er STDB-skrivestien overflødig.

Endringer:
- intentions.rs: Alle CRUD-operasjoner (create/update/delete node/edge)
  skriver nå synkront til PG i stedet for STDB-først + async PG-jobbkø.
  PG NOTIFY-triggere gir umiddelbar sanntidsoppdatering til klienter.
  Tilgangsgivende edges (owner/admin/member_of/reader) bruker transaksjon
  med recompute_access direkte i handleren.
- maintenance.rs: Fjernet StdbClient fra alle funksjoner. Varsler
  opprettes/oppdateres/slettes direkte i PG.
- agent.rs, audio.rs, tts.rs, ai_process.rs: Fjernet STDB-synk etter
  CLI-verktøy-kjøring. PG NOTIFY dekker sanntidsvisning.
- pg_writes.rs: Fjernet sync_node_access_to_stdb. access_changed
  NOTIFY-trigger håndterer dette.
- workspace.rs: Synkrone PG-skrivinger med recompute_access.
- summarize.rs, ai_edges.rs: Fjernet StdbClient fra signaturer.
- jobs.rs: Fjernet StdbClient fra dispatch og start_worker.
- main.rs: Fjernet STDB-initialisering, warmup, stdb_monitor.
  StdbClient fjernet fra AppState. stdb.rs beholdt som død kode
  (fjernes i oppgave 22.4).
- health.rs: Fjernet STDB-helsesjekk fra dashboard.
- Slettet warmup.rs og stdb_monitor.rs (PG→STDB-synk ikke lenger
  relevant).
- docs/retninger/datalaget.md: Markert fase M3 som fullført.
2026-03-18 13:11:33 +00:00

303 lines
9.6 KiB
Rust

// Graceful shutdown — vedlikeholdsmodus med koordinert nedstengning.
//
// Flyt:
// 1. Admin kaller initiate_maintenance med tidspunkt
// 2. System oppretter systemvarsel → frontend viser nedtelling via PG NOTIFY
// 3. Bakgrunnsoppgave venter til vedlikeholdstidspunkt
// 4. Setter maintenance_active → blokkerer nye LiveKit-rom + jobbkø stopper dequeue
// 5. Venter på at kjørende jobber fullføres (med timeout)
// 6. Avslutter prosessen → systemd restarter
//
// Ref: docs/concepts/adminpanelet.md, oppgave 15.2
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::PgPool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
/// Delt vedlikeholdstilstand — klones inn i AppState.
#[derive(Clone)]
pub struct MaintenanceState {
/// Satt til true når vedlikeholdstidspunktet er nådd.
/// Når true: jobbkø slutter å dequeue, nye LiveKit-tokens avvises.
pub active: Arc<AtomicBool>,
/// Satt til true når admin har initiert vedlikehold (men tidspunktet
/// trenger ikke være nådd ennå). Brukes for å vise status.
pub initiated: Arc<AtomicBool>,
/// Vedlikeholdstidspunkt og varsel-node-id.
inner: Arc<Mutex<MaintenanceInner>>,
}
struct MaintenanceInner {
scheduled_at: Option<DateTime<Utc>>,
announcement_node_id: Option<Uuid>,
initiated_by: Option<Uuid>,
/// Handle for å avbryte den planlagte shutdown-tasken.
abort_handle: Option<tokio::task::AbortHandle>,
}
/// Status-respons for admin-panelet.
#[derive(Serialize)]
pub struct MaintenanceStatus {
pub initiated: bool,
pub active: bool,
pub scheduled_at: Option<String>,
pub announcement_node_id: Option<Uuid>,
pub initiated_by: Option<Uuid>,
pub running_jobs: Vec<RunningJob>,
}
#[derive(Serialize)]
pub struct RunningJob {
pub id: Uuid,
pub job_type: String,
pub started_at: Option<String>,
pub collection_node_id: Option<Uuid>,
}
impl MaintenanceState {
pub fn new() -> Self {
Self {
active: Arc::new(AtomicBool::new(false)),
initiated: Arc::new(AtomicBool::new(false)),
inner: Arc::new(Mutex::new(MaintenanceInner {
scheduled_at: None,
announcement_node_id: None,
initiated_by: None,
abort_handle: None,
})),
}
}
/// Er vedlikeholdsmodus aktivert? (Tidspunktet er nådd.)
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
/// Er vedlikehold initiert? (Planlagt, men kanskje ikke nådd ennå.)
pub fn is_initiated(&self) -> bool {
self.initiated.load(Ordering::Relaxed)
}
/// Hent full status inkludert kjørende jobber.
pub async fn status(&self, db: &PgPool) -> Result<MaintenanceStatus, sqlx::Error> {
let inner = self.inner.lock().await;
let running_jobs = fetch_running_jobs(db).await?;
Ok(MaintenanceStatus {
initiated: self.is_initiated(),
active: self.is_active(),
scheduled_at: inner.scheduled_at.map(|dt| dt.to_rfc3339()),
announcement_node_id: inner.announcement_node_id,
initiated_by: inner.initiated_by,
running_jobs,
})
}
/// Initier vedlikehold: sett tidspunkt, opprett varsel, start nedtelling.
pub async fn initiate(
&self,
db: &PgPool,
scheduled_at: DateTime<Utc>,
initiated_by: Uuid,
) -> Result<Uuid, String> {
if self.is_initiated() {
return Err("Vedlikehold er allerede initiert".to_string());
}
// Opprett systemvarsel
let node_id = Uuid::now_v7();
let metadata = serde_json::json!({
"announcement_type": "critical",
"scheduled_at": scheduled_at.to_rfc3339(),
"blocks_new_sessions": true,
"maintenance_shutdown": true,
});
// PG — persistent lagring (NOTIFY-trigger sender sanntidsoppdatering)
sqlx::query(
r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
VALUES ($1, 'system_announcement', 'Planlagt vedlikehold',
'Systemet stenges for vedlikehold. Lagre arbeidet ditt.',
'open', $2, $3)"#,
)
.bind(node_id)
.bind(&metadata)
.bind(initiated_by)
.execute(db)
.await
.map_err(|e| format!("PG-feil: {e}"))?;
tracing::info!(
announcement_id = %node_id,
scheduled_at = %scheduled_at,
initiated_by = %initiated_by,
"Vedlikehold initiert"
);
// Start bakgrunnsoppgave for shutdown-koordinering
let state = self.clone();
let db2 = db.clone();
let handle = tokio::spawn(async move {
shutdown_coordinator(state, db2, scheduled_at, node_id).await;
});
// Lagre tilstand
let mut inner = self.inner.lock().await;
inner.scheduled_at = Some(scheduled_at);
inner.announcement_node_id = Some(node_id);
inner.initiated_by = Some(initiated_by);
inner.abort_handle = Some(handle.abort_handle());
self.initiated.store(true, Ordering::Relaxed);
Ok(node_id)
}
/// Avbryt planlagt vedlikehold.
pub async fn cancel(
&self,
db: &PgPool,
) -> Result<(), String> {
if !self.is_initiated() {
return Err("Ingen vedlikehold er initiert".to_string());
}
let mut inner = self.inner.lock().await;
// Avbryt bakgrunnsoppgaven
if let Some(handle) = inner.abort_handle.take() {
handle.abort();
}
// Slett varselet fra PG
if let Some(nid) = inner.announcement_node_id.take() {
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(nid)
.execute(db)
.await
{
tracing::warn!("Kunne ikke slette varsel fra PG: {e}");
}
}
inner.scheduled_at = None;
inner.initiated_by = None;
self.initiated.store(false, Ordering::Relaxed);
self.active.store(false, Ordering::Relaxed);
tracing::info!("Vedlikehold avbrutt");
Ok(())
}
}
/// Hent kjørende jobber fra job_queue.
async fn fetch_running_jobs(db: &PgPool) -> Result<Vec<RunningJob>, sqlx::Error> {
let rows = sqlx::query_as::<_, (Uuid, String, Option<chrono::DateTime<Utc>>, Option<Uuid>)>(
"SELECT id, job_type, started_at, collection_node_id FROM job_queue WHERE status = 'running'"
)
.fetch_all(db)
.await?;
Ok(rows.into_iter().map(|(id, job_type, started_at, collection_node_id)| {
RunningJob {
id,
job_type,
started_at: started_at.map(|dt| dt.to_rfc3339()),
collection_node_id,
}
}).collect())
}
/// Bakgrunnsoppgave som koordinerer nedstengningen.
async fn shutdown_coordinator(
state: MaintenanceState,
db: PgPool,
scheduled_at: DateTime<Utc>,
announcement_id: Uuid,
) {
// Vent til vedlikeholdstidspunkt
let wait_duration = (scheduled_at - Utc::now()).to_std().unwrap_or_default();
if !wait_duration.is_zero() {
tracing::info!(
seconds = wait_duration.as_secs(),
"Venter til vedlikeholdstidspunkt"
);
tokio::time::sleep(wait_duration).await;
}
// Aktiver vedlikeholdsmodus
state.active.store(true, Ordering::Relaxed);
tracing::warn!("Vedlikeholdsmodus AKTIV — nye jobber og LiveKit-rom blokkert");
// Oppdater varselet til å reflektere at vedlikehold er i gang
let active_meta = serde_json::json!({
"announcement_type": "critical",
"scheduled_at": scheduled_at.to_rfc3339(),
"blocks_new_sessions": true,
"maintenance_shutdown": true,
"maintenance_active": true,
});
let _ = sqlx::query(
"UPDATE nodes SET title = $2, content = $3, metadata = $4 WHERE id = $1",
)
.bind(announcement_id)
.bind("Vedlikehold pågår")
.bind("Systemet stenger ned. Vent til vedlikeholdet er ferdig.")
.bind(&active_meta)
.execute(&db)
.await;
// Vent på at kjørende jobber fullføres (maks 5 minutter)
let timeout = std::time::Duration::from_secs(300);
let start = std::time::Instant::now();
loop {
match fetch_running_jobs(&db).await {
Ok(jobs) if jobs.is_empty() => {
tracing::info!("Ingen kjørende jobber — klar for restart");
break;
}
Ok(jobs) => {
tracing::info!(
count = jobs.len(),
"Venter på {} kjørende jobber",
jobs.len()
);
}
Err(e) => {
tracing::error!("Feil ved sjekk av kjørende jobber: {e}");
}
}
if start.elapsed() > timeout {
tracing::warn!("Timeout (5 min) — tvinger nedstengning med kjørende jobber");
break;
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
// Slett varselet
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
.bind(announcement_id)
.execute(&db)
.await
{
tracing::warn!("Kunne ikke slette varsel fra PG: {e}");
}
tracing::warn!("Avslutter prosessen for vedlikehold — systemd vil restarte");
// Gi litt tid til at siste logglinjer skrives
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// Avslutt prosessen — systemd vil restarte maskinrommet
std::process::exit(0);
}