Legg til node_access i STDB + synk fra maskinrommet

Visibility-filtrering (oppgave 4.3, del 1/2):
- Ny node_access-tabell i STDB-modulen som speiler PG
- Reducers: upsert_node_access, delete_node_access, delete_node_access_for_subject
- STDB-klient i maskinrommet: metoder for node_access
- Warmup synker node_access fra PG til STDB ved oppstart
- Tilgangsgivende edges synker node_access til STDB etter PG-commit
- clear_all tømmer også node_access

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
vegard 2026-03-17 15:09:55 +01:00
parent 68bcd57d84
commit 8fa2849f0c
5 changed files with 220 additions and 5 deletions

View file

@ -309,6 +309,7 @@ pub async fn create_edge(
let edge_type = req.edge_type.clone();
spawn_pg_insert_edge(
state.db.clone(),
state.stdb.clone(),
edge_id,
req.source_id,
req.target_id,
@ -569,8 +570,10 @@ fn edge_type_to_access_level(edge_type: &str) -> Option<&'static str> {
/// Spawner en tokio-task som skriver edgen til PostgreSQL i bakgrunnen.
/// For tilgangsgivende edges (owner, admin, member_of, reader) kalles
/// recompute_access i samme transaksjon — ingen vindu med stale tilgang.
/// Synker også node_access til STDB for visibility-filtrering i frontend.
fn spawn_pg_insert_edge(
db: PgPool,
stdb: crate::stdb::StdbClient,
edge_id: Uuid,
source_id: Uuid,
target_id: Uuid,
@ -593,6 +596,9 @@ fn spawn_pg_insert_edge(
access_level = %level,
"Edge + node_access persistert til PostgreSQL"
);
// Synk oppdatert node_access til STDB
sync_node_access_to_stdb(&db, &stdb, source_id).await;
}
Err(e) => {
tracing::error!(
@ -632,6 +638,58 @@ fn spawn_pg_insert_edge(
});
}
/// Synkroniserer node_access-rader for et subject fra PG til STDB.
/// Kalles etter recompute_access for å holde STDB i synk.
async fn sync_node_access_to_stdb(db: &PgPool, stdb: &crate::stdb::StdbClient, subject_id: Uuid) {
let rows = sqlx::query_as::<_, NodeAccessRow>(
"SELECT subject_id, object_id, access::text as access, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access WHERE subject_id = $1",
)
.bind(subject_id)
.fetch_all(db)
.await;
match rows {
Ok(rows) => {
for row in &rows {
if let Err(e) = stdb
.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await
{
tracing::error!(
subject_id = %row.subject_id,
object_id = %row.object_id,
error = %e,
"Kunne ikke synke node_access til STDB"
);
}
}
tracing::info!(
subject_id = %subject_id,
count = rows.len(),
"node_access synket til STDB"
);
}
Err(e) => {
tracing::error!(subject_id = %subject_id, error = %e, "Kunne ikke hente node_access fra PG");
}
}
}
#[derive(sqlx::FromRow)]
struct NodeAccessRow {
subject_id: Uuid,
object_id: Uuid,
access: String,
via_edge: String,
}
/// Inserter en tilgangsgivende edge og oppdaterer node_access i én transaksjon.
/// source_id = subject (bruker/team), target_id = object (noden det gis tilgang til).
async fn insert_edge_with_access(

View file

@ -95,9 +95,10 @@ async fn main() {
match warmup::run(&db, &stdb).await {
Ok(stats) => {
tracing::info!(
"Warmup fullført: {} noder, {} edges",
"Warmup fullført: {} noder, {} edges, {} access",
stats.nodes,
stats.edges
stats.edges,
stats.access
);
}
Err(e) => {

View file

@ -235,6 +235,52 @@ impl StdbClient {
self.call_reducer("delete_edge", &Args { id }).await
}
// =========================================================================
// NodeAccess-operasjoner
// =========================================================================
pub async fn upsert_node_access(
&self,
subject_id: &str,
object_id: &str,
access: &str,
via_edge: &str,
) -> Result<(), StdbError> {
#[derive(Serialize)]
struct Args<'a> {
subject_id: &'a str,
object_id: &'a str,
access: &'a str,
via_edge: &'a str,
}
self.call_reducer(
"upsert_node_access",
&Args {
subject_id,
object_id,
access,
via_edge,
},
)
.await
}
pub async fn delete_node_access(
&self,
subject_id: &str,
object_id: &str,
) -> Result<(), StdbError> {
#[derive(Serialize)]
struct Args<'a> {
subject_id: &'a str,
object_id: &'a str,
}
self.call_reducer("delete_node_access", &Args { subject_id, object_id })
.await
}
// =========================================================================
// Vedlikehold
// =========================================================================

View file

@ -68,17 +68,40 @@ pub async fn run(db: &PgPool, stdb: &StdbClient) -> Result<WarmupStats, Box<dyn
}
tracing::info!("Warmup: {edge_count} edges lastet");
// 4. Last alle node_access-rader
let access_rows = sqlx::query_as::<_, PgNodeAccess>(
"SELECT subject_id, object_id, access::text, \
COALESCE(via_edge::text, '') as via_edge \
FROM node_access"
)
.fetch_all(db)
.await?;
let access_count = access_rows.len();
for row in &access_rows {
stdb.upsert_node_access(
&row.subject_id.to_string(),
&row.object_id.to_string(),
&row.access,
&row.via_edge,
)
.await?;
}
tracing::info!("Warmup: {access_count} node_access-rader lastet");
let stats = WarmupStats {
nodes: node_count,
edges: edge_count,
access: access_count,
};
tracing::info!("Warmup: ferdig ({} noder, {} edges)", stats.nodes, stats.edges);
tracing::info!("Warmup: ferdig ({} noder, {} edges, {} access)", stats.nodes, stats.edges, stats.access);
Ok(stats)
}
pub struct WarmupStats {
pub nodes: usize,
pub edges: usize,
pub access: usize,
}
// PG-radtyper for sqlx
@ -95,6 +118,15 @@ struct PgNode {
created_by: String,
}
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct PgNodeAccess {
subject_id: uuid::Uuid,
object_id: uuid::Uuid,
access: String,
via_edge: String,
}
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct PgEdge {

View file

@ -29,6 +29,22 @@ pub struct Node {
pub created_by: String,
}
/// Speiler PG node_access-tabellen (materialisert tilgangsmatrise).
/// subject_id = bruker/team, object_id = noden det gis tilgang til.
/// Brukes av frontend for visibility-filtrering.
#[spacetimedb::table(accessor = node_access, public)]
pub struct NodeAccess {
#[primary_key]
pub id: String, // "{subject_id}:{object_id}" — kompositt-nøkkel som streng
#[index(btree)]
pub subject_id: String,
#[index(btree)]
pub object_id: String,
pub access: String, // reader, member, admin, owner
pub via_edge: String,
}
/// Speiler PG edges-tabellen.
#[spacetimedb::table(accessor = edge, public)]
pub struct Edge {
@ -140,6 +156,64 @@ pub fn delete_node(ctx: &ReducerContext, id: String) -> Result<(), String> {
Ok(())
}
// =============================================================================
// NodeAccess CRUD
// =============================================================================
/// Upsert: opprett eller oppdater tilgang. id = "{subject_id}:{object_id}".
#[reducer]
pub fn upsert_node_access(
ctx: &ReducerContext,
subject_id: String,
object_id: String,
access: String,
via_edge: String,
) -> Result<(), String> {
let id = format!("{subject_id}:{object_id}");
if let Some(existing) = ctx.db.node_access().id().find(&id) {
ctx.db.node_access().id().update(NodeAccess {
access,
via_edge,
..existing
});
} else {
ctx.db.node_access().insert(NodeAccess {
id,
subject_id,
object_id,
access,
via_edge,
});
}
Ok(())
}
/// Slett en spesifikk tilgangsrad.
#[reducer]
pub fn delete_node_access(
ctx: &ReducerContext,
subject_id: String,
object_id: String,
) -> Result<(), String> {
let id = format!("{subject_id}:{object_id}");
ctx.db.node_access().id().delete(&id);
Ok(())
}
/// Slett all tilgang for et gitt subject (brukes ved fjerning av bruker/team).
#[reducer]
pub fn delete_node_access_for_subject(
ctx: &ReducerContext,
subject_id: String,
) -> Result<(), String> {
let entries: Vec<_> = ctx.db.node_access().subject_id().filter(&subject_id).collect();
for entry in entries {
ctx.db.node_access().id().delete(&entry.id);
}
Ok(())
}
// =============================================================================
// Edge CRUD
// =============================================================================
@ -203,9 +277,13 @@ pub fn delete_edge(ctx: &ReducerContext, id: String) -> Result<(), String> {
// Warmup/vedlikehold
// =============================================================================
/// Tøm alle noder og edges (brukes ved restart/warmup for å unngå duplikater)
/// Tøm alle noder, edges og node_access (brukes ved restart/warmup for å unngå duplikater)
#[reducer]
pub fn clear_all(ctx: &ReducerContext) -> Result<(), String> {
let all_access: Vec<_> = ctx.db.node_access().iter().collect();
for a in all_access {
ctx.db.node_access().id().delete(&a.id);
}
let all_edges: Vec<_> = ctx.db.edge().iter().collect();
for e in all_edges {
ctx.db.edge().id().delete(&e.id);
@ -214,6 +292,6 @@ pub fn clear_all(ctx: &ReducerContext) -> Result<(), String> {
for n in all_nodes {
ctx.db.node().id().delete(&n.id);
}
log::info!("Alle noder og edges slettet (clear_all)");
log::info!("Alle noder, edges og node_access slettet (clear_all)");
Ok(())
}