Profiled all critical PG queries with EXPLAIN ANALYZE. Found that recompute_access was using a single-column index (idx_edges_type) with low selectivity, and that the RLS policies lacked composite indexes for efficient lookups.

Changes:

Migration 017_query_performance.sql:
- 6 new composite indexes:
  - idx_edges_target_type (target_id, edge_type) — recompute_access + belongs_to
  - idx_edges_source_type (source_id, edge_type) — alias lookups
  - idx_edges_target_memberof (partial, member_of) — team propagation
  - idx_nodes_created_at_desc — ORDER BY created_at DESC
  - idx_nodes_kind_created — filter on kind + sort
  - idx_na_subject_covering INCLUDE (object_id) — RLS without heap lookup
- Optimized recompute_access(): steps 3 and 4 now run only when relevant (EXISTS check first). For regular users (no team memberships) this avoids two full INSERT-SELECT operations.
- via_edge is now updated correctly when the access level changes.

Slow query logging (maskinrommet):
- Requests >200ms are logged as WARN with the tag slow_request
- PG queries >100ms are logged as WARN with the tag slow_query (see the sketch after this list)
- recompute_access calls are logged with their duration for monitoring
- New pg_stats field in /metrics with table and index statistics, cache hit ratio, and a node_access count

Documentation updated in docs/infra/observerbarhet.md.
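The request-side threshold (200 ms, tag slow_request) is implemented in the latency middleware further down; the PG-side threshold is not shown in this file. Below is a minimal sketch, assuming a tracing subscriber is already installed, of how a query could be timed and tagged with slow_query at a 100 ms threshold. The helper name log_if_slow, the constant name, and the signature are illustrative assumptions, not the project's actual implementation.

use std::time::Instant;

/// Hypothetical threshold; the description above says 100 ms.
const SLOW_QUERY_THRESHOLD_MS: f64 = 100.0;

/// Hypothetical helper: await a future (e.g. an sqlx query) and emit a WARN
/// with the slow_query tag if it took longer than the threshold.
pub async fn log_if_slow<T, F>(label: &str, fut: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let start = Instant::now();
    let result = fut.await;
    let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
    if duration_ms >= SLOW_QUERY_THRESHOLD_MS {
        tracing::warn!(query = label, duration_ms = duration_ms, "slow_query");
    }
    result
}

Call sites would simply wrap the future they already await, passing a short label such as "recompute_access".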
// Observability — in-memory metrics for maskinrommet.
//
// Collects request latency per route and exposes it via the /metrics endpoint.
// Queue depth and AI cost are fetched directly from PG on request.
// Slow query logging: queries above a threshold are logged with context.
//
// Ref: tasks 12.1, 12.4

use axum::{
    body::Body,
    extract::State,
    http::{Request, StatusCode},
    middleware::Next,
    response::{IntoResponse, Response},
    Json,
};
use serde::Serialize;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// In-memory metrics collector. Shared via Arc in AppState.
#[derive(Clone)]
pub struct MetricsCollector {
    inner: Arc<Mutex<MetricsInner>>,
}

struct MetricsInner {
    /// Latency data per route: path → list of durations in microseconds
    routes: HashMap<String, RouteStats>,
    /// Total requests since startup
    total_requests: u64,
    /// Startup time
    started_at: Instant,
}

struct RouteStats {
    count: u64,
    total_us: u64,
    min_us: u64,
    max_us: u64,
    /// Last N latencies for p50/p95/p99 calculation (ring buffer)
    recent: Vec<u64>,
}

const RECENT_BUFFER_SIZE: usize = 1000;

impl RouteStats {
    fn new() -> Self {
        Self {
            count: 0,
            total_us: 0,
            min_us: u64::MAX,
            max_us: 0,
            recent: Vec::with_capacity(RECENT_BUFFER_SIZE),
        }
    }

    fn record(&mut self, duration_us: u64) {
        self.count += 1;
        self.total_us += duration_us;
        if duration_us < self.min_us {
            self.min_us = duration_us;
        }
        if duration_us > self.max_us {
            self.max_us = duration_us;
        }
        if self.recent.len() >= RECENT_BUFFER_SIZE {
            self.recent.remove(0);
        }
        self.recent.push(duration_us);
    }

    fn percentile(&self, p: f64) -> u64 {
        if self.recent.is_empty() {
            return 0;
        }
        let mut sorted = self.recent.clone();
        sorted.sort_unstable();
        let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize;
        sorted[idx.min(sorted.len() - 1)]
    }
}

impl MetricsCollector {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(MetricsInner {
                routes: HashMap::new(),
                total_requests: 0,
                started_at: Instant::now(),
            })),
        }
    }

    /// Record a request with its duration.
    pub fn record(&self, path: &str, duration_us: u64) {
        let mut inner = self.inner.lock().unwrap();
        inner.total_requests += 1;
        inner
            .routes
            .entry(path.to_string())
            .or_insert_with(RouteStats::new)
            .record(duration_us);
    }

    /// Get a snapshot of all route metrics.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let inner = self.inner.lock().unwrap();
        let uptime_secs = inner.started_at.elapsed().as_secs();

        let routes = inner
            .routes
            .iter()
            .map(|(path, stats)| {
                (
                    path.clone(),
                    RouteMetrics {
                        count: stats.count,
                        avg_ms: if stats.count > 0 {
                            (stats.total_us as f64 / stats.count as f64) / 1000.0
                        } else {
                            0.0
                        },
                        min_ms: if stats.min_us == u64::MAX {
                            0.0
                        } else {
                            stats.min_us as f64 / 1000.0
                        },
                        max_ms: stats.max_us as f64 / 1000.0,
                        p50_ms: stats.percentile(50.0) as f64 / 1000.0,
                        p95_ms: stats.percentile(95.0) as f64 / 1000.0,
                        p99_ms: stats.percentile(99.0) as f64 / 1000.0,
                    },
                )
            })
            .collect();

        MetricsSnapshot {
            uptime_secs,
            total_requests: inner.total_requests,
            routes,
        }
    }
}

#[derive(Serialize)]
pub struct MetricsSnapshot {
    pub uptime_secs: u64,
    pub total_requests: u64,
    pub routes: HashMap<String, RouteMetrics>,
}

#[derive(Serialize)]
pub struct RouteMetrics {
    pub count: u64,
    pub avg_ms: f64,
    pub min_ms: f64,
    pub max_ms: f64,
    pub p50_ms: f64,
    pub p95_ms: f64,
    pub p99_ms: f64,
}

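// Hedged usage sketch (not part of the original module): a small unit test
// exercising MetricsCollector directly. Route path and durations are illustrative.
#[cfg(test)]
mod metrics_collector_tests {
    use super::*;

    #[test]
    fn records_latency_and_computes_percentiles() {
        let metrics = MetricsCollector::new();
        // Durations are in microseconds, matching record()'s contract.
        for us in [1_000, 2_000, 300_000] {
            metrics.record("/api/nodes", us);
        }
        let snap = metrics.snapshot();
        let route = &snap.routes["/api/nodes"];
        assert_eq!(route.count, 3);
        assert!(route.p95_ms >= route.p50_ms);
        assert_eq!(snap.total_requests, 3);
    }
}
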
// --- Queue depth ---

#[derive(Serialize)]
pub struct QueueDepth {
    pub pending: i64,
    pub running: i64,
    pub error: i64,
    pub retry: i64,
    pub completed_last_hour: i64,
}

async fn query_queue_depth(db: &PgPool) -> Result<QueueDepth, sqlx::Error> {
    let row = sqlx::query_as::<_, (i64, i64, i64, i64)>(
        r#"
        SELECT
            COUNT(*) FILTER (WHERE status = 'pending') AS pending,
            COUNT(*) FILTER (WHERE status = 'running') AS running,
            COUNT(*) FILTER (WHERE status = 'error') AS error,
            COUNT(*) FILTER (WHERE status = 'retry') AS retry
        FROM job_queue
        "#,
    )
    .fetch_one(db)
    .await?;

    let completed: (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM job_queue WHERE status = 'completed' AND updated_at > now() - interval '1 hour'",
    )
    .fetch_one(db)
    .await?;

    Ok(QueueDepth {
        pending: row.0,
        running: row.1,
        error: row.2,
        retry: row.3,
        completed_last_hour: completed.0,
    })
}

// --- AI cost ---

#[derive(Serialize)]
pub struct AiCostSummary {
    pub last_hour: AiCostPeriod,
    pub last_24h: AiCostPeriod,
    pub last_30d: AiCostPeriod,
}

#[derive(Serialize, Default)]
pub struct AiCostPeriod {
    pub requests: i64,
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub estimated_cost_usd: f64,
}

async fn query_ai_cost(db: &PgPool) -> Result<AiCostSummary, sqlx::Error> {
    // Fetch aggregated AI costs from ai_usage_log for three time periods
    let row = sqlx::query_as::<_, (i64, i64, i64, i64, i64, i64, i64, i64, i64)>(
        r#"
        SELECT
            COUNT(*) FILTER (WHERE created_at > now() - interval '1 hour'),
            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '1 hour'), 0),
            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '1 hour'), 0),
            COUNT(*) FILTER (WHERE created_at > now() - interval '24 hours'),
            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '24 hours'), 0),
            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '24 hours'), 0),
            COUNT(*) FILTER (WHERE created_at > now() - interval '30 days'),
            COALESCE(SUM(input_tokens) FILTER (WHERE created_at > now() - interval '30 days'), 0),
            COALESCE(SUM(output_tokens) FILTER (WHERE created_at > now() - interval '30 days'), 0)
        FROM ai_usage_log
        "#,
    )
    .fetch_one(db)
    .await?;

    fn estimate_cost(input_tokens: i64, output_tokens: i64) -> f64 {
        // Rough estimate based on typical Claude/GPT prices (USD per 1M tokens).
        // Input: ~$3/MTok, output: ~$15/MTok (conservative average).
        let input_cost = input_tokens as f64 * 3.0 / 1_000_000.0;
        let output_cost = output_tokens as f64 * 15.0 / 1_000_000.0;
        ((input_cost + output_cost) * 100.0).round() / 100.0
    }

    Ok(AiCostSummary {
        last_hour: AiCostPeriod {
            requests: row.0,
            input_tokens: row.1,
            output_tokens: row.2,
            estimated_cost_usd: estimate_cost(row.1, row.2),
        },
        last_24h: AiCostPeriod {
            requests: row.3,
            input_tokens: row.4,
            output_tokens: row.5,
            estimated_cost_usd: estimate_cost(row.4, row.5),
        },
        last_30d: AiCostPeriod {
            requests: row.6,
            input_tokens: row.7,
            output_tokens: row.8,
            estimated_cost_usd: estimate_cost(row.7, row.8),
        },
    })
}

// --- Middleware ---

/// Threshold for a slow request (milliseconds).
/// Requests above this value are logged as warnings.
const SLOW_REQUEST_THRESHOLD_MS: f64 = 200.0;

/// Axum middleware that measures request latency and emits structured logs.
/// Requests above SLOW_REQUEST_THRESHOLD_MS are logged as warnings with extra context.
pub async fn latency_middleware(
    State(state): State<crate::AppState>,
    request: Request<Body>,
    next: Next,
) -> Response {
    let path = request.uri().path().to_string();
    let method = request.method().clone();
    let start = Instant::now();

    let response = next.run(request).await;

    let duration = start.elapsed();
    let duration_us = duration.as_micros() as u64;
    let duration_ms = duration.as_secs_f64() * 1000.0;
    let status = response.status().as_u16();

    state.metrics.record(&path, duration_us);

    if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
        tracing::warn!(
            method = %method,
            path = %path,
            status = status,
            duration_ms = duration_ms,
            "slow_request"
        );
    } else {
        tracing::info!(
            method = %method,
            path = %path,
            status = status,
            duration_ms = duration_ms,
            "request"
        );
    }

    response
}

// --- Endpoint ---

#[derive(Serialize)]
struct MetricsResponse {
    request_latency: MetricsSnapshot,
    queue_depth: Option<QueueDepth>,
    ai_cost: Option<AiCostSummary>,
    pg_stats: Option<PgQueryStats>,
}

/// GET /metrics — combined observability endpoint.
pub async fn metrics_endpoint(
    State(state): State<crate::AppState>,
) -> Result<impl IntoResponse, StatusCode> {
    let request_latency = state.metrics.snapshot();

    let queue_depth = query_queue_depth(&state.db).await.ok();
    let ai_cost = query_ai_cost(&state.db).await.ok();
    let pg_stats = query_pg_stats(&state.db).await.ok();

    Ok(Json(MetricsResponse {
        request_latency,
        queue_depth,
        ai_cost,
        pg_stats,
    }))
}

// --- PG Query Stats ---

#[derive(Serialize)]
pub struct PgTableStats {
    pub table_name: String,
    pub row_count: i64,
    pub index_scan_count: i64,
    pub seq_scan_count: i64,
    /// Ratio of index scans to total scans (0.0-1.0). Higher is better.
    pub index_scan_ratio: f64,
}

#[derive(Serialize)]
pub struct PgIndexStats {
    pub index_name: String,
    pub table_name: String,
    pub scan_count: i64,
    pub size_bytes: i64,
}

#[derive(Serialize)]
pub struct PgQueryStats {
    pub tables: Vec<PgTableStats>,
    pub indexes: Vec<PgIndexStats>,
    pub node_access_count: i64,
    pub cache_hit_ratio: f64,
}

async fn query_pg_stats(db: &PgPool) -> Result<PgQueryStats, sqlx::Error> {
    // Table statistics: seq_scan vs index_scan for the core tables
    let tables = sqlx::query_as::<_, (String, i64, i64, i64)>(
        r#"
        SELECT
            relname::text,
            COALESCE(n_live_tup, 0),
            COALESCE(idx_scan, 0),
            COALESCE(seq_scan, 0)
        FROM pg_stat_user_tables
        WHERE relname IN ('nodes', 'edges', 'node_access', 'job_queue', 'auth_identities')
        ORDER BY relname
        "#,
    )
    .fetch_all(db)
    .await?;

    let table_stats: Vec<PgTableStats> = tables
        .into_iter()
        .map(|(name, rows, idx_scans, seq_scans)| {
            let total = idx_scans + seq_scans;
            PgTableStats {
                table_name: name,
                row_count: rows,
                index_scan_count: idx_scans,
                seq_scan_count: seq_scans,
                index_scan_ratio: if total > 0 {
                    idx_scans as f64 / total as f64
                } else {
                    1.0
                },
            }
        })
        .collect();

    // Index usage statistics for the most important indexes
    let indexes = sqlx::query_as::<_, (String, String, i64, i64)>(
        r#"
        SELECT
            indexrelname::text,
            relname::text,
            COALESCE(idx_scan, 0),
            pg_relation_size(indexrelid)
        FROM pg_stat_user_indexes
        WHERE relname IN ('nodes', 'edges', 'node_access')
        ORDER BY idx_scan DESC
        "#,
    )
    .fetch_all(db)
    .await?;

    let index_stats: Vec<PgIndexStats> = indexes
        .into_iter()
        .map(|(idx_name, tbl_name, scans, size)| PgIndexStats {
            index_name: idx_name,
            table_name: tbl_name,
            scan_count: scans,
            size_bytes: size,
        })
        .collect();

    // node_access row count
    let (na_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM node_access")
        .fetch_one(db)
        .await?;

    // Cache hit ratio
    let (hit_ratio,): (f64,) = sqlx::query_as(
        r#"
        SELECT COALESCE(
            SUM(heap_blks_hit)::float / NULLIF(SUM(heap_blks_hit) + SUM(heap_blks_read), 0),
            1.0
        )
        FROM pg_statio_user_tables
        WHERE relname IN ('nodes', 'edges', 'node_access')
        "#,
    )
    .fetch_one(db)
    .await?;

    Ok(PgQueryStats {
        tables: table_stats,
        indexes: index_stats,
        node_access_count: na_count,
        cache_hit_ratio: (hit_ratio * 10000.0).round() / 10000.0,
    })
}
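
For context, a minimal wiring sketch showing how this module could be hooked into the application. It assumes the crate-level AppState holds the PgPool as db and the MetricsCollector as metrics (consistent with how the code above uses crate::AppState and state.db / state.metrics) and targets the axum 0.7 API; the function name observability_router is illustrative.

use axum::{middleware, routing::get, Router};

// Assumed shape of crate::AppState; this module only needs `db` and `metrics`.
#[derive(Clone)]
pub struct AppState {
    pub db: sqlx::PgPool,
    pub metrics: MetricsCollector,
}

pub fn observability_router(state: AppState) -> Router {
    Router::new()
        // Combined observability endpoint defined above.
        .route("/metrics", get(metrics_endpoint))
        // Measure latency for every request, including /metrics itself.
        .layer(middleware::from_fn_with_state(
            state.clone(),
            latency_middleware,
        ))
        .with_state(state)
}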