← Week 2: Custom Binary Protocols

Day 9: tokio-util Codec

Phase 3 · Jul 9, 2026

Agenda (2–3 hours)

  • Read (45 min): tokio_util::codec module documentation; LengthDelimitedCodec source
  • Study (45 min): How do FramedRead and FramedWrite buffer TCP segments into complete frames?
  • Practice (45 min): Implement a KvCodec using the protocol from yesterday; test with a loopback TCP pair
  • Challenge (30 min): Add compression: if the value exceeds 256 bytes, compress it with LZ4 and set a flag bit in the header
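
The compression challenge hinges on a single header bit. A minimal sketch of the flag handling, assuming a hypothetical FLAG_COMPRESSED constant in bit 0 of the 16-bit flags field; the bit position and helper names are not taken from the protocol, and the LZ4 call itself is left out:

```rust
/// Hypothetical: bit 0 of the 16-bit flags field marks an LZ4-compressed value.
const FLAG_COMPRESSED: u16 = 0x0001;
/// Threshold from the challenge: only values larger than this get compressed.
const COMPRESS_THRESHOLD: usize = 256;

/// Decide whether a value of this length should be compressed.
fn should_compress(value_len: usize) -> bool {
    value_len > COMPRESS_THRESHOLD
}

/// Return `flags` with the compression bit set or cleared; other bits are untouched.
fn with_compression(flags: u16, compressed: bool) -> u16 {
    if compressed {
        flags | FLAG_COMPRESSED
    } else {
        flags & !FLAG_COMPRESSED
    }
}

/// The decoder would check this before deciding whether to decompress the value.
fn is_compressed(flags: u16) -> bool {
    flags & FLAG_COMPRESSED != 0
}
```

The encoder would call with_compression(item.flags, should_compress(item.value.len())) before writing the header, and the decoder would branch on is_compressed after reading it.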

Encoder and Decoder Traits

use tokio_util::codec::{Encoder, Decoder};
use bytes::{BytesMut, BufMut, Buf};

struct KvCodec;

impl Encoder<KvMessage> for KvCodec {
    type Error = std::io::Error;
    fn encode(&mut self, item: KvMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
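        // Reject lengths that overflow their header fields instead of truncating with `as`
        let too_long = |what: &'static str| std::io::Error::new(std::io::ErrorKind::InvalidInput, what);
        let key_len = u16::try_from(item.key.len()).map_err(|_| too_long("key too long"))?;
        let value_len = u32::try_from(item.value.len()).map_err(|_| too_long("value too long"))?;

        // Write 12-byte header + body into dst (put_* methods write big-endian)
        dst.reserve(12 + item.key.len() + item.value.len());
        dst.put_u16(0xCAFE);
        dst.put_u8(1);   // version
        dst.put_u8(item.opcode as u8);
        dst.put_u16(item.flags);
        dst.put_u16(key_len);
        dst.put_u32(value_len);
        dst.put_slice(&item.key);
        dst.put_slice(&item.value);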
        Ok(())
    }
}
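
To make the wire layout concrete, here is a dependency-free sketch that builds the same 12-byte header using only std. The names encode_frame and example_frame and the opcode value 2 are assumptions for illustration; the field order and widths follow the encoder above.

```rust
use std::convert::TryFrom;

/// Build one frame: 12-byte big-endian header followed by key and value.
/// Returns None if a length does not fit its header field.
fn encode_frame(opcode: u8, flags: u16, key: &[u8], value: &[u8]) -> Option<Vec<u8>> {
    let key_len = u16::try_from(key.len()).ok()?;
    let value_len = u32::try_from(value.len()).ok()?;

    let mut out = Vec::with_capacity(12 + key.len() + value.len());
    out.extend_from_slice(&0xCAFEu16.to_be_bytes()); // bytes 0..2: 0xCAFE marker
    out.push(1);                                     // byte 2: version
    out.push(opcode);                                // byte 3: opcode
    out.extend_from_slice(&flags.to_be_bytes());     // bytes 4..6: flags
    out.extend_from_slice(&key_len.to_be_bytes());   // bytes 6..8: key length
    out.extend_from_slice(&value_len.to_be_bytes()); // bytes 8..12: value length
    out.extend_from_slice(key);
    out.extend_from_slice(value);
    Some(out)
}

/// Example: opcode 2 (hypothetical), no flags, key "k", value "hi".
fn example_frame() -> Vec<u8> {
    encode_frame(2, 0, b"k", b"hi").expect("lengths fit the header")
}
```

For key "k" and value "hi" this yields 15 bytes: CA FE 01 02 00 00 00 01 00 00 00 02, followed by 6B 68 69.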

Decoder

impl Decoder for KvCodec {
    type Item = KvMessage;
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        const HEADER: usize = 12;
        if src.len() < HEADER { return Ok(None); } // Not enough for header

        let key_len = u16::from_be_bytes([src[6], src[7]]) as usize;
        let value_len = u32::from_be_bytes([src[8], src[9], src[10], src[11]]) as usize;
        let total = HEADER + key_len + value_len;

        if src.len() < total {
            // Not enough for full message; reserve room so the next read can fill it
            src.reserve(total - src.len());
            return Ok(None);
        }

        let buf = src.split_to(total);
        // Parse header fields from buf...
        Ok(Some(KvMessage { /* ... */ }))
    }
}
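
The two Ok(None) returns are what make partial reads work. The sketch below mirrors the same length checks on a plain Vec<u8> so it runs without tokio; try_decode, example_partial, and the (key, value) tuple return are stand-ins for illustration, not part of tokio-util.

```rust
const HEADER: usize = 12;

/// Try to pull one complete frame off the front of `buf`.
/// Returns None and leaves `buf` untouched until header and body are both present.
fn try_decode(buf: &mut Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)> {
    if buf.len() < HEADER {
        return None; // header incomplete
    }
    let key_len = u16::from_be_bytes([buf[6], buf[7]]) as usize;
    let value_len = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]) as usize;
    let total = HEADER + key_len + value_len;
    if buf.len() < total {
        return None; // body incomplete
    }
    // Remove exactly one frame; bytes after it stay buffered for the next call.
    let frame: Vec<u8> = buf.drain(..total).collect();
    let key = frame[HEADER..HEADER + key_len].to_vec();
    let value = frame[HEADER + key_len..].to_vec();
    Some((key, value))
}

/// Feed a 15-byte frame in two pieces, as TCP might deliver it.
/// Returns whether the first attempt came back empty, plus the second attempt's result.
fn example_partial() -> (bool, Option<(Vec<u8>, Vec<u8>)>) {
    let frame = [0xCA, 0xFE, 1, 2, 0, 0, 0, 1, 0, 0, 0, 2, b'k', b'h', b'i'];
    let mut buf = Vec::new();
    buf.extend_from_slice(&frame[..13]); // header plus one body byte
    let first_was_none = try_decode(&mut buf).is_none();
    buf.extend_from_slice(&frame[13..]); // the remaining two bytes arrive
    (first_was_none, try_decode(&mut buf))
}
```

The first call sees 13 of 15 bytes and reports nothing; once the last two bytes arrive, the second call returns the key "k" and the value "hi".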

FramedRead and FramedWrite

use tokio_util::codec::{FramedRead, FramedWrite};
use tokio_stream::StreamExt; // provides .next() on FramedRead
use futures::SinkExt;        // provides .send() on FramedWrite

// Server side
let (reader, writer) = stream.into_split();
let mut framed_read = FramedRead::new(reader, KvCodec);
let mut framed_write = FramedWrite::new(writer, KvCodec);

// Receive complete messages
while let Some(msg) = framed_read.next().await {
    let msg = msg?;
    let response = handle(msg);
    framed_write.send(response).await?;
}

FramedRead wraps an AsyncRead and yields complete decoded items as a Stream; FramedWrite wraps an AsyncWrite and accepts items as a Sink. Neither side needs manual buffering.
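
Conceptually, FramedRead runs a loop like the one sketched here: ask the decoder for a frame, and read more bytes only when it reports that none is ready. This is a simplified synchronous model built on std::io::Read, not tokio-util's actual implementation; read_frames, example_read, and the toy one-byte-length decoder are invented for the example.

```rust
use std::io::{self, Read};

/// Toy decoder: a 1-byte length prefix, then that many payload bytes.
fn decode(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    let len = *buf.first()? as usize;
    if buf.len() < 1 + len {
        return None; // frame incomplete, the caller should read more
    }
    // Remove prefix and payload from the buffer, keep only the payload.
    Some(buf.drain(..1 + len).skip(1).collect())
}

/// Drain `reader` to EOF, reading at most `chunk` bytes at a time,
/// and return every complete frame in arrival order.
fn read_frames<R: Read>(mut reader: R, chunk: usize) -> io::Result<Vec<Vec<u8>>> {
    let mut buf = Vec::new();
    let mut frames = Vec::new();
    let mut scratch = vec![0u8; chunk];
    loop {
        // Emit every frame already buffered before reading again.
        while let Some(frame) = decode(&mut buf) {
            frames.push(frame);
        }
        let n = reader.read(&mut scratch)?;
        if n == 0 {
            return Ok(frames); // EOF; this sketch silently drops a trailing partial frame
        }
        buf.extend_from_slice(&scratch[..n]);
    }
}

/// Two frames ("ab" and "cde") delivered three bytes at a time.
fn example_read() -> io::Result<Vec<Vec<u8>>> {
    let wire: &[u8] = &[2, b'a', b'b', 3, b'c', b'd', b'e'];
    read_frames(wire, 3)
}
```

With a chunk size of 3, the second frame straddles two reads, yet the caller still receives "ab" and "cde" as whole frames.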

LengthDelimitedCodec

For the common case where a frame is just a length prefix followed by opaque bytes:

use tokio_util::codec::LengthDelimitedCodec;

let codec = LengthDelimitedCodec::builder()
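    .length_field_offset(0)  // length prefix starts at byte 0
    .length_field_length(4)  // 4-byte big-endian length
    .length_adjustment(0)    // length counts the payload only
    .num_skip(4)             // strip the prefix from each frame (the default); 0 would misalign frames here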
    .new_codec();

To carry typed messages, wrap it in your own Encoder<MyType> / Decoder that parses the length-delimited byte frames it yields.
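
The layering can be sketched without tokio: one layer frames raw bytes with a 4-byte big-endian length prefix, and a second layer turns each payload into a typed value. Ping, frame, unframe, parse_ping, and example_roundtrip are hypothetical names for this example only.

```rust
use std::convert::TryFrom;

/// Hypothetical typed message carried inside a frame.
#[derive(Debug, PartialEq)]
struct Ping {
    seq: u32,
}

/// Framing layer: prepend a 4-byte big-endian payload length.
fn frame(payload: &[u8]) -> Vec<u8> {
    let len = u32::try_from(payload.len()).expect("payload fits in u32");
    let mut out = len.to_be_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

/// Framing layer: split one payload off the front of `buf`, prefix stripped.
fn unframe(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None;
    }
    Some(buf.drain(..4 + len).skip(4).collect())
}

/// Parsing layer: interpret a payload as a Ping (4-byte big-endian sequence number).
fn parse_ping(payload: &[u8]) -> Option<Ping> {
    if payload.len() != 4 {
        return None;
    }
    let seq = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]);
    Some(Ping { seq })
}

/// Round trip: encode seq 7, frame it, unframe it, parse it back.
fn example_roundtrip() -> Option<Ping> {
    let mut wire = frame(&7u32.to_be_bytes());
    let payload = unframe(&mut wire)?;
    parse_ping(&payload)
}
```

The parsing layer never sees a partial payload, which is the point of delegating framing to a separate codec.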

Key Takeaways

  • Decoder::decode returns Ok(None) until a complete frame is buffered; that is how partial reads are handled
  • FramedRead implements Stream and FramedWrite implements Sink, so neither side needs manual buffering
  • bytes::BytesMut is the standard buffer type for network I/O in the Rust ecosystem
  • LengthDelimitedCodec handles the framing; you handle the parsing on top

Tomorrow: request-response multiplexing — multiple concurrent RPCs over one connection.