← Week 1: Tokio Fundamentals

Day 4: Tokio Channels

Phase 2 · Jun 13, 2026

← Week 1: Tokio Fundamentals

Agenda (2–3 hours)

  • Read (45 min): tokio::sync module documentation for mpsc, oneshot, broadcast, watch
  • Study (45 min): Draw the ownership and lifetime model for each channel type; when does a sender/receiver block?
  • Practice (45 min): Build a pipeline: producer task → bounded mpsc channel → worker pool (5 tasks) → results channel → aggregator
  • Challenge (30 min): Implement a request-response multiplexer: single connection, multiple concurrent requesters, each with its own oneshot result channel
← Week 1: Tokio Fundamentals

Four Channel Types

Type Producers Consumers Use case
mpsc Many One Fan-in: many tasks → one consumer
oneshot One One Request-response: send one value
broadcast One Many Fan-out: one event → all subscribers
watch One writer Many readers Latest value, not history
← Week 1: Tokio Fundamentals

mpsc (Multi-Producer Single-Consumer)

let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32); // bounded

// Multiple senders (clone tx for each)
let tx2 = tx.clone();
tokio::spawn(async move { tx2.send("hello".to_string()).await.unwrap(); });
tokio::spawn(async move { tx.send("world".to_string()).await.unwrap(); });

// Single receiver
while let Some(msg) = rx.recv().await {
    println!("{}", msg);
}

Bounded channel provides backpressure: tx.send() awaits when the buffer is full, preventing unbounded memory growth.

← Week 1: Tokio Fundamentals

oneshot

let (tx, rx) = tokio::sync::oneshot::channel::<u64>();

tokio::spawn(async move {
    let result = compute_something().await;
    tx.send(result).unwrap();
});

let value = rx.await.unwrap();

The canonical pattern for request-response within the same process.

Combine with mpsc to build an actor: send (Request, oneshot::Sender<Response>) tuples to the actor task.

← Week 1: Tokio Fundamentals

broadcast and watch

// broadcast: all active receivers see every message
let (tx, mut rx1) = tokio::sync::broadcast::channel::<String>(16);
let mut rx2 = tx.subscribe();
tx.send("event".to_string()).unwrap();
// rx1.recv() and rx2.recv() both yield "event"
// Lagged receivers get RecvError::Lagged(n)

// watch: only the latest value matters
let (tx, rx) = tokio::sync::watch::channel("initial");
tx.send("updated").unwrap();
println!("{}", *rx.borrow()); // "updated"
// Missed intermediate values — only current state

watch is ideal for configuration updates, health status, or any "current state" notification.

← Week 1: Tokio Fundamentals

Key Takeaways

  • mpsc with a bound is the standard for fan-in pipelines; the bound enforces backpressure
  • oneshot is the request-response primitive; combine with mpsc for actor patterns
  • broadcast delivers to all current subscribers; watch delivers only the latest value
  • All four channel types are async-aware: they integrate with Tokio's task scheduler

Tomorrow: timers, select!, and cancellation.