ListTasks Streaming RPC
type ListTasksStream = ReceiverStream<Result<ListTasksResponse, Status>>;
async fn list_tasks(
&self,
req: Request<ListTasksRequest>,
) -> Result<Response<Self::ListTasksStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let db = self.db.clone();
let filter = req.into_inner();
tokio::spawn(async move {
let mut last_key = None;
loop {
let (items, next_key) = db.scan_tasks(&filter, last_key).await?;
for item in items {
if tx.send(Ok(item)).await.is_err() { return Ok(()); }
}
match next_key {
Some(k) => last_key = Some(k),
None => return Ok(()),
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}