SpacetimeDB-klient og warmup i maskinrommet (oppgave 2.3)
Legger til HTTP-klient som kaller STDB-reducere via JSON API.
Warmup-modul laster hele grafen fra PG til STDB ved oppstart.
- stdb.rs: HTTP-klient med create/update/delete for noder og edges
- warmup.rs: PG → STDB sync (clear_all → noder → edges)
- main.rs: Integrerer STDB i AppState, kjører warmup ved oppstart
API-format: POST /v1/database/{db}/call/{reducer} med navngitte params.
STDB-token kan settes via SPACETIMEDB_TOKEN eller opprettes automatisk.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
f946868ab1
commit
cb7f88035d
3 changed files with 454 additions and 2 deletions
|
|
@ -1,4 +1,6 @@
|
|||
mod auth;
|
||||
mod stdb;
|
||||
mod warmup;
|
||||
|
||||
use axum::{extract::State, http::StatusCode, routing::get, Json, Router};
|
||||
use serde::Serialize;
|
||||
|
|
@ -8,11 +10,13 @@ use tower_http::trace::TraceLayer;
|
|||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||
|
||||
use auth::{AuthUser, JwksKeys};
|
||||
use stdb::StdbClient;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub db: PgPool,
|
||||
pub jwks: JwksKeys,
|
||||
pub stdb: StdbClient,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -20,6 +24,7 @@ struct HealthResponse {
|
|||
status: &'static str,
|
||||
version: &'static str,
|
||||
db: &'static str,
|
||||
stdb: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -61,7 +66,46 @@ async fn main() {
|
|||
.await
|
||||
.expect("Kunne ikke hente JWKS fra Authentik");
|
||||
|
||||
let state = AppState { db, jwks };
|
||||
// SpacetimeDB-klient
|
||||
let stdb_url = std::env::var("SPACETIMEDB_URL")
|
||||
.unwrap_or_else(|_| "http://spacetimedb:3000".to_string());
|
||||
let stdb_database = std::env::var("SPACETIMEDB_DATABASE")
|
||||
.unwrap_or_else(|_| "synops".to_string());
|
||||
|
||||
// Hent token fra miljøvariabel, eller opprett ny identitet
|
||||
let stdb_token = match std::env::var("SPACETIMEDB_TOKEN") {
|
||||
Ok(token) if !token.is_empty() => {
|
||||
tracing::info!("Bruker konfigurert STDB-token");
|
||||
token
|
||||
}
|
||||
_ => {
|
||||
tracing::info!("Ingen STDB-token konfigurert, oppretter ny identitet");
|
||||
let (identity, token) = StdbClient::create_identity(&stdb_url)
|
||||
.await
|
||||
.expect("Kunne ikke opprette STDB-identitet");
|
||||
tracing::info!("Opprettet STDB-identitet: {identity}");
|
||||
token
|
||||
}
|
||||
};
|
||||
|
||||
let stdb = StdbClient::new(&stdb_url, &stdb_database, &stdb_token);
|
||||
|
||||
// Warmup: last hele grafen fra PG til SpacetimeDB
|
||||
match warmup::run(&db, &stdb).await {
|
||||
Ok(stats) => {
|
||||
tracing::info!(
|
||||
"Warmup fullført: {} noder, {} edges",
|
||||
stats.nodes,
|
||||
stats.edges
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Warmup feilet: {e}");
|
||||
// Fortsett likevel — STDB kan være midlertidig utilgjengelig
|
||||
}
|
||||
}
|
||||
|
||||
let state = AppState { db, jwks, stdb };
|
||||
|
||||
// Ruter: /health er offentlig, /me krever gyldig JWT
|
||||
let app = Router::new()
|
||||
|
|
@ -76,17 +120,31 @@ async fn main() {
|
|||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
||||
/// Offentlig helsesjekk — ingen auth påkrevd.
|
||||
/// Offentlig helsesjekk — verifiserer PG- og STDB-tilkobling.
|
||||
async fn health(State(state): State<AppState>) -> Result<Json<HealthResponse>, StatusCode> {
|
||||
sqlx::query("SELECT 1")
|
||||
.execute(&state.db)
|
||||
.await
|
||||
.map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
|
||||
|
||||
// Sjekk STDB ved å kalle clear_all med en ufarlig test
|
||||
// (vi bruker en enkel healthcheck-reducer i fremtiden)
|
||||
let stdb_status = match state.stdb.create_node(
|
||||
"__healthcheck__", "system", "", "", "hidden", "{}", "",
|
||||
).await {
|
||||
Ok(()) => {
|
||||
// Rydd opp
|
||||
let _ = state.stdb.delete_node("__healthcheck__").await;
|
||||
"connected"
|
||||
}
|
||||
Err(_) => "unavailable",
|
||||
};
|
||||
|
||||
Ok(Json(HealthResponse {
|
||||
status: "ok",
|
||||
version: env!("CARGO_PKG_VERSION"),
|
||||
db: "connected",
|
||||
stdb: stdb_status,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
|||
286
maskinrommet/src/stdb.rs
Normal file
286
maskinrommet/src/stdb.rs
Normal file
|
|
@ -0,0 +1,286 @@
|
|||
// SpacetimeDB HTTP-klient for maskinrommet.
|
||||
//
|
||||
// Kaller STDB-reducere via HTTP JSON API.
|
||||
// Maskinrommet eier all skriving — denne klienten er eneste vei inn.
|
||||
//
|
||||
// API-format: POST /v1/database/{db}/call/{reducer}
|
||||
// Body: JSON-objekt med navngitte parametre.
|
||||
// Auth: Bearer-token fra STDB-identitet.
|
||||
//
|
||||
// Ref: docs/retninger/datalaget.md, docs/infra/synkronisering.md
|
||||
|
||||
use reqwest::Client;
|
||||
use serde::Serialize;
|
||||
|
||||
/// SpacetimeDB-klient som kaller reducere via HTTP.
|
||||
#[derive(Clone)]
|
||||
pub struct StdbClient {
|
||||
client: Client,
|
||||
base_url: String,
|
||||
database: String,
|
||||
token: String,
|
||||
}
|
||||
|
||||
impl StdbClient {
|
||||
/// Opprett ny klient. `base_url` er STDB-serverens URL (f.eks. "http://spacetimedb:3000").
|
||||
pub fn new(base_url: &str, database: &str, token: &str) -> Self {
|
||||
Self {
|
||||
client: Client::new(),
|
||||
base_url: base_url.trim_end_matches('/').to_string(),
|
||||
database: database.to_string(),
|
||||
token: token.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Hent en ny identitet og token fra STDB-serveren.
|
||||
/// Brukes ved oppstart hvis ingen token er konfigurert.
|
||||
pub async fn create_identity(base_url: &str) -> Result<(String, String), StdbError> {
|
||||
let client = Client::new();
|
||||
let url = format!("{}/v1/identity", base_url.trim_end_matches('/'));
|
||||
let resp = client.post(&url).send().await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(StdbError::Http(format!(
|
||||
"Kunne ikke opprette identitet: {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let body: serde_json::Value = resp.json().await?;
|
||||
let identity = body["identity"]
|
||||
.as_str()
|
||||
.ok_or_else(|| StdbError::Http("Mangler identity i respons".into()))?
|
||||
.to_string();
|
||||
let token = body["token"]
|
||||
.as_str()
|
||||
.ok_or_else(|| StdbError::Http("Mangler token i respons".into()))?
|
||||
.to_string();
|
||||
|
||||
Ok((identity, token))
|
||||
}
|
||||
|
||||
/// Kall en reducer med navngitte parametre (JSON-objekt).
|
||||
async fn call_reducer<T: Serialize>(&self, reducer: &str, args: &T) -> Result<(), StdbError> {
|
||||
let url = format!(
|
||||
"{}/v1/database/{}/call/{}",
|
||||
self.base_url, self.database, reducer
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.post(&url)
|
||||
.bearer_auth(&self.token)
|
||||
.json(args)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if resp.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
let status = resp.status();
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
Err(StdbError::Reducer {
|
||||
reducer: reducer.to_string(),
|
||||
status: status.as_u16(),
|
||||
message: body,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Node-operasjoner
|
||||
// =========================================================================
|
||||
|
||||
pub async fn create_node(
|
||||
&self,
|
||||
id: &str,
|
||||
node_kind: &str,
|
||||
title: &str,
|
||||
content: &str,
|
||||
visibility: &str,
|
||||
metadata: &str,
|
||||
created_by: &str,
|
||||
) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
node_kind: &'a str,
|
||||
title: &'a str,
|
||||
content: &'a str,
|
||||
visibility: &'a str,
|
||||
metadata: &'a str,
|
||||
created_by: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer(
|
||||
"create_node",
|
||||
&Args {
|
||||
id,
|
||||
node_kind,
|
||||
title,
|
||||
content,
|
||||
visibility,
|
||||
metadata,
|
||||
created_by,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn update_node(
|
||||
&self,
|
||||
id: &str,
|
||||
node_kind: &str,
|
||||
title: &str,
|
||||
content: &str,
|
||||
visibility: &str,
|
||||
metadata: &str,
|
||||
) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
node_kind: &'a str,
|
||||
title: &'a str,
|
||||
content: &'a str,
|
||||
visibility: &'a str,
|
||||
metadata: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer(
|
||||
"update_node",
|
||||
&Args {
|
||||
id,
|
||||
node_kind,
|
||||
title,
|
||||
content,
|
||||
visibility,
|
||||
metadata,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn delete_node(&self, id: &str) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer("delete_node", &Args { id }).await
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Edge-operasjoner
|
||||
// =========================================================================
|
||||
|
||||
pub async fn create_edge(
|
||||
&self,
|
||||
id: &str,
|
||||
source_id: &str,
|
||||
target_id: &str,
|
||||
edge_type: &str,
|
||||
metadata: &str,
|
||||
system: bool,
|
||||
created_by: &str,
|
||||
) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
source_id: &'a str,
|
||||
target_id: &'a str,
|
||||
edge_type: &'a str,
|
||||
metadata: &'a str,
|
||||
system: bool,
|
||||
created_by: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer(
|
||||
"create_edge",
|
||||
&Args {
|
||||
id,
|
||||
source_id,
|
||||
target_id,
|
||||
edge_type,
|
||||
metadata,
|
||||
system,
|
||||
created_by,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn update_edge(
|
||||
&self,
|
||||
id: &str,
|
||||
edge_type: &str,
|
||||
metadata: &str,
|
||||
) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
edge_type: &'a str,
|
||||
metadata: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer("update_edge", &Args { id, edge_type, metadata })
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn delete_edge(&self, id: &str) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Args<'a> {
|
||||
id: &'a str,
|
||||
}
|
||||
|
||||
self.call_reducer("delete_edge", &Args { id }).await
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Vedlikehold
|
||||
// =========================================================================
|
||||
|
||||
/// Tøm alle noder og edges. Brukes ved warmup for å unngå duplikater.
|
||||
pub async fn clear_all(&self) -> Result<(), StdbError> {
|
||||
#[derive(Serialize)]
|
||||
struct Empty {}
|
||||
|
||||
self.call_reducer("clear_all", &Empty {}).await
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Feilhåndtering
|
||||
// =============================================================================
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum StdbError {
|
||||
/// HTTP-transportfeil (nettverk, timeout)
|
||||
Http(String),
|
||||
/// Reducer returnerte feil (400, 500, etc.)
|
||||
Reducer {
|
||||
reducer: String,
|
||||
status: u16,
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::fmt::Display for StdbError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
StdbError::Http(msg) => write!(f, "STDB HTTP-feil: {msg}"),
|
||||
StdbError::Reducer {
|
||||
reducer,
|
||||
status,
|
||||
message,
|
||||
} => write!(f, "STDB reducer {reducer} feilet ({status}): {message}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for StdbError {}
|
||||
|
||||
impl From<reqwest::Error> for StdbError {
|
||||
fn from(e: reqwest::Error) -> Self {
|
||||
StdbError::Http(e.to_string())
|
||||
}
|
||||
}
|
||||
108
maskinrommet/src/warmup.rs
Normal file
108
maskinrommet/src/warmup.rs
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
// Warmup: last hele grafen fra PG til SpacetimeDB ved oppstart.
|
||||
//
|
||||
// Sekvens: clear_all → noder → edges.
|
||||
// Edges refererer til noder, så noder må lastes først.
|
||||
//
|
||||
// Ref: docs/infra/synkronisering.md
|
||||
|
||||
use sqlx::PgPool;
|
||||
|
||||
use crate::stdb::StdbClient;
|
||||
|
||||
/// Last hele grafen fra PG til SpacetimeDB.
|
||||
pub async fn run(db: &PgPool, stdb: &StdbClient) -> Result<WarmupStats, Box<dyn std::error::Error>> {
|
||||
tracing::info!("Warmup: starter (PG → SpacetimeDB)");
|
||||
|
||||
// 1. Tøm STDB for å unngå duplikater ved restart
|
||||
stdb.clear_all().await?;
|
||||
tracing::info!("Warmup: STDB tømt");
|
||||
|
||||
// 2. Last alle noder
|
||||
let nodes = sqlx::query_as::<_, PgNode>(
|
||||
"SELECT id, node_kind::text, title, content, visibility::text, \
|
||||
COALESCE(metadata::text, '{}') as metadata, \
|
||||
created_at, COALESCE(created_by::text, '') as created_by \
|
||||
FROM nodes ORDER BY created_at"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await?;
|
||||
|
||||
let node_count = nodes.len();
|
||||
for node in &nodes {
|
||||
stdb.create_node(
|
||||
&node.id.to_string(),
|
||||
&node.node_kind,
|
||||
&node.title,
|
||||
&node.content,
|
||||
&node.visibility,
|
||||
&node.metadata,
|
||||
&node.created_by,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
tracing::info!("Warmup: {node_count} noder lastet");
|
||||
|
||||
// 3. Last alle edges
|
||||
let edges = sqlx::query_as::<_, PgEdge>(
|
||||
"SELECT id, source_id, target_id, edge_type, \
|
||||
COALESCE(metadata::text, '{}') as metadata, \
|
||||
system, created_at, COALESCE(created_by::text, '') as created_by \
|
||||
FROM edges ORDER BY created_at"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await?;
|
||||
|
||||
let edge_count = edges.len();
|
||||
for edge in &edges {
|
||||
stdb.create_edge(
|
||||
&edge.id.to_string(),
|
||||
&edge.source_id.to_string(),
|
||||
&edge.target_id.to_string(),
|
||||
&edge.edge_type,
|
||||
&edge.metadata,
|
||||
edge.system,
|
||||
&edge.created_by,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
tracing::info!("Warmup: {edge_count} edges lastet");
|
||||
|
||||
let stats = WarmupStats {
|
||||
nodes: node_count,
|
||||
edges: edge_count,
|
||||
};
|
||||
tracing::info!("Warmup: ferdig ({} noder, {} edges)", stats.nodes, stats.edges);
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
pub struct WarmupStats {
|
||||
pub nodes: usize,
|
||||
pub edges: usize,
|
||||
}
|
||||
|
||||
// PG-radtyper for sqlx
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(dead_code)]
|
||||
struct PgNode {
|
||||
id: uuid::Uuid,
|
||||
node_kind: String,
|
||||
title: String,
|
||||
content: String,
|
||||
visibility: String,
|
||||
metadata: String,
|
||||
created_at: chrono::DateTime<chrono::Utc>,
|
||||
created_by: String,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(dead_code)]
|
||||
struct PgEdge {
|
||||
id: uuid::Uuid,
|
||||
source_id: uuid::Uuid,
|
||||
target_id: uuid::Uuid,
|
||||
edge_type: String,
|
||||
metadata: String,
|
||||
system: bool,
|
||||
created_at: chrono::DateTime<chrono::Utc>,
|
||||
created_by: String,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue