Jobbkø-worker: Rust-binær med polling, concurrency og retry
sidelinja-worker — tokio-basert orkestrator som poller job_queue: - SELECT FOR UPDATE SKIP LOCKED for trygg concurrent polling - Semaphore-styrt concurrency (--max-concurrent) - Eksponentiell backoff: 30s × 2^(attempts-1) - Handler-registry (HashMap<String, Box<dyn JobHandler>>) - Strukturert JSON-logging via tracing - Echo-handler for end-to-end testing - CLI: --database-url, --ai-gateway-url, --max-concurrent, --poll-interval Testet mot dev-database: echo-jobb fullført, ukjent type → retry → error. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
1d47119b1e
commit
3f8ef65c5f
6 changed files with 3337 additions and 0 deletions
3010
worker/Cargo.lock
generated
Normal file
3010
worker/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
18
worker/Cargo.toml
Normal file
18
worker/Cargo.toml
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
[package]
name = "sidelinja-worker"
version = "0.1.0"
edition = "2024"

[dependencies]
# Async runtime; "full" enables macros, time, sync and the multi-thread scheduler.
tokio = { version = "1", features = ["full"] }
# Postgres driver; tls-rustls avoids a native OpenSSL dependency, and the
# uuid/chrono/json features map those column types directly onto Rust types.
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json"] }
uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# HTTP client, used by handlers that call the AI gateway.
reqwest = { version = "0.12", features = ["json"] }
# CLI parsing; "env" lets every flag fall back to an environment variable.
clap = { version = "4", features = ["derive", "env"] }
tracing = "0.1"
# "json" emits structured log lines; "env-filter" honours RUST_LOG.
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
async-trait = "0.1"
anyhow = "1"
|
||||
22
worker/src/handlers/echo.rs
Normal file
22
worker/src/handlers/echo.rs
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
use super::JobHandler;
|
||||
use serde_json::Value;
|
||||
use sqlx::PgPool;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Test-handler som returnerer payload tilbake som resultat.
|
||||
/// Brukes for å verifisere at jobbkøen fungerer end-to-end.
|
||||
pub struct EchoHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl JobHandler for EchoHandler {
|
||||
async fn handle(
|
||||
&self,
|
||||
_pool: &PgPool,
|
||||
workspace_id: &Uuid,
|
||||
payload: &Value,
|
||||
) -> anyhow::Result<Option<Value>> {
|
||||
info!(workspace_id = %workspace_id, "Echo-handler kjører");
|
||||
Ok(Some(payload.clone()))
|
||||
}
|
||||
}
|
||||
38
worker/src/handlers/mod.rs
Normal file
38
worker/src/handlers/mod.rs
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
use serde_json::Value;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod echo;
|
||||
|
||||
/// Trait implemented by every job handler; one implementation per job type.
///
/// Handlers receive the connection pool (so they can do their own database
/// work), the id of the workspace that owns the job, and the job's JSON
/// payload. Returning `Ok(Some(value))` stores `value` as the job result,
/// `Ok(None)` stores no result, and `Err(..)` sends the job down the
/// retry/backoff path.
#[async_trait::async_trait]
pub trait JobHandler: Send + Sync {
    async fn handle(
        &self,
        pool: &PgPool,
        workspace_id: &Uuid,
        payload: &Value,
    ) -> anyhow::Result<Option<Value>>;
}

