From 0012a103736393456f83c99ba536592d6d3265a1 Mon Sep 17 00:00:00 2001
From: vegard
Date: Wed, 18 Mar 2026 11:01:36 +0000
Subject: [PATCH] Observability: structured logging, metrics, /metrics
 endpoint (task 12.1)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds the observability layer to maskinrommet:

- Structured JSON logging via LOG_FORMAT=json (machine-readable, for
  log aggregation). The default remains human-readable for development.
- New metrics module with in-memory request-latency tracking per route
  (count, avg, min, max, p50/p95/p99 over the last 1000 requests).
- A custom axum middleware replaces tower_http::TraceLayer — it logs
  method, path, status and duration_ms per request, and feeds the
  metrics collector.
- GET /metrics endpoint that returns:
  - request_latency: per-route statistics
  - queue_depth: pending/running/error/retry from job_queue
  - ai_cost: aggregated from ai_usage_log (last hour/24h/30d)
- Default log level changed from debug to info to reduce noise.
---
 CLAUDE.md                    |   1 +
 docs/infra/observerbarhet.md | 104 +++++++++++
 maskinrommet/src/main.rs     |  34 ++--
 maskinrommet/src/metrics.rs  | 326 +++++++++++++++++++++++++++++++++++
 tasks.md                     |   3 +-
 5 files changed, 455 insertions(+), 13 deletions(-)
 create mode 100644 docs/infra/observerbarhet.md
 create mode 100644 maskinrommet/src/metrics.rs

diff --git a/CLAUDE.md b/CLAUDE.md
index 835f012..d8c5ea1 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -63,6 +63,7 @@ CLAUDE.md is the only entry document. Everything else lives under `docs/`:
   - `jobbkø.md` — PostgreSQL-based queue system for background jobs
   - `synkronisering.md` — PostgreSQL ↔ SpacetimeDB data flow and ownership model
   - `claude_agent.md` — Claude as a chat participant: architecture, triggers, security
+  - `observerbarhet.md` — Structured logging, metrics endpoint (/metrics), AI cost
 - `docs/erfaringer/` — Lessons from v1 (adapter pattern, Svelte 5, SpacetimeDB, Authentik)
 - `reference/` — Code from v1 with reuse value (Editor.svelte)
 - `ops/` — Repeatable maintenance jobs (cleanup job, doc audit, ops check)
diff --git a/docs/infra/observerbarhet.md b/docs/infra/observerbarhet.md
new file mode 100644
index 0000000..ead3cbe
--- /dev/null
+++ b/docs/infra/observerbarhet.md
@@ -0,0 +1,104 @@
+# Observability
+
+Maskinrommet's observability layer: structured logging, request metrics,
+job-queue depth and AI cost monitoring.
+
+## Structured logging
+
+Maskinrommet uses `tracing` with `tracing-subscriber`. The format is
+selected via an environment variable:
+
+| `LOG_FORMAT` | Result |
+|---|---|
+| `json` | JSON lines (machine-readable, suited for log aggregation) |
+| *(other/unset)* | Human-readable (the default for development) |
+
+The log level is controlled via `RUST_LOG` (default:
+`maskinrommet=info,tower_http=info`).
+
+### JSON format example
+
+```json
+{"timestamp":"2026-03-18T12:00:00Z","level":"INFO","fields":{"method":"GET","path":"/health","status":200,"duration_ms":1.2},"target":"maskinrommet::metrics","message":"request"}
+```
+
+## Metrics endpoint
+
+`GET /metrics` returns JSON with three sections:
+
+### request_latency
+
+In-memory per-route statistics (reset on restart):
+
+```json
+{
+  "uptime_secs": 3600,
+  "total_requests": 1234,
+  "routes": {
+    "/health": {
+      "count": 500,
+      "avg_ms": 1.2,
+      "min_ms": 0.3,
+      "max_ms": 15.0,
+      "p50_ms": 0.8,
+      "p95_ms": 3.5,
+      "p99_ms": 12.0
+    }
+  }
+}
+```
+
+Percentiles are computed from the last 1000 requests per route.
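+The percentile is read from a sorted copy of that buffer by rounding the
+fractional index. A condensed sketch of the selection logic implemented in
+`metrics.rs` (`RouteStats::percentile`):
+
+```rust
+/// Nearest-rank-style percentile over the recent-samples buffer.
+fn percentile(samples: &[u64], p: f64) -> u64 {
+    if samples.is_empty() {
+        return 0; // no traffic recorded yet
+    }
+    let mut sorted = samples.to_vec();
+    sorted.sort_unstable();
+    // Round the fractional index into the sorted samples, clamped to the end.
+    let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize;
+    sorted[idx.min(sorted.len() - 1)]
+}
+```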
+
+### queue_depth
+
+Real-time status from the `job_queue` table:
+
+```json
+{
+  "pending": 3,
+  "running": 1,
+  "error": 0,
+  "retry": 1,
+  "completed_last_hour": 42
+}
+```
+
+### ai_cost
+
+Aggregated AI costs from `ai_usage_log` for three time windows
+(last hour, 24 hours, 30 days):
+
+```json
+{
+  "last_hour": {
+    "requests": 5,
+    "input_tokens": 12000,
+    "output_tokens": 3000,
+    "estimated_cost_usd": 0.08
+  }
+}
+```
+
+The cost estimate uses conservative average prices ($3/MTok input,
+$15/MTok output). For the example above: 12,000 × $3/1M + 3,000 × $15/1M
+= $0.036 + $0.045 ≈ $0.08.
+
+## Request-logging middleware
+
+Every HTTP request is logged with:
+
+- `method` — HTTP method
+- `path` — request path
+- `status` — HTTP status code
+- `duration_ms` — total response time
+
+This replaces `tower_http::TraceLayer` with a more structured variant
+that also feeds the metrics collector.
+
+## Architecture
+
+Metrics are collected in memory (no external dependency such as
+Prometheus). Design choices:
+
+- **Simple:** no new dependencies or external infrastructure
+- **Sufficient:** queue depth and AI cost come from PG, latency in memory
+- **Extensible:** Prometheus export can be added later if needed
diff --git a/maskinrommet/src/main.rs b/maskinrommet/src/main.rs
index 4592471..1e392ca 100644
--- a/maskinrommet/src/main.rs
+++ b/maskinrommet/src/main.rs
@@ -12,6 +12,7 @@ mod intentions;
 pub mod jobs;
 pub mod livekit;
 pub mod maintenance;
+pub mod metrics;
 pub mod pruning;
 mod queries;
 pub mod publishing;
@@ -30,11 +31,10 @@ pub mod user_usage;
 mod warmup;
 mod workspace;

-use axum::{extract::State, http::StatusCode, routing::{get, post}, Json, Router};
+use axum::{extract::State, http::StatusCode, middleware, routing::{get, post}, Json, Router};
 use serde::Serialize;
 use sqlx::postgres::PgPoolOptions;
 use sqlx::PgPool;
-use tower_http::trace::TraceLayer;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

 use auth::{AuthUser, JwksKeys};
@@ -51,6 +51,7 @@ pub struct AppState {
     pub dynamic_page_cache: publishing::DynamicPageCache,
     pub maintenance: maintenance::MaintenanceState,
     pub priority_rules: resources::PriorityRules,
+    pub metrics: metrics::MetricsCollector,
 }

 #[derive(Serialize)]
@@ -69,13 +70,21 @@ struct MeResponse {

 #[tokio::main]
 async fn main() {
-    tracing_subscriber::registry()
-        .with(
-            EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "maskinrommet=debug,tower_http=debug".parse().unwrap()),
-        )
-        .with(tracing_subscriber::fmt::layer())
-        .init();
+    // Structured logging: LOG_FORMAT=json for machine-readable output, otherwise human-readable.
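+    // Note: EnvFilter reads RUST_LOG when it is set (e.g. RUST_LOG=maskinrommet=debug
+    // for local debugging); the filter below is only the fallback default.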
+    let env_filter = EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| "maskinrommet=info,tower_http=info".parse().unwrap());
+    let log_format = std::env::var("LOG_FORMAT").unwrap_or_default();
+    if log_format == "json" {
+        tracing_subscriber::registry()
+            .with(env_filter)
+            .with(tracing_subscriber::fmt::layer().json())
+            .init();
+    } else {
+        tracing_subscriber::registry()
+            .with(env_filter)
+            .with(tracing_subscriber::fmt::layer())
+            .init();
+    }

     // Database
     let database_url = std::env::var("DATABASE_URL")
@@ -179,7 +188,8 @@ async fn main() {
     let index_cache = publishing::new_index_cache();
     let dynamic_page_cache = publishing::new_dynamic_page_cache();

-    let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules };
+    let metrics = metrics::MetricsCollector::new();
+    let state = AppState { db, jwks, stdb, cas, index_cache, dynamic_page_cache, maintenance, priority_rules, metrics };

     // Routes: /health is public, /me requires a valid JWT
     let app = Router::new()
@@ -278,7 +288,9 @@ async fn main() {
         .route("/custom-domain/sok", get(custom_domain::serve_custom_domain_search))
         .route("/custom-domain/om", get(custom_domain::serve_custom_domain_about))
         .route("/custom-domain/{article_id}", get(custom_domain::serve_custom_domain_article))
-        .layer(TraceLayer::new_for_http())
+        // Observability (task 12.1)
+        .route("/metrics", get(metrics::metrics_endpoint))
+        .layer(middleware::from_fn_with_state(state.clone(), metrics::latency_middleware))
         .with_state(state);

     let bind = std::env::var("BIND_ADDR").unwrap_or_else(|_| "0.0.0.0:3100".to_string());
diff --git a/maskinrommet/src/metrics.rs b/maskinrommet/src/metrics.rs
new file mode 100644
index 0000000..cee300b
--- /dev/null
+++ b/maskinrommet/src/metrics.rs
@@ -0,0 +1,326 @@
+// Observability — in-memory metrics for maskinrommet.
+//
+// Collects request latency per route, exposed via the /metrics endpoint.
+// Queue depth and AI cost are fetched directly from PG on request.
+//
+// Ref: task 12.1
+
+use axum::{
+    body::Body,
+    extract::State,
+    http::{Request, StatusCode},
+    middleware::Next,
+    response::{IntoResponse, Response},
+    Json,
+};
+use serde::Serialize;
+use sqlx::PgPool;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::time::Instant;
+
+/// In-memory metrics collector. Shared via Arc in AppState.
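+/// Cloning is cheap (it clones only the Arc); the Mutex is held just long
+/// enough to record one sample or to build a snapshot.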
+#[derive(Clone)]
+pub struct MetricsCollector {
+    inner: Arc<Mutex<MetricsInner>>,
+}
+
+struct MetricsInner {
+    /// Latency data per route: path → list of durations in microseconds
+    routes: HashMap<String, RouteStats>,
+    /// Total requests since startup
+    total_requests: u64,
+    /// Startup time
+    started_at: Instant,
+}
+
+struct RouteStats {
+    count: u64,
+    total_us: u64,
+    min_us: u64,
+    max_us: u64,
+    /// Last N latencies for the p50/p95/p99 calculation (ring buffer)
+    recent: Vec<u64>,
+}
+
+const RECENT_BUFFER_SIZE: usize = 1000;
+
+impl RouteStats {
+    fn new() -> Self {
+        Self {
+            count: 0,
+            total_us: 0,
+            min_us: u64::MAX,
+            max_us: 0,
+            recent: Vec::with_capacity(RECENT_BUFFER_SIZE),
+        }
+    }
+
+    fn record(&mut self, duration_us: u64) {
+        self.count += 1;
+        self.total_us += duration_us;
+        if duration_us < self.min_us {
+            self.min_us = duration_us;
+        }
+        if duration_us > self.max_us {
+            self.max_us = duration_us;
+        }
+        if self.recent.len() >= RECENT_BUFFER_SIZE {
+            // Emulated ring buffer: remove(0) is O(n), acceptable at N = 1000.
+            self.recent.remove(0);
+        }
+        self.recent.push(duration_us);
+    }
+
+    fn percentile(&self, p: f64) -> u64 {
+        if self.recent.is_empty() {
+            return 0;
+        }
+        let mut sorted = self.recent.clone();
+        sorted.sort_unstable();
+        let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize;
+        sorted[idx.min(sorted.len() - 1)]
+    }
+}
+
+impl MetricsCollector {
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(MetricsInner {
+                routes: HashMap::new(),
+                total_requests: 0,
+                started_at: Instant::now(),
+            })),
+        }
+    }
+
+    /// Record one request with its duration.
+    pub fn record(&self, path: &str, duration_us: u64) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.total_requests += 1;
+        inner
+            .routes
+            .entry(path.to_string())
+            .or_insert_with(RouteStats::new)
+            .record(duration_us);
+    }
+
+    /// Take a snapshot of all route metrics.
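+    /// Holds the lock once for the whole aggregation: avg_ms is derived from
+    /// the running totals, the percentiles from the recent ring buffer.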
+    pub fn snapshot(&self) -> MetricsSnapshot {
+        let inner = self.inner.lock().unwrap();
+        let uptime_secs = inner.started_at.elapsed().as_secs();
+
+        let routes = inner
+            .routes
+            .iter()
+            .map(|(path, stats)| {
+                (
+                    path.clone(),
+                    RouteMetrics {
+                        count: stats.count,
+                        avg_ms: if stats.count > 0 {
+                            (stats.total_us as f64 / stats.count as f64) / 1000.0
+                        } else {
+                            0.0
+                        },
+                        min_ms: if stats.min_us == u64::MAX {
+                            0.0
+                        } else {
+                            stats.min_us as f64 / 1000.0
+                        },
+                        max_ms: stats.max_us as f64 / 1000.0,
+                        p50_ms: stats.percentile(50.0) as f64 / 1000.0,
+                        p95_ms: stats.percentile(95.0) as f64 / 1000.0,
+                        p99_ms: stats.percentile(99.0) as f64 / 1000.0,
+                    },
+                )
+            })
+            .collect();
+
+        MetricsSnapshot {
+            uptime_secs,
+            total_requests: inner.total_requests,
+            routes,
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub struct MetricsSnapshot {
+    pub uptime_secs: u64,
+    pub total_requests: u64,
+    pub routes: HashMap<String, RouteMetrics>,
+}
+
+#[derive(Serialize)]
+pub struct RouteMetrics {
+    pub count: u64,
+    pub avg_ms: f64,
+    pub min_ms: f64,
+    pub max_ms: f64,
+    pub p50_ms: f64,
+    pub p95_ms: f64,
+    pub p99_ms: f64,
+}
+
+// --- Queue depth ---
+
+#[derive(Serialize)]
+pub struct QueueDepth {
+    pub pending: i64,
+    pub running: i64,
+    pub error: i64,
+    pub retry: i64,
+    pub completed_last_hour: i64,
+}
+
+async fn query_queue_depth(db: &PgPool) -> Result<QueueDepth, sqlx::Error> {
+    let row = sqlx::query_as::<_, (i64, i64, i64, i64)>(
+        r#"
+        SELECT
+            COUNT(*) FILTER (WHERE status = 'pending') AS pending,
+            COUNT(*) FILTER (WHERE status = 'running') AS running,
+            COUNT(*) FILTER (WHERE status = 'error') AS error,
+            COUNT(*) FILTER (WHERE status = 'retry') AS retry
+        FROM job_queue
+        "#,
+    )
+    .fetch_one(db)
+    .await?;
+
+    let completed: (i64,) = sqlx::query_as(
+        "SELECT COUNT(*) FROM job_queue WHERE status = 'completed' AND updated_at > now() - interval '1 hour'",
+    )
+    .fetch_one(db)
+    .await?;
+
+    Ok(QueueDepth {
+        pending: row.0,
+        running: row.1,
+        error: row.2,
+        retry: row.3,
+        completed_last_hour: completed.0,
+    })
+}
+
+// --- AI cost ---
+
+#[derive(Serialize)]
+pub struct AiCostSummary {
+    pub last_hour: AiCostPeriod,
+    pub last_24h: AiCostPeriod,
+    pub last_30d: AiCostPeriod,
+}
+
+#[derive(Serialize, Default)]
+pub struct AiCostPeriod {
+    pub requests: i64,
+    pub input_tokens: i64,
+    pub output_tokens: i64,
+    pub estimated_cost_usd: f64,
+}
+
+async fn query_ai_cost(db: &PgPool) -> Result<AiCostSummary, sqlx::Error> {
+    // Fetch aggregated AI costs from ai_usage_log for three time windows
+    let row = sqlx::query_as::<_, (i64, i64, i64, i64, i64, i64, i64, i64, i64)>(
+        r#"
+        SELECT
+            COUNT(*) FILTER (WHERE created_at > now() - interval '1 hour'),
+            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '1 hour'), 0),
+            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '1 hour'), 0),
+            COUNT(*) FILTER (WHERE created_at > now() - interval '24 hours'),
+            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '24 hours'), 0),
+            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '24 hours'), 0),
+            COUNT(*) FILTER (WHERE created_at > now() - interval '30 days'),
+            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '30 days'), 0),
+            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '30 days'), 0)
+        FROM ai_usage_log
+        "#,
+    )
+    .fetch_one(db)
+    .await?;
+
+    fn estimate_cost(input_tokens: i64, output_tokens: i64) -> f64 {
+        // Rough estimate based on typical Claude/GPT prices (USD per 1M tokens)
+        // Input: ~$3/MTok, Output: ~$15/MTok (conservative average)
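+        // Worked example (matches docs/infra/observerbarhet.md): 12_000 input
+        // and 3_000 output tokens give 0.036 + 0.045 = 0.081, rounded to 0.08.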
+        let input_cost = input_tokens as f64 * 3.0 / 1_000_000.0;
+        let output_cost = output_tokens as f64 * 15.0 / 1_000_000.0;
+        ((input_cost + output_cost) * 100.0).round() / 100.0
+    }
+
+    Ok(AiCostSummary {
+        last_hour: AiCostPeriod {
+            requests: row.0,
+            input_tokens: row.1,
+            output_tokens: row.2,
+            estimated_cost_usd: estimate_cost(row.1, row.2),
+        },
+        last_24h: AiCostPeriod {
+            requests: row.3,
+            input_tokens: row.4,
+            output_tokens: row.5,
+            estimated_cost_usd: estimate_cost(row.4, row.5),
+        },
+        last_30d: AiCostPeriod {
+            requests: row.6,
+            input_tokens: row.7,
+            output_tokens: row.8,
+            estimated_cost_usd: estimate_cost(row.7, row.8),
+        },
+    })
+}
+
+// --- Middleware ---
+
+/// Axum middleware that measures request latency and emits a structured log line.
+pub async fn latency_middleware(
+    State(state): State<crate::AppState>,
+    request: Request<Body>,
+    next: Next,
+) -> Response {
+    // Keyed by the raw URI path, so dynamic segments (e.g. article ids)
+    // create one entry per distinct path.
+    let path = request.uri().path().to_string();
+    let method = request.method().clone();
+    let start = Instant::now();
+
+    let response = next.run(request).await;
+
+    let duration = start.elapsed();
+    let duration_us = duration.as_micros() as u64;
+    let status = response.status().as_u16();
+
+    state.metrics.record(&path, duration_us);
+
+    tracing::info!(
+        method = %method,
+        path = %path,
+        status = status,
+        duration_ms = duration.as_secs_f64() * 1000.0,
+        "request"
+    );
+
+    response
+}
+
+// --- Endpoint ---
+
+#[derive(Serialize)]
+struct MetricsResponse {
+    request_latency: MetricsSnapshot,
+    queue_depth: Option<QueueDepth>,
+    ai_cost: Option<AiCostSummary>,
+}
+
+/// GET /metrics — the combined observability endpoint.
+pub async fn metrics_endpoint(
+    State(state): State<crate::AppState>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let request_latency = state.metrics.snapshot();
+
+    // Queue depth and AI cost degrade to null on a DB error rather than
+    // failing the whole endpoint.
+    let queue_depth = query_queue_depth(&state.db).await.ok();
+    let ai_cost = query_ai_cost(&state.db).await.ok();
+
+    Ok(Json(MetricsResponse {
+        request_latency,
+        queue_depth,
+        ai_cost,
+    }))
+}
diff --git a/tasks.md b/tasks.md
index 17b1c9a..13ba755 100644
--- a/tasks.md
+++ b/tasks.md
@@ -266,8 +266,7 @@ calls them directly. Same tools, two users.

 ## Phase 12: Hardening

-- [~] 12.1 Observability: structured logging, metrics (request latency, queue depth, AI cost).
-  > Started: 2026-03-18T10:55
+- [x] 12.1 Observability: structured logging, metrics (request latency, queue depth, AI cost).
 - [ ] 12.2 Backup: PG dump routine, STDB → PG rebuild after a crash.
 - [ ] 12.3 Error handling: retry with backoff in the write path, dead-letter queue for failed PG writes.
 - [ ] 12.4 Performance: profile STDB queries, optimize the node_access update.