use crate::handlers::HandlerRegistry;
use chrono::Utc;
use serde_json::Value;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use uuid::Uuid;

/// Fetch and process jobs in an endless loop.
///
/// A semaphore with `max_concurrent` permits limits how many jobs are
/// processed at the same time. Each claimed job runs in its own spawned task
/// that holds one permit until the job has finished.
///
/// When no job is available, or claiming fails, the loop sleeps for
/// `poll_interval_secs` seconds before polling again.
///
/// # Errors
///
/// The loop only ends by returning an error, which happens if acquiring a
/// semaphore permit fails.
pub async fn run(
    pool: PgPool,
    registry: Arc<HandlerRegistry>,
    max_concurrent: usize,
    poll_interval_secs: u64,
) -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let interval = std::time::Duration::from_secs(poll_interval_secs);

    loop {
        // Wait until a slot is free before claiming anything, so a job is
        // never marked as running without a task ready to execute it.
        let permit = semaphore.clone().acquire_owned().await?;

        // Try to claim a job.
        let job = claim_job(&pool).await;

        match job {
            Ok(Some(job)) => {
                let pool = pool.clone();
                let registry = registry.clone();
                tokio::spawn(async move {
                    let _permit = permit; // held until the job is done
                    process_job(&pool, &registry, job).await;
                });
            }
            Ok(None) => {
                // Nothing to do: release the slot and poll again later.
                drop(permit);
                tokio::time::sleep(interval).await;
            }
            Err(e) => {
                error!(error = %e, "Feil ved henting av jobb");
                drop(permit);
                tokio::time::sleep(interval).await;
            }
        }
    }
}

/// A row claimed from the `job_queue` table.
#[derive(Debug)]
struct Job {
    /// Primary key of the job row.
    id: Uuid,
    /// Workspace the job belongs to; passed on to the handler.
    workspace_id: Uuid,
    /// Key used to look up the handler in the registry.
    job_type: String,
    /// JSON payload passed on to the handler.
    payload: Value,
    /// Attempt counter as returned by the claim, i.e. including the current attempt.
    attempts: i16,
    /// Once `attempts` reaches this value a failure is treated as permanent.
    max_attempts: i16,
}

/// Claim the next job using `FOR UPDATE SKIP LOCKED`.
///
/// In a single statement this picks the due job (`status` is `pending` or
/// `retry` and `scheduled_for <= now()`) with the highest priority and the
/// earliest `scheduled_for`, marks it as `running`, stamps `started_at` and
/// increments `attempts`.
///
/// Returns `Ok(None)` when no row was updated.
///
/// # Errors
///
/// Returns an error if the query fails.
///
/// # Panics
///
/// `Row::get` panics if a returned column is missing or cannot be decoded
/// into the corresponding `Job` field type.
async fn claim_job(pool: &PgPool) -> anyhow::Result<Option<Job>> {
    let row = sqlx::query(
        r#"
        UPDATE job_queue
        SET status = 'running',
            started_at = now(),
            attempts = attempts + 1
        WHERE id = (
            SELECT id FROM job_queue
            WHERE status IN ('pending', 'retry')
              AND scheduled_for <= now()
            ORDER BY priority DESC, scheduled_for ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, workspace_id, job_type, payload, attempts, max_attempts
        "#,
    )
    .fetch_optional(pool)
    .await?;

    Ok(row.map(|r| Job {
        id: r.get("id"),
        workspace_id: r.get("workspace_id"),
        job_type: r.get("job_type"),
        payload: r.get("payload"),
        attempts: r.get("attempts"),
        max_attempts: r.get("max_attempts"),
    }))
}

/// Process one job: dispatch it to its handler, then update its status.
async fn process_job(pool: &PgPool, registry: &HandlerRegistry, job: Job) { info!( job_id = %job.id, job_type = %job.job_type, workspace_id = %job.workspace_id, attempt = job.attempts, "Starter jobb" ); let handler = registry.get(&job.job_type); let result = match handler { Some(handler) => handler.handle(pool, &job.workspace_id, &job.id, &job.payload).await, None => { warn!(job_type = %job.job_type, "Ukjent jobbtype — ingen handler registrert"); Err(anyhow::anyhow!("Ukjent jobbtype: {}", job.job_type)) } }; match result { Ok(result_data) => { info!(job_id = %job.id, "Jobb fullført"); let _ = sqlx::query( r#" UPDATE job_queue SET status = 'completed', result = $1, completed_at = now() WHERE id = $2 "#, ) .bind(&result_data) .bind(job.id) .execute(pool) .await; } Err(e) => { error!(job_id = %job.id, error = %e, "Jobb feilet"); if job.attempts < job.max_attempts { // Retry med eksponentiell backoff: 30s × 2^(attempts-1) let backoff_secs = 30i64 * 2i64.pow((job.attempts - 1) as u32); let scheduled_for = Utc::now() + chrono::Duration::seconds(backoff_secs); info!( job_id = %job.id, next_retry = %scheduled_for, backoff_secs, "Setter opp retry" ); let _ = sqlx::query( r#" UPDATE job_queue SET status = 'retry', error_msg = $1, scheduled_for = $2 WHERE id = $3 "#, ) .bind(format!("{e}")) .bind(scheduled_for) .bind(job.id) .execute(pool) .await; } else { warn!(job_id = %job.id, attempts = job.attempts, "Maks forsøk nådd — permanent feil"); let _ = sqlx::query( r#" UPDATE job_queue SET status = 'error', error_msg = $1, completed_at = now() WHERE id = $2 "#, ) .bind(format!("{e}")) .bind(job.id) .execute(pool) .await; } } } }