← Week 2: Implementation

Day 8: SQS Worker Loop

Phase 7 · Sep 30, 2026

Agenda (2–3 hours)

  • Implement (90 min): Write the SQS long-poll worker loop with graceful shutdown and visibility timeout heartbeat
  • Test (60 min): Unit test the worker loop with a mock SQS client; verify heartbeat fires before timeout expires
  • Review (30 min): Code review against Phase 2 async patterns and Phase 5 SQS best practices

Worker Loop Structure

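use std::sync::Arc;
use aws_sdk_dynamodb::Client as DynamoClient;
use aws_sdk_sqs::Client as SqsClient;
use tokio::task::{JoinError, JoinSet};
use tokio_util::sync::CancellationToken;

// Assumed cap on concurrently processed messages; tune per workload.
const MAX_IN_FLIGHT: usize = 32;

// poll_sqs (one long-poll ReceiveMessage call returning Vec<Message>), process_message,
// Ctx (fields as below), and WorkerMetrics (must be Clone) are defined elsewhere in the crate.
pub async fn run_worker(
    sqs: Arc<SqsClient>,
    db: Arc<DynamoClient>,
    queue_url: String,
    cancel: CancellationToken,
    metrics: WorkerMetrics,
) {
    let mut set = JoinSet::new();
    loop {
        // JoinSet does not cap concurrency: when full, wait for a free slot instead of polling.
        if set.len() >= MAX_IN_FLIGHT {
            tokio::select! {
                _ = cancel.cancelled() => break,
                Some(res) = set.join_next() => log_outcome(res),
            }
            continue;
        }
        tokio::select! {
            _ = cancel.cancelled() => break,
            msgs = poll_sqs(&sqs, &queue_url) => {
                for msg in msgs {
                    let ctx = Ctx { sqs: sqs.clone(), db: db.clone(), queue_url: queue_url.clone(), metrics: metrics.clone() };
                    set.spawn(process_message(ctx, msg));
                }
            }
        }
        // Reap completed tasks without blocking.
        while let Some(res) = set.try_join_next() { log_outcome(res); }
    }
    // Graceful drain: let in-flight tasks finish (set.shutdown() would abort them instead).
    while let Some(res) = set.join_next().await {
        log_outcome(res);
    }
}

fn log_outcome<E: std::fmt::Display>(res: Result<Result<(), E>, JoinError>) {
    match res {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::warn!(error = %e, "task returned an error"),
        Err(e) => tracing::error!(error = %e, "task panicked or was aborted"),
    }
}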
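
JoinSet does not limit how many tasks the loop spawns, so a busy queue can grow in-flight work without bound. One way to cap it is to size each receive from the number of free slots. A minimal std-only sketch, assuming a hypothetical `next_batch_size` helper whose result would be passed through `poll_sqs` as the ReceiveMessage `MaxNumberOfMessages` value (SQS accepts 1 to 10; the Rust SDK's `max_number_of_messages` builder method takes an `i32`):

```rust
/// SQS ReceiveMessage accepts MaxNumberOfMessages from 1 to 10.
const SQS_MAX_BATCH: usize = 10;

/// Hypothetical helper: how many messages to request on the next long poll
/// so in-flight work never exceeds `max_in_flight`. `None` means the worker
/// is full and should wait for a task to finish instead of polling.
fn next_batch_size(in_flight: usize, max_in_flight: usize) -> Option<i32> {
    let free = max_in_flight.saturating_sub(in_flight);
    if free == 0 {
        None
    } else {
        // The cast is lossless: the value is at most SQS_MAX_BATCH.
        Some(free.min(SQS_MAX_BATCH) as i32)
    }
}
```

For example, with 28 of 32 slots busy the next receive asks for 4 messages; with all 32 busy the loop should wait on `set.join_next()` rather than poll, so no message sits received but unprocessed while its visibility timeout runs down.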

Visibility Timeout Heartbeat

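use std::time::Duration;
use aws_sdk_sqs::types::Message;
use tokio::task::JoinHandle;
/// Aborts the wrapped task on drop, so the heartbeat also stops if execute_task panics.
struct AbortOnDrop<T>(JoinHandle<T>);
impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) { self.0.abort(); }
}

// Ctx, execute_task, heartbeat_loop, and delete_message are crate-local; anyhow is assumed.
async fn process_message(ctx: Ctx, msg: Message) -> anyhow::Result<()> {
    // Fail the task instead of panicking if the message has no receipt handle.
    let receipt = msg.receipt_handle()
        .ok_or_else(|| anyhow::anyhow!("message has no receipt handle"))?
        .to_owned();

    // Extend visibility every 20s so the 30s visibility timeout never lapses mid-task.
    let mut heartbeat = AbortOnDrop(tokio::spawn(heartbeat_loop(
        ctx.sqs.clone(), ctx.queue_url.clone(), receipt.clone(),
        Duration::from_secs(20),
    )));

    let result = execute_task(&ctx.db, &msg).await;
    // Stop the heartbeat and wait until it has actually stopped, so no
    // ChangeMessageVisibility call is issued after DeleteMessage.
    heartbeat.0.abort();
    let _ = (&mut heartbeat.0).await;

    match result {
        Ok(_) => delete_message(&ctx.sqs, &ctx.queue_url, &receipt).await?,
        Err(e) => tracing::warn!(error = %e, "task failed; SQS re-delivers after the visibility timeout"),
    }
    Ok(())
}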
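
The agenda's test goal (verify the heartbeat fires before the timeout expires) can be phrased as a check over the calls a mock SQS client records. A minimal std-only sketch on a fake clock; `Call`, `MockQueue`, `simulate`, and `never_visible` are hypothetical names, not SDK types, and `simulate` stands in for the real worker, which in an actual test would drive the mock through whatever client trait the code uses:

```rust
/// Calls recorded by a hypothetical mock queue client (times in whole seconds).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Call {
    ChangeVisibility { at: u64, timeout: u64 },
    Delete { at: u64 },
}

#[derive(Default)]
struct MockQueue {
    calls: Vec<Call>,
}

/// Stand-in for process_message on a fake clock: the message is received at
/// t = 0, a heartbeat every `interval` seconds resets visibility to `timeout`
/// seconds from that moment, and the task finishes and deletes at `task_secs`.
fn simulate(q: &mut MockQueue, task_secs: u64, interval: u64, timeout: u64) {
    assert!(interval > 0, "heartbeat interval must be positive");
    let mut t = interval;
    while t < task_secs {
        q.calls.push(Call::ChangeVisibility { at: t, timeout });
        t += interval;
    }
    q.calls.push(Call::Delete { at: task_secs });
}

/// True if every recorded call lands before the visibility deadline in force,
/// i.e. the message never became visible (and re-deliverable) mid-task.
fn never_visible(calls: &[Call], initial_timeout: u64) -> bool {
    let mut deadline = initial_timeout;
    for &call in calls {
        match call {
            Call::ChangeVisibility { at, timeout } => {
                if at >= deadline {
                    return false; // the message was already visible again
                }
                deadline = at + timeout;
            }
            Call::Delete { at } => return at < deadline,
        }
    }
    true
}
```

With a 20s heartbeat and a 30s timeout, a 75s task extends visibility at 20, 40, and 60 seconds and deletes at 75, so the check passes; a 40s heartbeat against the same timeout fails because its first extension arrives after the message is visible again.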

Graceful Shutdown on SIGTERM

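use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Assumed wiring: AWS config from the environment, the queue URL from a
    // hypothetical QUEUE_URL variable, and a crate-local WorkerMetrics: Default.
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let sqs = Arc::new(aws_sdk_sqs::Client::new(&config));
    let db = Arc::new(aws_sdk_dynamodb::Client::new(&config));
    let queue_url = std::env::var("QUEUE_URL").expect("QUEUE_URL must be set");
    let metrics = WorkerMetrics::default();

    let cancel = CancellationToken::new();

    // Install the handler before spawning, so a SIGTERM that arrives early is
    // not left to the default signal disposition.
    let mut sigterm = signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
    let cancel_clone = cancel.clone();
    tokio::spawn(async move {
        sigterm.recv().await;
        tracing::info!("SIGTERM received; shutting down");
        cancel_clone.cancel();
    });

    // Returns once the worker loop has exited.
    run_worker(sqs, db, queue_url, cancel, metrics).await;
}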
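
The shutdown ordering (stop intake, let in-flight work finish, then exit) does not depend on tokio. As an illustration only, here is a std-threads analog rather than the worker's actual code: the hypothetical `drain_on_cancel` stops polling once `cancelled` returns true, dropping the sender closes intake the way breaking out of the poll loop does, and joining the worker thread waits for handed-over work the way draining the task set does.

```rust
use std::sync::mpsc;
use std::thread;

/// Std-threads analog of "cancel, stop polling, drain, exit".
/// `polls` holds the message ids each poll would return; `cancelled(n)`
/// reports whether cancellation has been observed before poll `n`.
/// Returns the ids that were processed, in order.
fn drain_on_cancel(polls: Vec<Vec<u32>>, cancelled: impl Fn(usize) -> bool) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();

    // Worker thread: processes ids until the channel is closed and empty,
    // so nothing already handed over is dropped.
    let worker = thread::spawn(move || {
        let mut done = Vec::new();
        for id in rx {
            done.push(id); // "process" the message and record it as acked
        }
        done
    });

    for (poll_no, batch) in polls.into_iter().enumerate() {
        if cancelled(poll_no) {
            break; // like `cancel.cancelled() => break`: stop polling
        }
        for id in batch {
            tx.send(id).expect("worker thread is alive");
        }
    }
    drop(tx); // close intake so the worker's loop ends after draining
    worker.join().expect("worker thread panicked")
}
```

With three polls returning `[1, 2]`, `[3]`, and `[4, 5]` and cancellation observed before the third poll, the result is `[1, 2, 3]`: everything handed over is finished, and ids 4 and 5 are never received, so they stay in the queue for another worker.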

Key Takeaways

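  • JoinSet tracks in-flight tasks and reaps completed ones, but it does not cap concurrency by itself; bound it explicitly (e.g., stop polling once set.len() reaches a limit). On cancellation, drain it with join_next() so in-flight tasks finish, because shutdown() aborts them instead of waiting
  • The heartbeat must call ChangeMessageVisibility before the visibility timeout expires (every 20s against a 30s timeout here); otherwise SQS makes the message visible again and another consumer can receive it mid-processing
  • heartbeat.abort() stops the heartbeat before DeleteMessage so a late ChangeMessageVisibility call cannot race the delete; awaiting the aborted handle confirms it has stopped
  • SIGTERM → CancellationToken → graceful drain → process exits → ECS marks the task stopped; ECS sends SIGKILL if the container is still running after its stop timeout (30s by default), so the drain must finish within that window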

Tomorrow: task scheduler — the API service that handles SubmitTask and claim coordination.