← Week 2: Custom Binary Protocols

Day 10: Request-Response Multiplexing

Phase 3 · Jul 10, 2026

Agenda (2–3 hours)

  • Read (45 min): tower-h2 connection management internals; pipelined vs multiplexed protocols (HTTP/1.1 pipelining vs HTTP/2 streams)
  • Study (45 min): Design a correlation ID scheme; trace how a response is matched to the right caller
  • Practice (45 min): Extend the KV protocol with a request_id: u32 header field; implement a MultiplexedClient that matches responses to pending futures
  • Challenge (30 min): How does the h2 crate manage concurrent streams? What is stream-level flow control?

The Multiplexing Problem

Plain request-response over a single connection has a problem: each request must wait for the previous response, so one slow request stalls every caller queued behind it.

HTTP/1.1: one request in flight per connection; pipelining lets requests be sent back to back, but responses must return in request order, so head-of-line blocking remains
HTTP/2: each request is a stream; streams are interleaved on one TCP connection and can complete in any order
Redis RESP: pipelining; send N requests, then read N responses in the same order (ordered, not multiplexed)

For a custom binary protocol to support concurrency:

  1. Add a correlation ID (request ID) to each message
  2. Server echoes the ID in the response
  3. Client maps pending request_id → oneshot::Sender<Response>
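
The three steps above depend on the request ID being part of the wire format. The following is a minimal sketch under an assumed, hypothetical frame layout (4-byte big-endian request_id, 1-byte opcode, 4-byte big-endian payload length, then the payload). The KV protocol built earlier in the week may lay its header out differently, and Frame, encode, decode, and make_response are illustrative names.

```rust
/// Hypothetical frame: [request_id: u32 BE][opcode: u8][len: u32 BE][payload].
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub request_id: u32,
    pub opcode: u8,
    pub payload: Vec<u8>,
}

pub const HEADER_LEN: usize = 9;

/// Serializes a frame, writing the correlation ID first.
pub fn encode(frame: &Frame) -> Vec<u8> {
    let mut buf = Vec::with_capacity(HEADER_LEN + frame.payload.len());
    buf.extend_from_slice(&frame.request_id.to_be_bytes());
    buf.push(frame.opcode);
    buf.extend_from_slice(&(frame.payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(&frame.payload);
    buf
}

/// Returns the decoded frame and the bytes consumed, or None if `buf`
/// does not yet hold a complete frame.
pub fn decode(buf: &[u8]) -> Option<(Frame, usize)> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let request_id = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    let opcode = buf[4];
    let len = u32::from_be_bytes([buf[5], buf[6], buf[7], buf[8]]) as usize;
    let end = HEADER_LEN.checked_add(len)?;
    if buf.len() < end {
        return None;
    }
    let payload = buf[HEADER_LEN..end].to_vec();
    Some((Frame { request_id, opcode, payload }, end))
}

/// Server side of step 2: the response reuses the request's ID.
pub fn make_response(req: &Frame, payload: Vec<u8>) -> Frame {
    Frame { request_id: req.request_id, opcode: req.opcode, payload }
}
```

Putting the ID at a fixed offset at the front of the header lets the reader route a frame before it has parsed the payload.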

MultiplexedClient Architecture

Client thread:
  caller A ──→ (req, id=1, tx_a) ──→ outbound channel ──→ connection writer
  caller B ──→ (req, id=2, tx_b) ──→ outbound channel

Connection reader loop:
  reads response with id=1 → pending_map.remove(1) → tx_a.send(response)
  reads response with id=2 → pending_map.remove(2) → tx_b.send(response)
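
The reader-side routing in the diagram can be sketched without an async runtime. This version substitutes std::sync::mpsc channels for tokio's oneshot and a Mutex<HashMap> for the shared map, purely so that it runs on the standard library alone. PendingMap, register, and dispatch are illustrative names, not part of the course code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Stand-in for a decoded response frame.
#[derive(Debug, PartialEq)]
pub struct Response {
    pub request_id: u32,
    pub body: String,
}

/// request_id -> the channel on which the waiting caller expects its response.
pub struct PendingMap {
    inner: Mutex<HashMap<u32, Sender<Response>>>,
}

impl PendingMap {
    pub fn new() -> Self {
        PendingMap { inner: Mutex::new(HashMap::new()) }
    }

    /// Caller side: record interest in `id` and get a receiver to wait on.
    pub fn register(&self, id: u32) -> Receiver<Response> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().insert(id, tx);
        rx
    }

    /// Reader side: route one response to its caller.
    /// Returns false if no caller is waiting for that ID.
    pub fn dispatch(&self, resp: Response) -> bool {
        let tx = self.inner.lock().unwrap().remove(&resp.request_id);
        match tx {
            Some(tx) => {
                // A send error only means the caller stopped waiting.
                let _ = tx.send(resp);
                true
            }
            None => false,
        }
    }

    /// Number of requests still awaiting a response.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}
```

Because each entry is removed as it is dispatched, responses may arrive in any order and the map shrinks back to empty once every caller has been answered.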

Implementation

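use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use dashmap::DashMap;
use futures::SinkExt; // provides .send() on FramedWrite
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::FramedWrite;

struct MultiplexedClient {
    outbound: mpsc::Sender<(KvMessage, oneshot::Sender<KvMessage>)>,
    next_id: AtomicU32, // monotonically increasing; wraps at u32::MAX
}

impl MultiplexedClient {
    fn next_id(&self) -> u32 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    async fn send(&self, mut req: KvMessage) -> Result<KvMessage, Error> {
        let (tx, rx) = oneshot::channel();
        req.request_id = self.next_id();
        // A send error here means the writer task has exited.
        self.outbound.send((req, tx)).await.map_err(|_| Error::ConnectionClosed)?;
        rx.await.map_err(|_| Error::ConnectionClosed)
    }
}

// Writer task: registers each request as pending, then writes it to the socket.
async fn writer_task(
    mut rx: mpsc::Receiver<(KvMessage, oneshot::Sender<KvMessage>)>,
    mut framed: FramedWrite<OwnedWriteHalf, KvCodec>,
    pending: Arc<DashMap<u32, oneshot::Sender<KvMessage>>>,
) {
    while let Some((msg, tx)) = rx.recv().await {
        let id = msg.request_id;
        // Insert before writing so a fast response always finds its entry.
        pending.insert(id, tx);
        if framed.send(msg).await.is_err() {
            // Write failed: drop the sender so the caller sees ConnectionClosed.
            pending.remove(&id);
            break;
        }
    }
}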
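
The code above covers the caller and the writer; the reader side also has to deal with the connection closing while requests are still pending. One possible approach is to drop every pending sender so that each waiting caller wakes with an error. The sketch below uses standard-library channels in place of tokio's oneshot (an assumption made to keep the example dependency-free). Pending, register, and fail_all are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for the pending map; values are the callers' reply channels.
pub type Pending = HashMap<u32, Sender<String>>;

/// Registers a caller for `id` and returns the receiver it will wait on.
pub fn register(pending: &mut Pending, id: u32) -> Receiver<String> {
    let (tx, rx) = channel();
    pending.insert(id, tx);
    rx
}

/// Intended to be called by the reader when the socket hits EOF or an I/O error.
/// Dropping each sender makes the matching `recv()` return Err, in the same way
/// that a dropped tokio oneshot::Sender makes `rx.await` return Err.
/// Returns how many callers were failed.
pub fn fail_all(pending: &mut Pending) -> usize {
    let n = pending.len();
    pending.clear(); // drops every Sender still in the map
    n
}
```

Without a step like this, callers whose requests were in flight when the connection died would wait indefinitely, because their senders would stay alive inside the map.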

Flow Control

Without flow control, a fast sender can overwhelm a slow receiver:

Application → [outbound channel: 64 msgs] → writer → TCP send buffer
                    ↑
            mpsc backpressure: send() awaits when channel is full

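The channel bound is the flow control mechanism. When the writer is slow (TCP send buffer full), the channel fills, and callers are suspended at .send().await until a slot frees up. Note that the bound limits requests queued for writing, not requests already written and still awaiting a response.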
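
The effect of the bound can be observed with the standard library's bounded channel, used here as a stand-in for tokio's bounded mpsc (an assumption to keep the example dependency-free). try_send reports the condition under which an async caller would instead be suspended. Both function names are illustrative.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Sends up to `attempts` messages into a channel of capacity `bound` that
/// nobody drains, and returns how many were accepted before it was full.
pub fn accepted_before_full(bound: usize, attempts: usize) -> usize {
    let (tx, _rx) = sync_channel::<u32>(bound); // _rx stays alive but never reads
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i as u32) {
            Ok(()) => accepted += 1,
            // Full: the point where an async sender would wait for a free slot.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Shows that draining one message frees exactly one slot for a new sender.
pub fn slot_freed_after_recv(bound: usize) -> bool {
    let (tx, rx) = sync_channel::<u32>(bound);
    for i in 0..bound {
        tx.try_send(i as u32).unwrap();
    }
    let was_full = tx.try_send(0).is_err();
    let _ = rx.try_recv(); // the "writer" consumes one message
    was_full && tx.try_send(0).is_ok()
}
```

With a bound of 64, as in the diagram, the 65th message is the first to be refused until the writer drains one.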

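For finer-grained control per stream (like HTTP/2, which keeps a flow-control window for each stream), track per-stream credits: a sender spends a credit for each request and gets it back with the response, so at most N requests are outstanding per stream.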
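
A minimal sketch of such a credit scheme, assuming one credit per outstanding request on a stream (HTTP/2 itself counts flow-control credit in bytes rather than requests). CreditWindow and its methods are hypothetical names.

```rust
use std::collections::HashMap;

/// Per-stream credit counter: a sender spends one credit per request
/// and gets it back when the response arrives.
pub struct CreditWindow {
    limit: u32,
    in_flight: HashMap<u32, u32>, // stream_id -> outstanding requests
}

impl CreditWindow {
    pub fn new(limit: u32) -> Self {
        CreditWindow { limit, in_flight: HashMap::new() }
    }

    /// Consumes a credit and returns true if the stream has one left.
    pub fn try_acquire(&mut self, stream_id: u32) -> bool {
        let limit = self.limit;
        let used = self.in_flight.entry(stream_id).or_insert(0);
        if *used < limit {
            *used += 1;
            true
        } else {
            false
        }
    }

    /// Returns a credit when a response for `stream_id` arrives.
    pub fn release(&mut self, stream_id: u32) {
        if let Some(used) = self.in_flight.get_mut(&stream_id) {
            *used = used.saturating_sub(1);
        }
    }

    /// Credits the stream can still spend.
    pub fn available(&self, stream_id: u32) -> u32 {
        self.limit - self.in_flight.get(&stream_id).cloned().unwrap_or(0)
    }
}
```

Each stream has its own counter, so one stream running out of credit does not stop another from sending.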

Key Takeaways

  • Correlation IDs enable concurrent requests over a single connection
  • Map request_id → oneshot::Sender in a shared DashMap (concurrent hash map)
  • Split connection into reader task + writer task; communicate via channel
  • Bounded channels provide natural backpressure; tune the bound to match expected burst size

Tomorrow: protocol versioning — making your protocol evolvable.