Using select in Tokio¶
I have been working on a network probe utility, and I ran across an article on Firezone's blog, written by Thomas Eizinger, about using Rust with a "SansIO" pattern. You should pause reading this and go read that article first! If you are unfamiliar with `async` in Rust, you should read this article by Hayden Stainsby. Even though my background has always landed me on some flavor of network team, most of my software practice has come from configuration management and automation. Think data models, templates, and awful, buggy APIs. I digress. My team has had a need for better network monitoring, and off-the-shelf tools just don't quite tick all of the boxes on our want/need list. We finally decided to build our own Prometheus exporter. The first iteration uses ICMP over IPv4 and IPv6. How hard could it be?!
Requirements¶
I work on a small sub-team that is project focused. We don't have a ton of bandwidth for tool development and maintenance, but our lack of visibility into certain events has been a persistent thorn in our side. Traditionally, we are a Go/Python shop. I chose Rust for a few reasons: it is a systems language, it plays nicely with C, there is no garbage collection, and the rich type system makes for a pretty tight end product. I need this tool to be small, fast, and predictable. In hindsight, I misjudged the complexity of the project, but I'll trade late delivery for some very valuable lessons learned. Where do you think blog articles come from? 😀
Select¶
`select` is a mechanism to choose between jobs that might be in a blocking state. I am no historian, but I believe it has its roots in the C implementation, which has since been supplanted by `poll` and `epoll`. There is a flavor of it in Python and other languages. This mechanism is often on the critical path of an event loop, such as the one in the SansIO article mentioned earlier. Let's take a look at a small example from the Tokio docs:
```rust
tokio::select! {
    _ = do_stuff_async() => {
        println!("do_stuff_async() completed first")
    }
    _ = more_async_work() => {
        println!("more_async_work() completed first")
    }
};
```
At first glance, this is a pretty simple idea. As you can imagine, the devil is in the details! `select` waits on a collection of async tasks and executes the right-side (`=>`) block of the task that "wins". If two tasks complete at the same time, one is chosen at random (although you can override this behavior). There are two things you need to consider next:
- What happens to the "losing" task?
- What could go wrong with `more_async_work`?
The losing task¶
The short answer is that it gets cancelled. Look here for a list of cancellation-safe functions. This is the first foot wound I created for myself. If the left-side task (e.g. `do_stuff_async`) produces a value, that value can be dropped via cancellation. What does this mean, exactly? I think one of the misconceptions on the learning curve of `async` is that it is somehow magic. It is not! There is a big emphasis on the cooperative part of cooperative concurrency. `async` functions block the current thread until you tell them to yield. And how do we yield in Rust? By `await`ing! Consider this block (run it in the Rust playground):
```rust
use log::info;

mod runner {
    use log::info;

    #[derive(Debug)]
    pub struct DropJob {
        pub val: u32,
    }

    impl Drop for DropJob {
        fn drop(&mut self) {
            info!("dropping DropJob: {}!", self.val);
        }
    }

    pub async fn job(val: u32) -> DropJob {
        info!("running job with val {}", val);
        let f = tokio::spawn(async move { DropJob { val } }).await;
        f.unwrap()
    }
}

#[tokio::main]
async fn main() {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
    println!("running jobs...");
    for _ in 0..10 {
        tokio::select! {
            v1 = runner::job(11) => {
                info!("finished job 11 with result {v1:?}!");
            },
            v2 = runner::job(42) => {
                info!("finished job 42 with result {v2:?}!");
            }
        }
    }
}
```
What if `v1` or `v2` were a value read from a stream or socket? We are in the danger zone, because that value can be dropped by cancellation!
```
[2025-03-25T18:57:37Z INFO playground::runner] running job with val 42
[2025-03-25T18:57:37Z INFO playground::runner] running job with val 11
[2025-03-25T18:57:37Z INFO playground::runner] dropping DropJob: 42!
[2025-03-25T18:57:37Z INFO playground] finished job 11 with result DropJob { val: 11 }!
[2025-03-25T18:57:37Z INFO playground::runner] dropping DropJob: 11!
```
Both jobs get run, but `42` gets lost to the bit bucket. Job `11` runs and gets processed before going out of scope at the end of the `select` block.
Sidebar: When I first ran into this issue, I was really puzzled and confused. A quick internet search reveals that I am not the only one suffering from this problem. There are a lot of debates about the best way to design concurrent programming languages, and tons of controversy over `async` in Rust. I must admit that this situation isn't markedly different from the same debate happening in the Python community and the frustrations experienced by newcomers there. You can read about the "function coloring problem". I am not a language designer. Like a lot of other skills learned in life, frustration and head-scratching is part of the process. If it were easy to form these mental models, more people would probably pursue programming (and mathematics!).
On guard!¶
How do we guard our return value so it doesn't return to the universe as lost entropy? The same way we tell the compiler that we are at a "stopping point" in our function: with `await`. If your function is performing other asynchronous actions, return the result after the last `await` statement. It is a bit difficult to describe, so an example is helpful. Another playground link. Let's consider this crazy-contrived example:
```rust
use log::info;

mod runner {
    use log::info;

    #[derive(Debug)]
    pub struct DropJob {
        pub val: u32,
    }

    pub struct DumbSocket {
        in_q: Vec<DropJob>,
    }

    impl Drop for DropJob {
        fn drop(&mut self) {
            info!("dropping DropJob: {}!", self.val);
        }
    }

    pub async fn job(val: u32) -> u32 {
        info!("running job with val {}", val);
        let f = tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            DropJob { val }
        })
        .await;
        f.unwrap().val
    }

    impl DumbSocket {
        pub fn new() -> Self {
            DumbSocket { in_q: vec![] }
        }

        pub async fn read(&mut self) {
            for i in (0..10).rev() {
                self.in_q.push(DropJob { val: i });
            }
        }

        pub async fn recv(&mut self) -> DropJob {
            let _sentinel = DropJob { val: 4096 };
            // move this after `pop` and watch disaster strike!
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            if let Some(job) = self.in_q.pop() {
                // fool the compiler into using our dummy variable
                if _sentinel.val + job.val > 99999 {
                    info!("unreachable");
                }
                job
            } else {
                // queue is empty, sit and spin
                futures::future::pending().await
            }
        }
    }
}

#[tokio::main]
async fn main() {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
    println!("running jobs...");
    let mut socket = runner::DumbSocket::new();
    socket.read().await;
    info!("read some values!");
    let mut c = 0;
    while c < 100 {
        tokio::select! {
            rv = socket.recv() => {
                info!("received {}!", rv.val);
                if rv.val == 9 {
                    break;
                }
            },
            rv = runner::job(42) => {
                info!("ran other job with val {rv}");
            },
        }
        c += 1;
    }
}
```
What problem am I trying to solve here, exactly? Network programming, like so many other paradigms, is a stochastic process. It feels to me like programming Schrödinger's cat. Did the packet make it to the wire, across the wire, up the remote stack? Yes, we could abandon this whole SansIO idea and use independent threads for send and receive, but then we have traded that problem for state synchronization. If I am tracking read and write events, I have to write to a shared object. Enter mutexes and channels and synchronization... There is no free lunch here! You have to tackle the complexity somewhere, and the Rust compiler won't save you in this case.
In the snippet above, `DumbSocket::recv` can get cancelled at the `await`. The value gets popped off the queue after this point, so the value will definitely make it back to the `select` statement. Move the sleep down past the `pop` and run this a few times. What happens? Convince yourself why the resulting output occurs and what is causing it. To the best of my knowledge, there are no lints that will help you identify these cases! The compiler will not help you!
What could go wrong with more work?¶
The other foot injury I caused myself was running more async work after `select`-ing. The whole idea behind using `select` is to keep the event loop progressing as quickly as possible. Once `select` has chosen a winner, we are back to code that blocks the current task. As a contrived example, consider this loop (playground):
```rust
loop {
    tokio::select! {
        v1 = runner::job(11) => {
            info!("finished job 11 with result {v1:?}!");
            info!("doing more work...");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        },
        v2 = runner::job(42) => {
            info!("finished job 42 with result {v2:?}!");
            info!("doing more work...");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    }
}
```
It will take about 1 second per iteration. Is this really what you want? That's up to you, but this highlights the point that `async` isn't magic. Be mindful of blocking actions and structure your code accordingly.
Other gotchas¶
`UdpSocket::recv` in Tokio claims to be cancellation-safe. If you are doing some low-level network shenanigans, you might also spot the wrapper function `from_std`. Maybe that `std` socket is sitting on top of some `socket2` code, and... the short version is that you just replaced a lot of internals that Tokio relies on for safety with things that weren't designed specifically for Tokio (or `async`, for that matter). It might be better to write your own `Future` or figure out how to guard socket access. That is left for another post, or as an exercise for the reader.
Wrapping up¶
Younger me would have gone on a rant over this sort of thing. Some problems are just hard to solve, and I don't have a better answer for the language designers. If you have any feedback, please don't hesitate to drop me a line. I'm no expert; just another person on the internet trying to make stuff work. Happy coding!