← Week 2: Implementation

Day 9: Task Scheduler — Submit and Claim

Phase 7 · Oct 1, 2026

Agenda (2–3 hours)

  • Implement (90 min): Write SubmitTask handler — idempotency check, DynamoDB event write, SQS enqueue; write ClaimTask — conditional DynamoDB update
  • Test (60 min): Integration test SubmitTask with duplicate idempotency key; verify second call returns same task_id without double-writing
  • Review (30 min): Verify the write path is consistent with the event sourcing model from Phase 4

SubmitTask Handler

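// Skip `req` as well so the task payload is not recorded in the span; only the key is logged.
#[instrument(skip(state, req), fields(idempotency_key=%req.idempotency_key))]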
async fn submit_task(
    State(state): State<AppState>,
    req: SubmitTaskRequest,
) -> Result<SubmitTaskResponse, AppError> {
    // 1. Idempotency check
    // Borrow the key here: it is used again in the transaction below.
    if let Some(existing) = state.db.get_idempotency(&req.idempotency_key).await? {
        return Ok(SubmitTaskResponse { task_id: existing.task_id, status: existing.status });
    }

    let task_id = Uuid::new_v4().to_string();

    // 2. Write event + idempotency record atomically
    state.db.transact_write(vec![
        put_event(&task_id, TaskEvent::Submitted { priority: req.priority }),
        put_idempotency(&req.idempotency_key, &task_id),
        put_task_meta(&task_id, TaskStatus::Pending, req.task_type.clone()),
    ]).await?;

    // 3. Enqueue after the durable write. If SQS fails, the task is still in
    //    DynamoDB as PENDING, so the table acts as the outbox for a recovery scanner.
    state.sqs.send_task_message(&task_id, req.task_type.clone(), req.payload).await?;

    counter!("tasks_submitted_total", "task_type" => req.task_type.clone()).increment(1);
    Ok(SubmitTaskResponse { task_id, status: TaskStatus::Pending })
}
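
The duplicate-key test from the agenda can be prototyped against an in-memory stand-in before wiring up DynamoDB and SQS. The sketch below is a simplified model, not the real handler: `FakeBackend` and its fields are hypothetical names, and a plain `HashMap` and `Vec` stand in for the table and the queue.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the DynamoDB table and the SQS queue.
#[derive(Default)]
struct FakeBackend {
    idempotency: HashMap<String, String>, // idempotency_key -> task_id
    events: Vec<(String, &'static str)>,  // (task_id, event name)
    queue: Vec<String>,                   // enqueued task_ids
    next_id: u64,
}

impl FakeBackend {
    /// Mirrors the handler's three steps: check, atomic write, enqueue.
    fn submit(&mut self, idempotency_key: &str) -> String {
        // 1. Idempotency check: a known key returns the stored task_id.
        if let Some(existing) = self.idempotency.get(idempotency_key) {
            return existing.clone();
        }

        // 2. Event and idempotency record written together
        //    (stands in for the transactional write).
        self.next_id += 1;
        let task_id = format!("task-{}", self.next_id);
        self.events.push((task_id.clone(), "Submitted"));
        self.idempotency
            .insert(idempotency_key.to_string(), task_id.clone());

        // 3. Enqueue.
        self.queue.push(task_id.clone());
        task_id
    }
}
```

Calling `submit` twice with the same key should return the same `task_id` and leave exactly one event and one queued message, which is the property the integration test needs to assert against the real services.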

Claim Task (Worker Side)

async fn claim_task(db: &DynamoClient, task_id: &str, worker_id: &str) -> Result<bool, Error> {
    let result = db.update_item()
        .table_name("task-queue")
        .key("PK", av!(format!("TASK#{task_id}")))
        .key("SK", av!("META"))
        .update_expression("SET #s = :processing, worker_id = :wid")
        .condition_expression("#s = :pending")  // atomic claim
        .expression_attribute_names("#s", "status")
        .expression_attribute_values(":processing", av!("PROCESSING"))
        .expression_attribute_values(":pending",    av!("PENDING"))
        .expression_attribute_values(":wid",        av!(worker_id))
        .send().await;

    match result {
        Ok(_) => Ok(true),
        Err(e) if is_condition_failed(&e) => Ok(false), // already claimed
        Err(e) => Err(e.into()),
    }
}
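
To see why the conditional update gives exactly one winner, here is a minimal sketch that models the claim as a compare-and-set on an in-memory table. Everything in it is an assumption for illustration: `Table`, `try_claim`, and `race_for_task` are hypothetical names, and a `Mutex` stands in for DynamoDB's per-item atomicity.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Pending,
    Processing { worker_id: String },
}

/// Hypothetical in-memory table: task_id -> status.
type Table = Arc<Mutex<HashMap<String, Status>>>;

/// Compare-and-set: succeeds only if the task is still Pending.
fn try_claim(table: &Table, task_id: &str, worker_id: &str) -> bool {
    let mut guard = table.lock().unwrap();
    if guard.get(task_id) == Some(&Status::Pending) {
        guard.insert(
            task_id.to_string(),
            Status::Processing { worker_id: worker_id.to_string() },
        );
        true
    } else {
        false // missing or already claimed
    }
}

/// Races `workers` threads for one task and returns how many claims succeeded.
fn race_for_task(workers: usize) -> usize {
    let table: Table = Arc::new(Mutex::new(HashMap::new()));
    table.lock().unwrap().insert("t1".to_string(), Status::Pending);

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let table = Arc::clone(&table);
            thread::spawn(move || try_claim(&table, "t1", &format!("worker-{i}")))
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|won| *won)
        .count()
}
```

However many workers race, the check and the write happen as one step, so only the first caller sees `Pending`. In the real `claim_task`, the `condition_expression` plays the role of the `if` inside the lock.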

Key Takeaways

  • TransactWriteItems writes the event, the idempotency record, and the task metadata atomically: either all three items land or none do
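  • The idempotency check must happen before the TransactWrite; use strongly consistent GetItem. Two concurrent requests with the same key can still both pass the check, so if put_idempotency does not already do so, give its Put an attribute_not_exists condition so the second transaction fails instead of creating a duplicate task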
  • Conditional update for claim (status=PENDING) is atomic — no two workers can claim the same task
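  • SQS SendMessage runs after the DynamoDB write: if SQS fails, the task stays as PENDING. As the handler is written, a re-submission with the same idempotency key returns at the idempotency check without enqueueing, so recovery depends on a scanner that re-enqueues stale PENDING tasks (or on changing the handler to re-enqueue when the existing task is still PENDING)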
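
The scanner mentioned in the last takeaway could be sketched as a filter over task metadata. This is an illustration only: `TaskMeta`, `submitted_at_secs`, and `stale_pending` are hypothetical names, the original schema may not store a submission timestamp, and a real scanner would query DynamoDB rather than a slice.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskStatus {
    Pending,
    Processing,
}

/// Hypothetical projection of a task's META item.
struct TaskMeta {
    task_id: String,
    status: TaskStatus,
    submitted_at_secs: u64, // assumed attribute, in seconds since the epoch
}

/// Returns the ids of tasks that have been PENDING for longer than `max_age_secs`.
/// These are candidates for re-enqueueing.
fn stale_pending(tasks: &[TaskMeta], now_secs: u64, max_age_secs: u64) -> Vec<String> {
    tasks
        .iter()
        .filter(|t| t.status == TaskStatus::Pending)
        .filter(|t| now_secs.saturating_sub(t.submitted_at_secs) > max_age_secs)
        .map(|t| t.task_id.clone())
        .collect()
}
```

Re-enqueueing a task whose original message is still in flight produces a duplicate message, but the conditional claim makes that harmless: the second worker's update fails the status check and it moves on.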

Tomorrow: gRPC API service — tonic server, mTLS, and streaming.