← Week 1: Messaging: SQS, SNS, EventBridge

Day 2: SQS Consumer in Rust

Phase 5 · Aug 13, 2026

← Week 1: Messaging: SQS, SNS, EventBridge

Agenda (2–3 hours)

  • Read (45 min): AWS SDK for Rust documentation (aws-sdk-sqs); aws-config crate README
  • Study (45 min): How does the AWS SDK for Rust handle credential resolution? What is the credential provider chain?
  • Practice (45 min): Build a production-quality SQS consumer in Rust: batch receive, process concurrently, delete on success, DLQ on failure
  • Challenge (30 min): Implement a visibility timeout heartbeat: extend the timeout every N seconds while processing, stop if the task completes or the token is cancelled
← Week 1: Messaging: SQS, SNS, EventBridge

AWS SDK Setup

[dependencies]
aws-config = { version = "1.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1.0"
tokio = { version = "1", features = ["full"] }
use aws_config::BehaviorVersion;
use aws_sdk_sqs::Client;

#[tokio::main]
async fn main() {
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = Client::new(&config);
    run_consumer(client).await;
}
← Week 1: Messaging: SQS, SNS, EventBridge

Poll Loop

async fn run_consumer(client: Client, queue_url: String) {
    loop {
        let output = client.receive_message()
            .queue_url(&queue_url)
            .max_number_of_messages(10)
            .wait_time_seconds(20)
            .attribute_names(QueueAttributeName::ApproximateReceiveCount)
            .send()
            .await
            .expect("receive failed");

        let messages = output.messages.unwrap_or_default();
        if messages.is_empty() { continue; }

        let handles = messages.into_iter().map(|msg| {
            let client = client.clone();
            let queue_url = queue_url.clone();
            tokio::spawn(async move { process_message(client, queue_url, msg).await })
        });
        join_all(handles).await;
    }
}
← Week 1: Messaging: SQS, SNS, EventBridge

Message Processing

async fn process_message(client: Client, queue_url: String, msg: Message) {
    let receipt = msg.receipt_handle().unwrap().to_string();
    let body = msg.body().unwrap_or_default();

    match process_payload(body).await {
        Ok(()) => {
            // Delete on success
            client.delete_message()
                .queue_url(&queue_url)
                .receipt_handle(&receipt)
                .send().await.unwrap();
        }
        Err(e) => {
            error!(%e, "processing failed — message will be retried or sent to DLQ");
            // Don't delete — let visibility timeout expire for retry
        }
    }
}
← Week 1: Messaging: SQS, SNS, EventBridge

Visibility Heartbeat

async fn visibility_heartbeat(
    client: Client,
    queue_url: String,
    receipt: String,
    token: CancellationToken,
) {
    let mut interval = tokio::time::interval(Duration::from_secs(25));
    loop {
        tokio::select! {
            _ = interval.tick() => {
                client.change_message_visibility()
                    .queue_url(&queue_url)
                    .receipt_handle(&receipt)
                    .visibility_timeout(30)
                    .send().await.ok();
            }
            _ = token.cancelled() => break,
        }
    }
}

Spawn this task when processing starts; cancel the token when processing completes.

← Week 1: Messaging: SQS, SNS, EventBridge

Key Takeaways

  • AWS SDK for Rust uses async/await natively; credential chain resolves env vars → IAM role → profile
  • Always use long polling (wait_time_seconds(20)) and batch receive (max_number_of_messages(10))
  • Delete message only on success; let visibility timeout expire for retries
  • Visibility heartbeat prevents message becoming visible during long processing tasks

Tomorrow: SNS fan-out — publishing one event to many consumers.