Chat Blast! A TCP chat server in Rust
Hello there! I’ve been playing with Rust for years now, but I’ve never gotten around to using it asynchronously. I thought I should change this and write something simple yet useful enough for learning. A TCP chat server! First, let’s define the problem.
All the code for this can be found here.
What is a TCP chat server?
As a user, I want to connect to a remote chat room in my terminal (it’s the 80s) and send messages that anyone else who’s connected can read. This means we need a client to send those messages and a server to handle them. The first part is super easy; we’ll use Netcat. It’s probably already on your machine. This will let us focus on writing the server. We might want to write our own client if we want to get fancy with a Chat Blast UI. Let’s avoid that for now.
For development, the server will run on 127.0.0.1 on an arbitrary unused port, 4888. To connect with Netcat we use nc localhost 4888, which does absolutely nothing right now since there is no server. How about we fix that!
A rusty server
// main.rs
mod server;

#[tokio::main]
async fn main() {
    let address = "127.0.0.1".to_string();
    let port = "4888".to_string();

    server::start(address, port).await;
}
It’s a good idea to keep the main file pretty clean. It’s responsible for handling inputs and starting the program. When business logic lives elsewhere, it’s reusable and easier to test. Speaking of reusability, you’ll notice that I’ve hardcoded the socket’s address. If you deploy this to production, you’ll probably want to change that. The address is passed to a function that will start the server. That function lives in another file named server.rs. The mod server declaration tells the compiler to include that file during compilation; without it, the file is never built, since the compiler only compiles files that are declared as modules somewhere in the crate.
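For reference, here is the file layout this implies, assuming the default Cargo src/ structure (the tree itself is my sketch, not from the post):

src/
├── main.rs    // declares mod server; and starts the runtime
└── server.rs  // holds start and the rest of the chat logic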
Now’s when you ask, “WTH is tokio::main?”. Good question! I had to figure that out myself. Tokio is an async runtime for Rust. The async/await syntax comes with the language, but Rust doesn’t ship a runtime to actually execute futures; Tokio provides one, treating async functions as tasks that it schedules for you. All you need to do is wait for them to finish. Tasks are a good fit when you’re waiting on a lot of IO operations. If you’re doing lots of CPU computation, plain multithreading might be a better approach.
#[tokio::main] is an attribute-like procedural macro. It’s simply a convenient way to set up the Tokio runtime by converting main into something like this:
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        // your main code
    })
}
Async functions return lazy Future types that do nothing until await is called on them. You can only await a future while inside an async function, which is why main is labelled as async. We’ve started the server, and now we’re waiting for it to do something.
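To see what “lazy” means in practice, here is a tiny standalone sketch (my own example, not part of the server): calling an async function builds a future but runs none of its body until it is awaited.

async fn say_hi() {
    println!("hi");
}

#[tokio::main]
async fn main() {
    let fut = say_hi(); // nothing is printed yet; the future just sits there
    fut.await;          // now the body runs and "hi" is printed
}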
// server.rs
use tokio::net::{TcpListener, TcpStream};

pub async fn start(address: String, port: String) {
    let location = format!("{}:{}", address, port);

    let listener = TcpListener::bind(&location)
        .await
        .expect("Failed to bind to addr");

    loop {
        let (stream, addr) = listener.accept().await.unwrap();

        tokio::spawn(async move {
            handle_stream(stream, addr).await;
        });
    }
}
In the start function, the given socket address is bound using the Tokio TcpListener. We then enter an infinite loop and accept incoming socket connections; accept().await suspends the task until a client actually connects. The server is officially on and waiting for clients to join.
$ nc localhost 4888
Look at that! Our first customer. When Netcat makes the TCP connection, our listener returns a tuple that holds the socket stream and its address. Since we want to handle a lot of connections at once (lots of people are going to be chatting, I can’t wait!), we’ll create an async task for each one by giving an async block to tokio::spawn. Defining the block with move means ownership of stream and addr is passed into the block, so they’re no longer usable in the enclosing scope, which is fine. Inside the task we await the handle_stream function.
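Here is a tiny standalone sketch of that move behaviour (my own example, not from the server code):

#[tokio::main]
async fn main() {
    let name = String::from("blaster");

    let handle = tokio::spawn(async move {
        // name is owned by the task now
        println!("hello, {}", name);
    });

    // println!("{}", name); // would not compile: name was moved into the task
    handle.await.unwrap();
}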
// requires use tokio::io::{AsyncBufReadExt, BufReader}; at the top of server.rs
async fn handle_stream(stream: TcpStream, addr: std::net::SocketAddr) {
    let mut reader = BufReader::new(stream);
    let mut buffer = String::new();

    loop {
        tokio::select! {
            read_result = reader.read_line(&mut buffer) => {
                // chat message received!
            }
        }
    }
}
When a user types in a message and hits enter, those bytes are sent into the socket. If you’ve never worked with sockets before, they’re treated just like files! BufReader’s read_line reads bytes from the stream until a newline is found and appends them to the buffer, a string in this case. From the task’s point of view this is a blocking operation; if nothing is in the socket, the reader says, “EVERYBODY SHHHHH! I’m listening…”.
And this is why we spawned a task. Let’s say we’re on a single thread and we have many tasks doing important things. If one of them stops to take a break, we don’t want that to stop the others from working. When a task blocks, on IO for example, Tokio will raise an eyebrow, put that task back in the box, and switch execution over to the next available task. Eventually, when the first one is no longer blocked, it will continue where it left off. Tokio can also spread this work across multiple OS threads, passing tasks between them, but that is an implementation detail.
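A quick standalone way to watch this happen (my own demo, not from the post): pin the runtime to a single thread and spawn two tasks that hit an await point on every step. Their output interleaves even though only one thread is doing the work.

use tokio::time::{sleep, Duration};

#[tokio::main(flavor = "current_thread")] // one OS thread only
async fn main() {
    let a = tokio::spawn(async {
        for i in 0..3 {
            println!("task A step {}", i);
            sleep(Duration::from_millis(10)).await; // yields back to the scheduler
        }
    });

    let b = tokio::spawn(async {
        for i in 0..3 {
            println!("task B step {}", i);
            sleep(Duration::from_millis(10)).await;
        }
    });

    let _ = tokio::join!(a, b); // A and B interleave on the single thread
}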
tokio::select!
The tokio::select! macro is the shining jewel that made this whole chat server possible. It’s similar to a match statement, where branches are matched to patterns. The branches in a select!, however, are futures that get awaited. The first one that completes and matches its pattern is the branch that gets evaluated. If the completed value does not match its pattern, select! disables that branch and keeps waiting on the others. Due to the infinite loop, after a line is read and that branch evaluated, the process begins again and waits for more data to enter the stream. We only have one branch currently, but this will be important when we add more functionality later. First things first though: we got a message from the stream! What do we do with it?
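If you want to poke at select! on its own, here is a minimal standalone sketch (mine, not from the server): two timers race, and only the branch of the one that finishes first runs.

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_millis(50)) => {
            println!("the fast timer won");
        }
        _ = sleep(Duration::from_millis(500)) => {
            println!("the slow timer won"); // never reached here
        }
    }
}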
Handle a message
We have two things to work with: the result of the read and the message itself. If the read was successful, we need to broadcast the message to all the other open streams.
match read_result {
    Ok(_bytes_read) => {
        publisher
            .send(Event(addr, message))
            .expect("Error publishing message.");
    }
    Err(e) => {
        println!("Error reading stream: {}", e);
    }
}
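One detail the snippet glosses over: read_line appends to buffer and returns Ok(0) when the client hangs up, so the surrounding branch needs a little bookkeeping. Here is one way it might look; the message binding, the buffer.clear() call, and the Ok(0) break are my additions, not the post’s exact code.

read_result = reader.read_line(&mut buffer) => {
    match read_result {
        Ok(0) => break, // the peer closed the connection; stop this task
        Ok(_bytes_read) => {
            let message = buffer.clone();
            buffer.clear(); // read_line appends, so reset between lines

            publisher
                .send(Event(addr, message))
                .expect("Error publishing message.");
        }
        Err(e) => {
            println!("Error reading stream: {}", e);
        }
    }
}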
Wait a minute! Where’d that publisher come from? Has this post been proofread? Look here, I was withholding information until it was needed. Let’s go back and add the code to get a publisher.
// server.rs
use tokio::sync::broadcast;

pub async fn start(address: String, port: String) {
    let (tx, _) = broadcast::channel(32);

    loop {
        // ...
        let publisher = tx.clone();
        let consumer = tx.subscribe();

        tokio::spawn(async move {
            handle_stream(stream, publisher, consumer, addr).await;
        });
    }
}
In the start function, we create a Tokio broadcast channel. You set the channel’s capacity (the maximum number of values it can hold), and you get back a tuple containing a transmitter and a receiver. If you know Go, you’ll be familiar with the concept. Calling send on the transmitter puts a value in the channel, and calling recv on a receiver gets the value out. A broadcast channel is a multi-producer, multi-consumer communication method: every receiver sees every message. We can call clone and subscribe on the transmitter to get new transmitters and receivers, respectively, and move them into the spawned tasks on each iteration.
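Here is a tiny standalone sketch of a broadcast channel by itself (my own example): every subscribed receiver gets its own copy of each message.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<String>(16);
    let mut rx2 = tx.subscribe(); // a second, independent receiver

    tx.send("hello".to_string()).unwrap();

    // both receivers see the same message
    assert_eq!(rx1.recv().await.unwrap(), "hello");
    assert_eq!(rx2.recv().await.unwrap(), "hello");
}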
Consumption
The message has been sent to the channel, and all the other tasks need to pick it up and write it to their streams. But how are they supposed to do that while they are blocked waiting for a read? Back to tokio::select!
// requires use tokio::io::AsyncWriteExt; at the top of server.rs
loop {
    tokio::select! {
        read_result = reader.read_line(&mut buffer) => {
            // chat message received, publish it!
        }
        event = consumer.recv() => {
            let Event(id, msg) = event.expect("Parsing event failed");

            if id != addr {
                let formatted_msg = format!("[{}]: {}", id, msg);
                // write_all makes sure the whole message goes out
                reader.write_all(formatted_msg.as_bytes()).await.expect("Broadcast write failed");
            }
        }
    }
}
Now we can see the power. The select awaits on read_line and recv (they are both async). Whichever one finishes first has its branch executed (those variable patterns match anything). When a message arrives on the channel, it gets written to the stream through the reader; Tokio’s BufReader passes writes straight through to the underlying stream (yeah, I was confused too). Then the loop starts over and waits for another read or write. Beautiful!
One more thing to note is how to avoid broadcasting a message back to the same socket it was read from. We can do this by including a unique task identifier with the message and skipping the write if the message came from the same task. That’s what the addr variable is used for (it comes from accept, remember).
// requires use std::net::SocketAddr;
#[derive(Debug, Clone)] // automatic trait implementation
struct Event(SocketAddr, Message); // tuple struct; Message is the chat text type (e.g. a String alias)

// publish
publisher.send(Event(addr, message))

// consume
let Event(id, msg) = event.expect("Parsing event failed");
if id != addr {
    // write to stream
}
Final Summation
Remember that concurrency is not parallelism. You can achieve high concurrency on a single core in one thread. Vanilla async Rust acts similarly to coroutines in Python, goroutines in Go, and processes in Elixir (although they all differ in implementation). They are green threads, which means application code manages the execution contexts rather than the OS. This can happen in a single OS thread, though the scheduler may also use multiple OS threads as an implementation detail. Green threads are great for heavy IO use as opposed to CPU crunching.
My initial attempt at this server stayed in the standard library, using std::thread to manage threads directly and std::sync::mpsc to communicate between them. I ran into pain trying to figure out how to make a thread read data from the socket AND from the channel without blocking on either. MPSC (multi-producer, single-consumer) was also not the paradigm I was going for, but it seemed to be the only channel structure offered by the stdlib.
After reading through the Tokio tutorial,
I discovered it was built to handle everything I needed.
Sometimes it’s fun to know how stuff runs under the covers, and sometimes you just
want to get shit done. Tokio excels at that part.
It makes concurrency a lot more ergonomic, and I highly recommend it.