← Week 2: Implementation

Day 10: gRPC API Service

Phase 7 · Oct 2, 2026

← Week 2: Implementation

Agenda (2–3 hours)

  • Implement (90 min): Wire the tonic gRPC server — all four RPCs, mTLS, deadline propagation, and OTel instrumentation interceptor
  • Test (60 min): Integration test each RPC with grpcurl; verify ListTasks streams correctly; test deadline cancellation
  • Review (30 min): Verify error codes are correct (ALREADY_EXISTS for duplicate submit, NOT_FOUND for missing task)
← Week 2: Implementation

Tonic Server Setup

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    init_tracing();

    let tls = ServerTlsConfig::new()
        .identity(Identity::from_pem(cert_pem, key_pem))
        .client_ca_root(Certificate::from_pem(ca_pem));

    let handler = TaskQueueHandler::new(db, sqs);

    Server::builder()
        .tls_config(tls)?
        .layer(tower::ServiceBuilder::new()
            .layer(OtelGrpcLayer::default())     // tracing
            .timeout(Duration::from_secs(30)))   // global deadline
        .add_service(TaskQueueServer::new(handler))
        .serve("[::]:50051".parse()?)
        .await?;
    Ok(())
}
← Week 2: Implementation

ListTasks Streaming RPC

type ListTasksStream = ReceiverStream<Result<ListTasksResponse, Status>>;

async fn list_tasks(
    &self,
    req: Request<ListTasksRequest>,
) -> Result<Response<Self::ListTasksStream>, Status> {
    let (tx, rx) = tokio::sync::mpsc::channel(32);
    let db = self.db.clone();
    let filter = req.into_inner();

    tokio::spawn(async move {
        let mut last_key = None;
        loop {
            let (items, next_key) = db.scan_tasks(&filter, last_key).await?;
            for item in items {
                if tx.send(Ok(item)).await.is_err() { return Ok(()); } // client disconnected
            }
            match next_key {
                Some(k) => last_key = Some(k),
                None    => return Ok(()),
            }
        }
    });

    Ok(Response::new(ReceiverStream::new(rx)))
}
← Week 2: Implementation

Error Code Mapping

impl From<AppError> for Status {
    fn from(e: AppError) -> Self {
        match e {
            AppError::NotFound(id)      => Status::not_found(id),
            AppError::AlreadyExists(id) => Status::already_exists(id),
            AppError::InvalidArgument(m)=> Status::invalid_argument(m),
            AppError::Throttled         => Status::resource_exhausted("throttled"),
            AppError::Internal(e)       => {
                tracing::error!(error=%e, "internal error");
                Status::internal("internal error")
            }
        }
    }
}
← Week 2: Implementation

Key Takeaways

  • mTLS with client_ca_root enables mutual authentication — only trusted clients can connect
  • tower::timeout applies a global deadline; each RPC can add a tighter deadline via interceptor
  • Server-streaming with mpsc::channel lets the handler page through DynamoDB lazily
  • Map domain errors to gRPC status codes at the boundary — never leak internal details in Status::internal

Tomorrow: DynamoDB state store — event writes, projections, and conditional updates.