Distributed Background Job Processing in Rust and PostgreSQL - Part 4

Part 4: Concurrency, Backpressure & Failure Handling

This is Part 4 of a 5-part series on building production-grade distributed job processing systems in Rust using PostgreSQL.

The Resource Exhaustion Problem

Without concurrency limits, a worker could dequeue jobs faster than it can process them. Memory bloats. Database connections run out. The system collapses under its own ambition.

We need backpressure — a mechanism that slows down job acquisition when the worker is at capacity.

Semaphore-Based Concurrency Control

Tokio provides an elegant primitive for this: the Semaphore. Think of it as a fixed pool of permits. Each concurrent job slot requires one permit. When all permits are held, the worker waits.

// Create worker with 4 concurrent job slots
let semaphore = Arc::new(Semaphore::new(config.max_concurrent_jobs));  // e.g., 4
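
The snippets below reach shared state through self.inner. As a rough sketch, inferred from the fields the code touches rather than taken from the project itself, that state might look like this (JobRepository and WorkerConfig stand in for types built in earlier parts of the series):

use std::sync::Arc;
use tokio::sync::{watch, Semaphore};

// Placeholder types standing in for pieces from earlier parts of the series.
struct JobRepository;
struct WorkerConfig {
    max_concurrent_jobs: usize,
    shutdown_timeout_seconds: u64,
}

// Sketch of the shared state referenced as `self.inner` in the snippets below.
// The worker holds this behind an Arc and clones that Arc into spawned tasks.
struct WorkerInner {
    semaphore: Arc<Semaphore>,          // bounds concurrent jobs
    repository: JobRepository,          // dequeue / complete / fail jobs
    shutdown_tx: watch::Sender<()>,     // broadcasts shutdown to internal loops
    shutdown_rx: watch::Receiver<()>,   // cloned by each loop
    config: WorkerConfig,               // max_concurrent_jobs, timeouts, ...
}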

Semaphore Flow

The dequeue loop acquires a permit before attempting to fetch a job:

async fn dequeue_loop(&self) {
    // Each internal loop holds its own clone of the shutdown watch receiver
    // (assumed to live on the shared worker state alongside the semaphore)
    let mut shutdown_rx = self.inner.shutdown_rx.clone();

    loop {
        // Block until a slot is available OR shutdown signal
        let permit = tokio::select! {
            _ = shutdown_rx.changed() => break,
            result = self.inner.semaphore.clone().acquire_owned() => {
                match result {
                    Ok(permit) => permit,
                    Err(_) => continue,  // Semaphore closed
                }
            }
        };

        // We have a permit - try to get a job
        match self.inner.repository.dequeue_job(...).await {
            Ok(Some(job)) => {
                // Spawn processing task, passing the permit
                let worker = self.clone();
                tokio::spawn(async move {
                    worker.process_job_task(job, permit).await;
                    // permit is dropped here, releasing the slot
                });
            }
            Ok(None) => {
                // No jobs available - release permit and wait
                drop(permit);
                sleep(poll_interval).await;
            }
            Err(e) => {
                drop(permit);
                error!("Dequeue error: {}", e);
                sleep(error_backoff).await;
            }
        }
    }
}

Key insight: The permit moves into the spawned task. It’s automatically released when the task completes (success or failure). No manual permit tracking required.

Why OwnedSemaphorePermit?

Rust’s ownership system shines here. We use acquire_owned() which returns an OwnedSemaphorePermit that can be moved across tasks:

async fn process_job_task(
    self: Arc<Self>,
    job: Job,
    _permit: OwnedSemaphorePermit,  // Held for lifetime of task
) {
    // ... process job ...
    // _permit dropped when function returns
}

The _permit parameter exists solely to tie its lifetime to the task. Even if we never reference it, it’s released when the task ends.

Dead Letter Queue: Handling Persistent Failures

Some jobs fail repeatedly despite retries. Maybe the payload is malformed. Maybe an external service is permanently down. These jobs need quarantine, not endless retries.

The Dead Letter Queue (DLQ) is a separate table where failed jobs go to await manual inspection:

CREATE TABLE dead_letter_jobs (
    id UUID PRIMARY KEY,
    original_job_id UUID NOT NULL,
    org_id UUID NOT NULL,
    job_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    attempts INT NOT NULL,
    max_attempts INT NOT NULL,
    error_message TEXT,
    execution_history JSONB,  -- All execution attempts
    created_at TIMESTAMPTZ DEFAULT NOW()
);

DLQ Flow

When a job exhausts its retries, we move it atomically:

WITH job_data AS (
    SELECT * FROM jobs WHERE id = $1
),
execution_history AS (
    -- Aggregate all execution attempts into JSON
    SELECT jsonb_agg(
        jsonb_build_object(
            'id', id,
            'worker_id', worker_id,
            'started_at', started_at,
            'completed_at', completed_at,
            'error_message', error_message
        )
    ) as history
    FROM job_executions
    WHERE job_id = $1
),
inserted AS (
    -- Insert into DLQ
    INSERT INTO dead_letter_jobs (
        original_job_id, org_id, job_type, payload,
        attempts, max_attempts, error_message, execution_history
    )
    SELECT
        id, org_id, job_type, payload,
        attempts, max_attempts, $2, history
    FROM job_data CROSS JOIN execution_history
    RETURNING *
),
deleted AS (
    -- Remove from main queue
    DELETE FROM jobs WHERE id = $1
)
SELECT * FROM inserted

The DLQ preserves everything needed for debugging: the original payload, the attempt counts, the final error message, and the full execution history of every attempt.
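
On the worker side, the choice between dead-lettering and retrying is a simple branch on the attempt count. A minimal sketch, assuming move_to_dead_letter and schedule_retry are repository methods wrapping the SQL in this post (the names and the sqlx error type are illustrative):

// Sketch of the failure-handling branch, not the project's actual code.
async fn handle_failure(&self, job: &Job, error: &str) -> Result<(), sqlx::Error> {
    if job.attempts >= job.max_attempts {
        // Exhausted: run the CTE above (copy into dead_letter_jobs, delete from jobs)
        self.inner.repository.move_to_dead_letter(job.id, error).await
    } else {
        // Not exhausted: re-schedule with backoff (see Retry Strategies below)
        self.inner.repository.schedule_retry(job.id, error).await
    }
}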

Graceful Shutdown

When deploying new code or scaling down, workers should finish their current work before exiting. Abrupt termination means jobs return to the queue (after lease expiry), but graceful shutdown is cleaner.

The strategy:

  1. Signal shutdown: Broadcast to all internal loops
  2. Stop accepting: Exit the dequeue loop
  3. Wait for completion: Acquire all semaphore permits
  4. Timeout fallback: Force exit if jobs hang

async fn shutdown_gracefully(&self) {
    // Signal shutdown to dequeue loop
    let _ = self.inner.shutdown_tx.send(());

    let timeout_duration = Duration::from_secs(
        self.inner.config.shutdown_timeout_seconds
    );

    // Wait for all jobs to complete by acquiring all permits
    // If all permits are acquired, all job tasks have finished
    let acquire_all = self.inner.semaphore
        .acquire_many(self.inner.config.max_concurrent_jobs as u32);

    match tokio::time::timeout(timeout_duration, acquire_all).await {
        Ok(Ok(_permits)) => {
            info!("Graceful shutdown complete - all jobs finished");
        }
        Ok(Err(_)) => {
            warn!("Semaphore closed during shutdown");
        }
        Err(_) => {
            warn!("Shutdown timeout ({timeout_duration:?}), {} jobs still running",
                self.active_job_count());
        }
    }
}

The semaphore trick: If a worker has 4 slots and we successfully acquire 4 permits, that means all 4 slots are idle—all jobs have finished.
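
In deployment, shutdown_gracefully is usually triggered by an OS signal. A sketch of the wiring using Tokio's signal support (the exact signal handling is an assumption, not part of the post's code):

use tokio::signal;

// Sketch: resolve when either Ctrl-C or (on Unix) SIGTERM arrives.
// SIGTERM is what most orchestrators send before killing a container.
async fn wait_for_shutdown_signal() {
    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = signal::ctrl_c() => {},
        _ = terminate => {},
    }
}

// Usage:
// wait_for_shutdown_signal().await;
// worker.shutdown_gracefully().await;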

Retry Strategies

Not all failures are equal. A network timeout might succeed on retry; a malformed payload never will. Sophisticated systems use exponential backoff with jitter:

use rand::Rng;
use std::time::Duration;

fn calculate_retry_delay(attempt: u32) -> Duration {
    let base_delay = Duration::from_secs(5);
    let max_delay = Duration::from_secs(300);

    // Exponential: 5s, 10s, 20s, 40s, 80s, 160s, then capped at 300s
    let exponential = base_delay * 2_u32.saturating_pow(attempt.saturating_sub(1));
    let capped = exponential.min(max_delay);

    // Add jitter (up to +25%) to prevent thundering herd
    let jitter_range = capped.as_millis() / 4;
    let jitter = rand::thread_rng()
        .gen_range(0..=jitter_range as u64);

    capped + Duration::from_millis(jitter)
}

Jobs are scheduled with delays by setting scheduled_at in the future. The delay itself is computed in Rust by calculate_retry_delay and passed to the query as a parameter:

UPDATE jobs
SET status = 'pending',
    scheduled_at = NOW() + make_interval(secs => $3),
    error_message = $1
WHERE id = $2
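
A sketch of the application side, assuming sqlx (the pool handle and parameter order are illustrative):

// Sketch: compute the backoff in Rust and bind it as seconds for make_interval.
let delay = calculate_retry_delay(job.attempts);

sqlx::query(
    "UPDATE jobs
     SET status = 'pending',
         scheduled_at = NOW() + make_interval(secs => $3),
         error_message = $1
     WHERE id = $2",
)
.bind(error_message)         // $1: last error text
.bind(job.id)                // $2: job id
.bind(delay.as_secs_f64())   // $3: delay in seconds (double precision)
.execute(&pool)
.await?;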

Monitoring and Observability

Production systems need visibility. Key metrics to track:

Metric                   | What It Tells You
jobs_dequeued_total      | Throughput
jobs_completed_total     | Success rate
jobs_failed_total        | Failure rate
job_duration_seconds     | Processing time distribution
dlq_jobs_total           | Persistent failure rate
worker_active_jobs       | Current load per worker
dequeue_wait_seconds     | Time waiting for jobs (saturation indicator)

In Rust, using the metrics crate:

use metrics::{counter, histogram, gauge};

// On job completion
counter!("jobs_completed_total",
    "job_type" => job.job_type.clone(),
    "queue" => job.queue_name.clone()
).increment(1);

histogram!("job_duration_seconds",
    "job_type" => job.job_type.clone()
).record(duration.as_secs_f64());

// Current load
gauge!("worker_active_jobs",
    "worker_id" => worker_id.clone()
).set((max_jobs - available_permits) as f64);
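
Exposing these to a scraper is a separate concern; one common option (an assumption here, not necessarily what this project uses) is the metrics-exporter-prometheus crate:

use metrics_exporter_prometheus::PrometheusBuilder;

// Sketch: install a global recorder plus an HTTP endpoint Prometheus can
// scrape, using the crate's default listen address. Must be called from
// within a Tokio runtime.
fn init_metrics() {
    PrometheusBuilder::new()
        .install()
        .expect("failed to install Prometheus metrics exporter");
}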

Summary

This part covered the operational aspects of a production job queue:

  - Semaphore-based concurrency control for backpressure
  - A dead letter queue for persistently failing jobs
  - Graceful shutdown that drains in-flight work before exiting
  - Retry strategies with exponential backoff and jitter
  - Monitoring and observability metrics

These mechanisms transform a simple queue into a resilient, production-ready system.


Next: Part 5: Production Patterns — Cancellation, Handlers & Multi-Tenancy

Previous: Part 3: Reliability Through Leases & Heartbeats

