SpacetimeDB var brukt som «instant feedback»-lag mellom portvokteren og frontend. Nå som PG NOTIFY-triggere og WebSocket er på plass (oppgave 22.1–22.2), er STDB-skrivestien overflødig. Endringer: - intentions.rs: Alle CRUD-operasjoner (create/update/delete node/edge) skriver nå synkront til PG i stedet for STDB-først + async PG-jobbkø. PG NOTIFY-triggere gir umiddelbar sanntidsoppdatering til klienter. Tilgangsgivende edges (owner/admin/member_of/reader) bruker transaksjon med recompute_access direkte i handleren. - maintenance.rs: Fjernet StdbClient fra alle funksjoner. Varsler opprettes/oppdateres/slettes direkte i PG. - agent.rs, audio.rs, tts.rs, ai_process.rs: Fjernet STDB-synk etter CLI-verktøy-kjøring. PG NOTIFY dekker sanntidsvisning. - pg_writes.rs: Fjernet sync_node_access_to_stdb. access_changed NOTIFY-trigger håndterer dette. - workspace.rs: Synkrone PG-skrivinger med recompute_access. - summarize.rs, ai_edges.rs: Fjernet StdbClient fra signaturer. - jobs.rs: Fjernet StdbClient fra dispatch og start_worker. - main.rs: Fjernet STDB-initialisering, warmup, stdb_monitor. StdbClient fjernet fra AppState. stdb.rs beholdt som død kode (fjernes i oppgave 22.4). - health.rs: Fjernet STDB-helsesjekk fra dashboard. - Slettet warmup.rs og stdb_monitor.rs (PG→STDB-synk ikke lenger relevant). - docs/retninger/datalaget.md: Markert fase M3 som fullført.
303 lines
9.6 KiB
Rust
303 lines
9.6 KiB
Rust
// Graceful shutdown — vedlikeholdsmodus med koordinert nedstengning.
|
|
//
|
|
// Flyt:
|
|
// 1. Admin kaller initiate_maintenance med tidspunkt
|
|
// 2. System oppretter systemvarsel → frontend viser nedtelling via PG NOTIFY
|
|
// 3. Bakgrunnsoppgave venter til vedlikeholdstidspunkt
|
|
// 4. Setter maintenance_active → blokkerer nye LiveKit-rom + jobbkø stopper dequeue
|
|
// 5. Venter på at kjørende jobber fullføres (med timeout)
|
|
// 6. Avslutter prosessen → systemd restarter
|
|
//
|
|
// Ref: docs/concepts/adminpanelet.md, oppgave 15.2
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use serde::Serialize;
|
|
use sqlx::PgPool;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
use uuid::Uuid;
|
|
|
|
/// Delt vedlikeholdstilstand — klones inn i AppState.
|
|
#[derive(Clone)]
|
|
pub struct MaintenanceState {
|
|
/// Satt til true når vedlikeholdstidspunktet er nådd.
|
|
/// Når true: jobbkø slutter å dequeue, nye LiveKit-tokens avvises.
|
|
pub active: Arc<AtomicBool>,
|
|
|
|
/// Satt til true når admin har initiert vedlikehold (men tidspunktet
|
|
/// trenger ikke være nådd ennå). Brukes for å vise status.
|
|
pub initiated: Arc<AtomicBool>,
|
|
|
|
/// Vedlikeholdstidspunkt og varsel-node-id.
|
|
inner: Arc<Mutex<MaintenanceInner>>,
|
|
}
|
|
|
|
struct MaintenanceInner {
|
|
scheduled_at: Option<DateTime<Utc>>,
|
|
announcement_node_id: Option<Uuid>,
|
|
initiated_by: Option<Uuid>,
|
|
/// Handle for å avbryte den planlagte shutdown-tasken.
|
|
abort_handle: Option<tokio::task::AbortHandle>,
|
|
}
|
|
|
|
/// Status-respons for admin-panelet.
|
|
#[derive(Serialize)]
|
|
pub struct MaintenanceStatus {
|
|
pub initiated: bool,
|
|
pub active: bool,
|
|
pub scheduled_at: Option<String>,
|
|
pub announcement_node_id: Option<Uuid>,
|
|
pub initiated_by: Option<Uuid>,
|
|
pub running_jobs: Vec<RunningJob>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub struct RunningJob {
|
|
pub id: Uuid,
|
|
pub job_type: String,
|
|
pub started_at: Option<String>,
|
|
pub collection_node_id: Option<Uuid>,
|
|
}
|
|
|
|
impl MaintenanceState {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
active: Arc::new(AtomicBool::new(false)),
|
|
initiated: Arc::new(AtomicBool::new(false)),
|
|
inner: Arc::new(Mutex::new(MaintenanceInner {
|
|
scheduled_at: None,
|
|
announcement_node_id: None,
|
|
initiated_by: None,
|
|
abort_handle: None,
|
|
})),
|
|
}
|
|
}
|
|
|
|
/// Er vedlikeholdsmodus aktivert? (Tidspunktet er nådd.)
|
|
pub fn is_active(&self) -> bool {
|
|
self.active.load(Ordering::Relaxed)
|
|
}
|
|
|
|
/// Er vedlikehold initiert? (Planlagt, men kanskje ikke nådd ennå.)
|
|
pub fn is_initiated(&self) -> bool {
|
|
self.initiated.load(Ordering::Relaxed)
|
|
}
|
|
|
|
/// Hent full status inkludert kjørende jobber.
|
|
pub async fn status(&self, db: &PgPool) -> Result<MaintenanceStatus, sqlx::Error> {
|
|
let inner = self.inner.lock().await;
|
|
let running_jobs = fetch_running_jobs(db).await?;
|
|
|
|
Ok(MaintenanceStatus {
|
|
initiated: self.is_initiated(),
|
|
active: self.is_active(),
|
|
scheduled_at: inner.scheduled_at.map(|dt| dt.to_rfc3339()),
|
|
announcement_node_id: inner.announcement_node_id,
|
|
initiated_by: inner.initiated_by,
|
|
running_jobs,
|
|
})
|
|
}
|
|
|
|
/// Initier vedlikehold: sett tidspunkt, opprett varsel, start nedtelling.
|
|
pub async fn initiate(
|
|
&self,
|
|
db: &PgPool,
|
|
scheduled_at: DateTime<Utc>,
|
|
initiated_by: Uuid,
|
|
) -> Result<Uuid, String> {
|
|
if self.is_initiated() {
|
|
return Err("Vedlikehold er allerede initiert".to_string());
|
|
}
|
|
|
|
// Opprett systemvarsel
|
|
let node_id = Uuid::now_v7();
|
|
|
|
let metadata = serde_json::json!({
|
|
"announcement_type": "critical",
|
|
"scheduled_at": scheduled_at.to_rfc3339(),
|
|
"blocks_new_sessions": true,
|
|
"maintenance_shutdown": true,
|
|
});
|
|
|
|
// PG — persistent lagring (NOTIFY-trigger sender sanntidsoppdatering)
|
|
sqlx::query(
|
|
r#"INSERT INTO nodes (id, node_kind, title, content, visibility, metadata, created_by)
|
|
VALUES ($1, 'system_announcement', 'Planlagt vedlikehold',
|
|
'Systemet stenges for vedlikehold. Lagre arbeidet ditt.',
|
|
'open', $2, $3)"#,
|
|
)
|
|
.bind(node_id)
|
|
.bind(&metadata)
|
|
.bind(initiated_by)
|
|
.execute(db)
|
|
.await
|
|
.map_err(|e| format!("PG-feil: {e}"))?;
|
|
|
|
tracing::info!(
|
|
announcement_id = %node_id,
|
|
scheduled_at = %scheduled_at,
|
|
initiated_by = %initiated_by,
|
|
"Vedlikehold initiert"
|
|
);
|
|
|
|
// Start bakgrunnsoppgave for shutdown-koordinering
|
|
let state = self.clone();
|
|
let db2 = db.clone();
|
|
let handle = tokio::spawn(async move {
|
|
shutdown_coordinator(state, db2, scheduled_at, node_id).await;
|
|
});
|
|
|
|
// Lagre tilstand
|
|
let mut inner = self.inner.lock().await;
|
|
inner.scheduled_at = Some(scheduled_at);
|
|
inner.announcement_node_id = Some(node_id);
|
|
inner.initiated_by = Some(initiated_by);
|
|
inner.abort_handle = Some(handle.abort_handle());
|
|
self.initiated.store(true, Ordering::Relaxed);
|
|
|
|
Ok(node_id)
|
|
}
|
|
|
|
/// Avbryt planlagt vedlikehold.
|
|
pub async fn cancel(
|
|
&self,
|
|
db: &PgPool,
|
|
) -> Result<(), String> {
|
|
if !self.is_initiated() {
|
|
return Err("Ingen vedlikehold er initiert".to_string());
|
|
}
|
|
|
|
let mut inner = self.inner.lock().await;
|
|
|
|
// Avbryt bakgrunnsoppgaven
|
|
if let Some(handle) = inner.abort_handle.take() {
|
|
handle.abort();
|
|
}
|
|
|
|
// Slett varselet fra PG
|
|
if let Some(nid) = inner.announcement_node_id.take() {
|
|
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
|
|
.bind(nid)
|
|
.execute(db)
|
|
.await
|
|
{
|
|
tracing::warn!("Kunne ikke slette varsel fra PG: {e}");
|
|
}
|
|
}
|
|
|
|
inner.scheduled_at = None;
|
|
inner.initiated_by = None;
|
|
|
|
self.initiated.store(false, Ordering::Relaxed);
|
|
self.active.store(false, Ordering::Relaxed);
|
|
|
|
tracing::info!("Vedlikehold avbrutt");
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Hent kjørende jobber fra job_queue.
|
|
async fn fetch_running_jobs(db: &PgPool) -> Result<Vec<RunningJob>, sqlx::Error> {
|
|
let rows = sqlx::query_as::<_, (Uuid, String, Option<chrono::DateTime<Utc>>, Option<Uuid>)>(
|
|
"SELECT id, job_type, started_at, collection_node_id FROM job_queue WHERE status = 'running'"
|
|
)
|
|
.fetch_all(db)
|
|
.await?;
|
|
|
|
Ok(rows.into_iter().map(|(id, job_type, started_at, collection_node_id)| {
|
|
RunningJob {
|
|
id,
|
|
job_type,
|
|
started_at: started_at.map(|dt| dt.to_rfc3339()),
|
|
collection_node_id,
|
|
}
|
|
}).collect())
|
|
}
|
|
|
|
/// Bakgrunnsoppgave som koordinerer nedstengningen.
|
|
async fn shutdown_coordinator(
|
|
state: MaintenanceState,
|
|
db: PgPool,
|
|
scheduled_at: DateTime<Utc>,
|
|
announcement_id: Uuid,
|
|
) {
|
|
// Vent til vedlikeholdstidspunkt
|
|
let wait_duration = (scheduled_at - Utc::now()).to_std().unwrap_or_default();
|
|
if !wait_duration.is_zero() {
|
|
tracing::info!(
|
|
seconds = wait_duration.as_secs(),
|
|
"Venter til vedlikeholdstidspunkt"
|
|
);
|
|
tokio::time::sleep(wait_duration).await;
|
|
}
|
|
|
|
// Aktiver vedlikeholdsmodus
|
|
state.active.store(true, Ordering::Relaxed);
|
|
tracing::warn!("Vedlikeholdsmodus AKTIV — nye jobber og LiveKit-rom blokkert");
|
|
|
|
// Oppdater varselet til å reflektere at vedlikehold er i gang
|
|
let active_meta = serde_json::json!({
|
|
"announcement_type": "critical",
|
|
"scheduled_at": scheduled_at.to_rfc3339(),
|
|
"blocks_new_sessions": true,
|
|
"maintenance_shutdown": true,
|
|
"maintenance_active": true,
|
|
});
|
|
let _ = sqlx::query(
|
|
"UPDATE nodes SET title = $2, content = $3, metadata = $4 WHERE id = $1",
|
|
)
|
|
.bind(announcement_id)
|
|
.bind("Vedlikehold pågår")
|
|
.bind("Systemet stenger ned. Vent til vedlikeholdet er ferdig.")
|
|
.bind(&active_meta)
|
|
.execute(&db)
|
|
.await;
|
|
|
|
// Vent på at kjørende jobber fullføres (maks 5 minutter)
|
|
let timeout = std::time::Duration::from_secs(300);
|
|
let start = std::time::Instant::now();
|
|
|
|
loop {
|
|
match fetch_running_jobs(&db).await {
|
|
Ok(jobs) if jobs.is_empty() => {
|
|
tracing::info!("Ingen kjørende jobber — klar for restart");
|
|
break;
|
|
}
|
|
Ok(jobs) => {
|
|
tracing::info!(
|
|
count = jobs.len(),
|
|
"Venter på {} kjørende jobber",
|
|
jobs.len()
|
|
);
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("Feil ved sjekk av kjørende jobber: {e}");
|
|
}
|
|
}
|
|
|
|
if start.elapsed() > timeout {
|
|
tracing::warn!("Timeout (5 min) — tvinger nedstengning med kjørende jobber");
|
|
break;
|
|
}
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
}
|
|
|
|
// Slett varselet
|
|
if let Err(e) = sqlx::query("DELETE FROM nodes WHERE id = $1")
|
|
.bind(announcement_id)
|
|
.execute(&db)
|
|
.await
|
|
{
|
|
tracing::warn!("Kunne ikke slette varsel fra PG: {e}");
|
|
}
|
|
|
|
tracing::warn!("Avslutter prosessen for vedlikehold — systemd vil restarte");
|
|
|
|
// Gi litt tid til at siste logglinjer skrives
|
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
|
|
|
// Avslutt prosessen — systemd vil restarte maskinrommet
|
|
std::process::exit(0);
|
|
}
|