← Week 2: Implementation

Day 11: DynamoDB State Store

Phase 7 · Oct 3, 2026


Agenda (2–3 hours)

  • Implement (90 min): Write the full DynamoDB repository layer — event append, task meta projection update, and idempotency management
  • Test (60 min): Integration test against DynamoDB Local; verify event ordering, conditional failures, and idempotency
  • Review (30 min): Verify the repository layer correctly separates event writes from projection updates

Event Append

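/// Append one event at `expected_seq`; fails if that sequence number is already taken.
pub async fn append_event(
    &self,
    task_id: &str,
    event: TaskEvent,
    expected_seq: u32,
) -> Result<(), AppError> {
    // Zero-padding keeps string sort order equal to numeric order, but only
    // while seq <= 9999; widen the padding if a task can exceed that.
    let seq = format!("{:04}", expected_seq);
    self.db.put_item()
        .table_name(&self.table)
        .item("PK",    av!(format!("TASK#{task_id}")))
        .item("SK",    av!(format!("EVENT#{seq}")))
        .item("event", AttributeValue::M(event.into_dynamo()))
        // Fails if an item with this PK + SK already exists: no duplicate events.
        .condition_expression("attribute_not_exists(PK)")
        .send().await
        .map(|_| ())  // discard the put output to match the () return type
        .map_err(|e| if is_condition_failed(&e) {
            AppError::OptimisticConflict
        } else {
            AppError::from(e)
        })
}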
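
To see why the conditional put gives optimistic concurrency, it can help to model it without DynamoDB. The sketch below is an in-memory stand-in, not the SDK call: `EventLog`, `AppendError`, and `event_count` are hypothetical names introduced for illustration. Inserting only when the (PK, SK) pair is absent plays the role of `attribute_not_exists(PK)`.

```rust
use std::collections::BTreeMap;

/// Hypothetical error type standing in for AppError::OptimisticConflict.
#[derive(Debug, PartialEq)]
pub enum AppendError {
    OptimisticConflict,
}

/// In-memory stand-in for the table: (PK, SK) -> event payload.
#[derive(Default)]
pub struct EventLog {
    items: BTreeMap<(String, String), String>,
}

impl EventLog {
    /// Insert only if no item exists at this key, mirroring a put
    /// guarded by `attribute_not_exists(PK)`.
    pub fn append_event(
        &mut self,
        task_id: &str,
        payload: &str,
        expected_seq: u32,
    ) -> Result<(), AppendError> {
        let key = (
            format!("TASK#{task_id}"),
            format!("EVENT#{expected_seq:04}"),
        );
        if self.items.contains_key(&key) {
            // Another writer already claimed this sequence number.
            return Err(AppendError::OptimisticConflict);
        }
        self.items.insert(key, payload.to_string());
        Ok(())
    }

    /// Number of events stored across all tasks.
    pub fn event_count(&self) -> usize {
        self.items.len()
    }
}
```

Two writers that both try to claim sequence 1 for the same task illustrate the outcome: the first `append_event("t1", "created", 1)` returns `Ok(())`, the second call with the same sequence returns `Err(AppendError::OptimisticConflict)`, and a retry with sequence 2 succeeds.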

Task Meta Projection

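/// Move the META item from `expected_status` to `new_status` as a compare-and-set.
pub async fn update_task_status(
    &self,
    task_id: &str,
    new_status: TaskStatus,
    expected_status: TaskStatus,
) -> Result<(), AppError> {
    self.db.update_item()
        .table_name(&self.table)
        .key("PK", av!(format!("TASK#{task_id}")))
        .key("SK", av!("META"))
        // `status` is a DynamoDB reserved word, so it is aliased as #s.
        .update_expression("SET #s = :new, updated_at = :now")
        // Applies only if the stored status still matches; a missing META item also fails.
        .condition_expression("#s = :expected")
        .expression_attribute_names("#s", "status")
        .expression_attribute_values(":new",      av!(new_status.as_str()))
        .expression_attribute_values(":expected", av!(expected_status.as_str()))
        .expression_attribute_values(":now",      av!(Utc::now().to_rfc3339()))
        .send().await
        .map(|_| ())  // discard the update output to match the () return type
        .map_err(|e| if is_condition_failed(&e) {
            // The stored status is not read back here, so `actual` is reported as Unknown.
            AppError::StatusConflict { expected: expected_status, actual: TaskStatus::Unknown }
        } else {
            AppError::from(e)
        })
}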
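
The conditional update behaves like a compare-and-set on the status field. The following is a minimal in-memory sketch of that behavior, not the repository code: `MetaStore`, `StatusError`, and the `Pending`/`Running`/`Done` variants are assumptions made for illustration (the lesson only shows `TaskStatus::Unknown`). Unlike the DynamoDB call, where a missing META item simply fails the condition, this model reports a missing task separately and can return the actual stored status.

```rust
use std::collections::HashMap;

/// Hypothetical status variants; the real enum is not shown in the lesson.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Pending,
    Running,
    Done,
}

/// Hypothetical error type for the model.
#[derive(Debug, PartialEq)]
pub enum StatusError {
    StatusConflict { expected: TaskStatus, actual: TaskStatus },
    NotFound,
}

/// In-memory stand-in for the META items: task id -> current status.
#[derive(Default)]
pub struct MetaStore {
    status: HashMap<String, TaskStatus>,
}

impl MetaStore {
    /// Create or overwrite the status for a task.
    pub fn create(&mut self, task_id: &str, status: TaskStatus) {
        self.status.insert(task_id.to_string(), status);
    }

    /// Apply the transition only if the stored status equals `expected_status`.
    pub fn update_task_status(
        &mut self,
        task_id: &str,
        new_status: TaskStatus,
        expected_status: TaskStatus,
    ) -> Result<(), StatusError> {
        let current = self.status.get_mut(task_id).ok_or(StatusError::NotFound)?;
        if *current != expected_status {
            return Err(StatusError::StatusConflict {
                expected: expected_status,
                actual: *current,
            });
        }
        *current = new_status;
        Ok(())
    }
}
```

If two workers both try to move a task from `Pending` to `Running`, the first succeeds and the second receives `StatusConflict` with `actual: Running`, which is the signal to re-read state rather than overwrite it.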

Reading Task History

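/// Load all events for a task in sequence order.
pub async fn get_task_events(&self, task_id: &str) -> Result<Vec<TaskEvent>, AppError> {
    // A single Query returns at most 1 MB of data; longer histories need
    // pagination via the response's last evaluated key.
    let result = self.db.query()
        .table_name(&self.table)
        .key_condition_expression("PK = :pk AND begins_with(SK, :prefix)")
        .expression_attribute_values(":pk",     av!(format!("TASK#{task_id}")))
        .expression_attribute_values(":prefix", av!("EVENT#"))
        .scan_index_forward(true)  // ascending sort key order, i.e. event order
        .send().await?;

    // Convert each attribute map back into a domain event; the first
    // conversion error aborts the collect.
    result.items.unwrap_or_default()
        .into_iter()
        .map(TaskEvent::from_dynamo_item)
        .collect()
}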
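
Ascending query order matches event order only because the sort key is zero-padded: DynamoDB compares string sort keys by their UTF-8 bytes, not numerically. The sketch below illustrates this with plain string sorting; `event_sk` and `sorted_keys` are hypothetical helpers, and the 10-digit width is an assumption chosen because it covers every `u32` value.

```rust
/// Build the event sort key with a configurable zero-pad width.
/// The width is a minimum: larger numbers are not truncated.
pub fn event_sk(seq: u32, width: usize) -> String {
    format!("EVENT#{seq:0width$}")
}

/// Sort keys bytewise, the same way string sort keys are ordered.
pub fn sorted_keys(seqs: &[u32], width: usize) -> Vec<String> {
    let mut keys: Vec<String> = seqs.iter().map(|&s| event_sk(s, width)).collect();
    keys.sort();
    keys
}
```

With a width of 4, `sorted_keys(&[2, 10, 1], 4)` comes back in numeric order, but `sorted_keys(&[9999, 10000], 4)` puts `EVENT#10000` before `EVENT#9999`, because `'1'` sorts before `'9'`. A width of 10 keeps both pairs in numeric order.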

Key Takeaways

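  • attribute_not_exists(PK) on event append rejects a second write to the same sequence number, so two writers cannot both claim it
  • The conditional status update is an atomic compare-and-set: the transition applies only if the stored status matches the expected one, and a mismatch surfaces as a conflict error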
  • The repository layer exposes domain types (TaskEvent, TaskStatus) — DynamoDB attribute maps never leak above this layer
  • DynamoDB Local (amazon/dynamodb-local Docker image) enables fast integration tests without AWS

Tomorrow: retry logic, DLQ handling, and exponential backoff.