/// Maps a `job_type` string (as stored in `job_queue`) to its handler.
pub type HandlerRegistry = HashMap<String, Box<dyn JobHandler>>;
|
||||
|
||||
/// Bygg registeret med alle tilgjengelige handlers.
|
||||
pub fn build_registry(http: reqwest::Client, ai_gateway_url: String) -> HandlerRegistry {
|
||||
let _ = (&http, &ai_gateway_url); // brukes av fremtidige handlers
|
||||
|
||||
let mut registry: HandlerRegistry = HashMap::new();
|
||||
|
||||
// Echo-handler for testing
|
||||
registry.insert("echo".into(), Box::new(echo::EchoHandler));
|
||||
|
||||
// Fremtidige handlers registreres her:
|
||||
// registry.insert("whisper_transcribe".into(), Box::new(whisper::WhisperHandler::new(http.clone())));
|
||||
// registry.insert("openrouter_analyze".into(), Box::new(ai::AnalyzeHandler::new(http.clone(), ai_gateway_url.clone())));
|
||||
// registry.insert("research_clip".into(), Box::new(ai::ResearchClipHandler::new(http.clone(), ai_gateway_url.clone())));
|
||||
// registry.insert("stats_parse".into(), Box::new(stats::StatsHandler));
|
||||
|
||||
registry
|
||||
}
|
||||
71
worker/src/main.rs
Normal file
71
worker/src/main.rs
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
use clap::Parser;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
mod handlers;
|
||||
mod worker;
|
||||
|
||||
// Command-line / environment configuration for the worker.
// NOTE(review): the `///` doc comments on the fields below double as clap
// help text shown to users at runtime, so they are kept verbatim rather
// than translated. English equivalents are given in `//` comments.
#[derive(Parser)]
#[command(name = "sidelinja-worker", about = "Jobbkø-worker for Sidelinja")]
struct Cli {
    /// PostgreSQL connection string
    #[arg(
        long,
        env = "DATABASE_URL",
        default_value = "postgres://sidelinja:localdev@localhost:5432/sidelinja"
    )]
    database_url: String,

    /// AI Gateway base URL
    #[arg(
        long,
        env = "AI_GATEWAY_URL",
        default_value = "http://localhost:4000/v1"
    )]
    ai_gateway_url: String,

    // Maximum number of jobs processed concurrently ("maks samtidige jobber").
    /// Maks samtidige jobber
    #[arg(long, default_value = "3")]
    max_concurrent: usize,

    // Queue polling interval in seconds ("polling-intervall i sekunder").
    /// Polling-intervall i sekunder
    #[arg(long, default_value = "1")]
    poll_interval: u64,
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "sidelinja_worker=info,sqlx=warn".into()),
|
||||
)
|
||||
.json()
|
||||
.init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
info!(
|
||||
max_concurrent = cli.max_concurrent,
|
||||
poll_interval_s = cli.poll_interval,
|
||||
"Starter sidelinja-worker"
|
||||
);
|
||||
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(cli.max_concurrent as u32 + 2)
|
||||
.connect(&cli.database_url)
|
||||
.await?;
|
||||
|
||||
info!("Tilkoblet PostgreSQL");
|
||||
|
||||
let registry = Arc::new(handlers::build_registry(
|
||||
reqwest::Client::new(),
|
||||
cli.ai_gateway_url,
|
||||
));
|
||||
|
||||
let registered: Vec<&str> = registry.keys().map(|k| k.as_str()).collect();
|
||||
info!(?registered, "Registrerte jobbtyper");
|
||||
|
||||
worker::run(pool, registry, cli.max_concurrent, cli.poll_interval).await
|
||||
}
|
||||
178
worker/src/worker.rs
Normal file
178
worker/src/worker.rs
Normal file
|
|
@ -0,0 +1,178 @@
|
|||
use crate::handlers::HandlerRegistry;
|
||||
use chrono::Utc;
|
||||
use serde_json::Value;
|
||||
use sqlx::{PgPool, Row};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::{error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Poll the queue and process jobs in an endless loop.
///
/// A `Semaphore` with `max_concurrent` permits bounds the number of jobs
/// running at once. A permit is acquired *before* attempting to claim a
/// job, so the worker never claims work it has no slot for. Each claimed
/// job runs on its own spawned task (the permit moves into the task and is
/// released when the job finishes) while this loop immediately polls again.
/// An empty queue or a claim error backs off for `poll_interval_secs`.
///
/// Only returns `Err` if acquiring a permit fails (semaphore closed), which
/// does not happen in normal operation.
pub async fn run(
    pool: PgPool,
    registry: Arc<HandlerRegistry>,
    max_concurrent: usize,
    poll_interval_secs: u64,
) -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let interval = std::time::Duration::from_secs(poll_interval_secs);

    loop {
        // Wait until a concurrency slot is free.
        let permit = semaphore.clone().acquire_owned().await?;

        // Try to claim a job.
        let job = claim_job(&pool).await;

        match job {
            Ok(Some(job)) => {
                let pool = pool.clone();
                let registry = registry.clone();

                tokio::spawn(async move {
                    // Held until the job finishes, then released on drop.
                    let _permit = permit;
                    process_job(&pool, &registry, job).await;
                });
            }
            Ok(None) => {
                // Queue empty: give the slot back and sleep before re-polling.
                drop(permit);
                tokio::time::sleep(interval).await;
            }
            Err(e) => {
                error!(error = %e, "Feil ved henting av jobb");
                drop(permit);
                tokio::time::sleep(interval).await;
            }
        }
    }
}
|
||||
|
||||
/// A claimed row from `job_queue`, as returned by `claim_job`.
#[derive(Debug)]
struct Job {
    id: Uuid,
    /// Workspace that owns the job; passed through to the handler.
    workspace_id: Uuid,
    /// Key into the handler registry (e.g. "echo").
    job_type: String,
    /// Handler-specific JSON arguments.
    payload: Value,
    /// Attempt count *after* the claim incremented it (>= 1 here).
    attempts: i16,
    /// Retries stop once `attempts` reaches this value.
    max_attempts: i16,
}
|
||||
|
||||
/// Claim neste jobb med FOR UPDATE SKIP LOCKED.
|
||||
async fn claim_job(pool: &PgPool) -> anyhow::Result<Option<Job>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
UPDATE job_queue SET
|
||||
status = 'running',
|
||||
started_at = now(),
|
||||
attempts = attempts + 1
|
||||
WHERE id = (
|
||||
SELECT id FROM job_queue
|
||||
WHERE status IN ('pending', 'retry')
|
||||
AND scheduled_for <= now()
|
||||
ORDER BY priority DESC, scheduled_for ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, workspace_id, job_type, payload, attempts, max_attempts
|
||||
"#,
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
Ok(row.map(|r| Job {
|
||||
id: r.get("id"),
|
||||
workspace_id: r.get("workspace_id"),
|
||||
job_type: r.get("job_type"),
|
||||
payload: r.get("payload"),
|
||||
attempts: r.get("attempts"),
|
||||
max_attempts: r.get("max_attempts"),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Prosesser en jobb: dispatch til handler, oppdater status.
|
||||
async fn process_job(pool: &PgPool, registry: &HandlerRegistry, job: Job) {
|
||||
info!(
|
||||
job_id = %job.id,
|
||||
job_type = %job.job_type,
|
||||
workspace_id = %job.workspace_id,
|
||||
attempt = job.attempts,
|
||||
"Starter jobb"
|
||||
);
|
||||
|
||||
let handler = registry.get(&job.job_type);
|
||||
|
||||
let result = match handler {
|
||||
Some(handler) => handler.handle(pool, &job.workspace_id, &job.payload).await,
|
||||
None => {
|
||||
warn!(job_type = %job.job_type, "Ukjent jobbtype — ingen handler registrert");
|
||||
Err(anyhow::anyhow!("Ukjent jobbtype: {}", job.job_type))
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(result_data) => {
|
||||
info!(job_id = %job.id, "Jobb fullført");
|
||||
let _ = sqlx::query(
|
||||
r#"
|
||||
UPDATE job_queue SET
|
||||
status = 'completed',
|
||||
result = $1,
|
||||
completed_at = now()
|
||||
WHERE id = $2
|
||||
"#,
|
||||
)
|
||||
.bind(&result_data)
|
||||
.bind(job.id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(job_id = %job.id, error = %e, "Jobb feilet");
|
||||
|
||||
if job.attempts < job.max_attempts {
|
||||
// Retry med eksponentiell backoff: 30s × 2^(attempts-1)
|
||||
let backoff_secs = 30i64 * 2i64.pow((job.attempts - 1) as u32);
|
||||
let scheduled_for = Utc::now() + chrono::Duration::seconds(backoff_secs);
|
||||
|
||||
info!(
|
||||
job_id = %job.id,
|
||||
next_retry = %scheduled_for,
|
||||
backoff_secs,
|
||||
"Setter opp retry"
|
||||
);
|
||||
|
||||
let _ = sqlx::query(
|
||||
r#"
|
||||
UPDATE job_queue SET
|
||||
status = 'retry',
|
||||
error_msg = $1,
|
||||
scheduled_for = $2
|
||||
WHERE id = $3
|
||||
"#,
|
||||
)
|
||||
.bind(format!("{e}"))
|
||||
.bind(scheduled_for)
|
||||
.bind(job.id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
} else {
|
||||
warn!(job_id = %job.id, attempts = job.attempts, "Maks forsøk nådd — permanent feil");
|
||||
|
||||
let _ = sqlx::query(
|
||||
r#"
|
||||
UPDATE job_queue SET
|
||||
status = 'error',
|
||||
error_msg = $1,
|
||||
completed_at = now()
|
||||
WHERE id = $2
|
||||
"#,
|
||||
)
|
||||
.bind(format!("{e}"))
|
||||
.bind(job.id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue