capypad
0 day streak
rust / intermediate
Snippet

Concurrent Programming with Channels

Rust's std::sync::mpsc channels enable safe thread communication. mpsc stands for multi-producer, single-consumer. The channel function creates a Sender and Receiver pair. send() transmits values and returns a Result, since the channel could be disconnected. recv() blocks until a message arrives or every Sender has been dropped. Multiple producers can send to the same channel by cloning the Sender, but only one consumer can receive: Receiver does not implement Clone. For multiple consumers, share the Receiver behind an Arc<Mutex<Receiver<T>>> or use a multi-consumer channel from a crate such as crossbeam-channel.

snippet.rs
rust
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
/// Demonstrates `std::sync::mpsc` channels across threads.
///
/// Three channels are exercised:
/// 1. A `String` channel: a producer thread sends five messages, which the
///    main thread receives and prints until the sender is dropped.
/// 2. A `u32` channel: the main thread sends a single number and then drops
///    the sender; a consumer thread receives it with a timeout and stops as
///    soon as the channel reports disconnection.
/// 3. A same-thread channel: one value is sent and immediately received.
///
/// # Panics
///
/// Panics if a spawned thread panics, or if the `String` receiver is dropped
/// while the producer is still sending.
fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let (tx2, rx2) = mpsc::channel::<u32>();

    // The closure takes ownership of `tx`; when it returns, `tx` is dropped,
    // which ends the `for msg in rx` loop below.
    let producer = thread::spawn(move || {
        for i in 1..=5 {
            let msg = format!("Message {}", i);
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });

    let consumer = thread::spawn(move || {
        // At most five receive attempts, each waiting up to one second.
        for _ in 1..=5 {
            match rx2.recv_timeout(Duration::from_secs(1)) {
                Ok(msg) => println!("Received number: {}", msg),
                // Nothing arrived in time; use up one attempt and try again.
                Err(mpsc::RecvTimeoutError::Timeout) => {}
                // Every sender is gone, so no further value can ever arrive.
                Err(mpsc::RecvTimeoutError::Disconnected) => break,
            }
        }
    });

    // `send` only fails if the receiving side is already gone; report it
    // instead of discarding the error silently.
    if tx2.send(42).is_err() {
        eprintln!("number channel disconnected before the value could be sent");
    }
    drop(tx2);

    // Receive the producer's messages on the main thread. Iterating a
    // `Receiver` blocks for each message and finishes once all senders have
    // been dropped.
    for msg in rx {
        println!("Received: {}", msg);
    }

    producer.join().unwrap();
    consumer.join().unwrap();

    // A channel also works within a single thread: the value is queued by
    // `send` and picked up by the following `recv`.
    let (tx, rx) = mpsc::channel();
    tx.send("Direct message").unwrap();
    println!("Got: {}", rx.recv().unwrap());
}
Breakdown
1
let (tx, rx) = mpsc::channel::<String>();
Creates a typed channel; Sender (tx) and Receiver (rx)
2
tx.send(msg).unwrap();
send returns Result because receiver might be dropped
3
rx2.recv_timeout(Duration::from_secs(1))
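Waits up to 1 second for a message; returns Err(RecvTimeoutError::Timeout) if none arrives in time, or Err(RecvTimeoutError::Disconnected) once every sender has been dropped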
4
drop(tx2);
Dropping the last Sender disconnects the channel, which signals end-of-stream to the receiver