There’s a fascinating Rust library, loom, which can be used to
thoroughly test lock-free data structures. I always wanted to learn how it works. I still do! But
recently I accidentally implemented a small toy which, I think, contains some of the loom’s ideas,
and it seems worthwhile to write about that. The goal here isn’t to teach you what you should be
using in practice (if you need that, go read loom’s docs), but rather to derive a couple of neat
ideas from first principles.
One, Two, Three, Two
As usual, we need the simplest possible model program to mess with. The example we use comes from
this excellent article.
Behold, a humble (and broken) concurrent counter:
use std::sync::atomic::{
AtomicU32,
Ordering::SeqCst,
};
#[derive(Default)]
pub struct Counter {
value: AtomicU32,
}
impl Counter {
pub fn increment(&self) {
let value=self.value.load(SeqCst);
self.value.store(value + 1, SeqCst);
}
pub fn get(&self) -> u32 {
self.value.load(SeqCst)
}
}
The bug is obvious here — the increment is not atomic. But what is the best test we can write to
expose it?
Trivial Test
The simplest idea that comes to mind is to just hammer the same counter from multiple threads and
check the result at the end;
#[test]
fn threaded_test() {
let counter=Counter::default();
let thread_count=100;
let increment_count=100;
std::thread::scope(|scope| {
for _ in 0..thread_count {
scope.spawn(|| {
for _ in 0..increment_count {
counter.increment()
}
});
}
});
assert_eq!(counter.get(), thread_count * increment_count);
}
This fails successfully:
thread ‘counter::trivial’ panicked:
assertion `left==right` failed
left: 9598
right: 10000
But I wouldn’t call this test satisfactory — it very much depends on the timing, so you can’t
reproduce it deterministically and you can’t debug it. You also can’t minimize it — if you reduce
the number of threads and increments, chances are the test passes by luck!
PBT
Of course the temptation is to apply property based testing here! The problem almost fits: we have
easy-to-generate input (the sequence of increments spread over several threads), a good property to
check (result of concurrent increments is identical to that of sequential execution) and the desire
to minimize the test.
But just how can we plug threads into a property-based test?
PBTs are great for testing state machines. You can run your state machine through a series of steps
where at each step a PBT selects an arbitrary next action to apply to the state:
#[test]
fn state_machine_test() {
arbtest::arbtest(|rng| {
let mut state: i32=0;
let step_count: usize=rng.int_in_range(0..=100)?;
for _ in 0..step_count {
match *rng.choose(&[“inc”, “dec”])? {
“inc”=> state +=1,
“dec”=> state -=1,
_=> unreachable!(),
}
}
Ok(())
});
}
And it feels like we should be able to apply the same technique here. At every iteration, pick a
random thread and make it do a single step. If you can step the threads manually, it should be easy
to maneuver one thread in between load&store of a different thread.
But we can’t step through threads! Or can we?
Simple Instrumentation
Ok, let’s fake it until we make it! Let’s take a look at the buggy increment method:
pub fn increment(&self) {
let value=self.value.load(SeqCst);
self.value.store(value + 1, SeqCst);
}
Ideally, we’d love to be able to somehow “pause” the thread in-between atomic operations. Something
like this:
pub fn increment(&self) {
pause();
let value=self.value.load(SeqCst);
pause();
self.value.store(value + 1, SeqCst);
pause();
}
fn pause() {
}
So let’s start with implementing our own wrapper for AtomicU32 which includes calls to pause.
use std::sync::atomic::Ordering;
struct AtomicU32 {
inner: std::sync::atomic::AtomicU32,
}
impl AtomicU32 {
pub fn load(&self, ordering: Ordering) -> u32 {
pause();
let result=self.inner.load(ordering);
pause();
result
}
pub fn store(&self, value: u32, ordering: Ordering) {
pause();
self.inner.store(value, ordering);
pause();
}
}
fn pause() {
}
Managed Threads API
One rule of a great API design is that you start by implement a single user of an API, to
understand how the API should feel, and only then proceed to the actual implementation.
So, in the spirit of faking, let’s just write a PBT using these pausable, managed threads, even if
we still have no idea how to actually implement pausing.
We start with creating a counter and two managed threads. And we probably want to pass a reference
to the counter to each of the threads:
let counter=Counter::default();
let t1=managed_thread::spawn(&counter);
let t2=managed_thread::spawn(&counter);
Now, we want to step through the threads:
while !rng.is_empty() {
let coin_flip: bool=rng.arbitrary()?;
if t1.is_paused() {
if coin_flip {
t1.unpause();
}
} else if t2.is_paused() {
if coin_flip {
t2.unpause();
}
}
}
Or, refactoring this a bit to semantically compress:
let counter=Counter::default();
let t1=managed_thread::spawn(&counter);
let t2=managed_thread::spawn(&counter);
let threads=[t1, t2];
while !rng.is_empty() {
for t in &mut threads {
if t.is_paused() && rng.arbitrary()? {
t.unpause()
}
}
}
That is, on each step of our state machine, we loop through all threads and unpause a random subset
of them.
But besides pausing and unpausing, we need our threads to actually do something, to increment the
counter. One idea is to mirror the std::spawn API and pass a closure in:
let t1=managed_thread::spawn({
let counter=&counter;
move || {
for _ in 0..100 {
counter.increment();
}
}
});
But as these are managed threads, and we want to control them from our tests, lets actually go all
the way there and give the controlling thread an ability to change the code running in a managed
thread. That is, we’ll start managed threads without a “main” function, and provide an API to
execute arbitrary closures in the context of this by-default inert thread (universal
server anyone?):
let counter=Counter::default();
let t=managed_thread::spawn(&counter);
t.submit(|thread_state: &Counter| thread_state.increment());
t.submit(|thread_state: &Counter| thread_state.increment());
Putting everything together, we get a nice-looking property test:
#[cfg(test)]
use managed_thread::AtomicU32;
#[cfg(not(test))]
use std::sync::atomic::AtomicU32;
#[derive(Default)]
pub struct Counter {
value: AtomicU32,
}
impl Counter {
}
#[test]
fn test_counter() {
arbtest::arbtest(|rng| {
let counter=Counter::default();
let counter_model: u32=0;
let t1=managed_thread::spawn(&counter);
let t2=managed_thread::spawn(&counter);
let threads=[t1, t2];
while !rng.is_empty() {
for t in &mut [t1, t2] {
if rng.arbitrary() {
if t.is_paused() {
t.unpause()
} else {
t.submit(|c| c.increment());
counter_model +=1;
}
}
}
}
for t in threads {
t.join();
}
assert_eq!(counter_model, counter.get());
Ok(())
});
}
Now, if only we could make this API work… Remember, our pause implementation is a shrug emoji!
At this point, you might be mightily annoyed at me for this rhetorical device where I pretend that I
don’t know the answer. No need for annoyance — when writing this code for the first time, I traced
exactly these steps — I realized that I need a “pausing AtomicU32” so I did that (with dummy
pause calls), then I played with the API I wanted to have, ending at roughly this spot, without
yet knowing how I would make it work or, indeed, if it is possible at all.
Well, if I am being honest, there is a bit of up-front knowledge here. I don’t think we can avoid
spawning real threads here, unless we do something really cursed with inline assembly. When
something calls that pause() function, and we want it to stay paused until further notice, that
just has to happen in a thread which maintains a stack separate from the stack of our test. And, if
we are going to spawn threads, we might as well spawn scoped threads, so that we can freely borrow
stack-local data. And to spawn a scope thread, you need a
Scope parameter. So in reality
we’ll need one more level of indentation here:
std::thread::scope(|scope| {
let t1=managed_thread::spawn(scope, &counter);
let t2=managed_thread::spawn(scope, &counter);
let threads=[t1, t2];
while !rng.is_empty() {
for t in &mut [t1, t2] {
}
}
});
Managed Threads Implementation
Now, the fun part: how the heck are we going to make pausing and unpausing work? For starters, there
clearly needs to be some communication between the main thread (t.unpause()) and the managed
thread (pause()). And, because we don’t want to change Counter API to thread some kind of
test-only context, the context needs to be smuggled. So thread_local! it is. And this context
is going to be shared between two threads, so it must be wrapped in an Arc.
struct SharedContext {
}
thread_local! {
static INSTANCE: RefCell(
scope: &Scope,
state: T,
) {
}
There’s a bunch of stuff that needs to happen here:
As we have established, we are going to spawn a (scoped) thread, so we need the scope parameter
with its three lifetimes. I don’t know how it works, so I am just going by the docs here!
We are going to return some kind of handle, which we can use to pause and unpause our managed
thread. And that handle is going to be parametrized over the same ‘scope lifetime, because it’ll
hold onto the actual join handle.
We are going to pass the generic state to our new thread, and that state needs to be Send, and
bounded by the same lifetime as our scoped thread.
Inside, we are going to spawn a thread for sure, and we’ll need to setup the INSTANCE thread
local on that thread.
And it would actually be a good idea to stuff a reference to that SharedContext into the handle
we return.
A bunch of stuff, in other words. Let’s do it:
struct ManagedHandle,
ctx: Arc,
}
fn spawn(
scope: &’scope Scope,
state: T,
) -> ManagedHandle {
pub fn is_paused(&self,) -> bool {
let guard=self.ctx.state.lock().unwrap();
*guard==State::Paused
}
}
For unpause, we should additionally flip the state back to Running and notify the other thread:
impl ManagedHandle {
pub fn unpause(&self) {
let mut guard=self.ctx.state.lock().unwrap();
assert_eq!(*guard, State::Paused);
*guard=State::Running;
self.ctx.cv.notify_all();
guard=self
.ctx
.cv
.wait_while(guard, |state| *state==State::Running)
.unwrap();
}
}
At this point we can spawn a managed thread, pause it and resume. But right now it doesn’t do
anything. Next step is implementing that idea where the controlling thread can directly send an
arbitrary closure to the managed one to make it do something:
impl {
pub fn submit(&self, f: F)
}
Let’s figure this FnSomething bound! We are going to yeet this f over to the managed thread and
run it there once, so it is FnOnce. It is crossing thread-boundary, so it needs to be + Send.
And, because we are using scoped threads, it doesn’t have to be ‘static, just ‘scope is
enough. Moreover, in that managed thread the f will have exclusive access to thread’s state, T.
So we have:
impl {
pub fn submit {
inner: std::thread::ScopedJoinHandle(
scope: &’scope Scope,
mut state: T,
) -> ManagedHandle {
pub fn submit ManagedHandle {
inner: std::thread::ScopedJoinHandle(
scope: &’scope Scope,
mut state: T,
) -> ManagedHandle {
pub fn is_paused(&self) -> bool {
let guard=self.ctx.state.lock().unwrap();
*guard==State::Paused
}
pub fn unpause(&self) {
let mut guard=self.ctx.state.lock().unwrap();
assert_eq!(*guard, State::Paused);
*guard=State::Running;
self.ctx.cv.notify_all();
guard=self
.ctx
.cv
.wait_while(guard, |state| *state==State::Running)
.unwrap();
}
pub fn submit u32 {
pause();
let result=self.inner.fetch_add(value, ordering);
pause();
result
}
}
impl Counter {
pub fn increment(&self) {
self.value.fetch_add(1, SeqCst);
}
}
… we can get a rather specific correctness statements out of our test, that any sequence of at
most five increments is correct:
$ t cargo t -r — exhaustytest –nocapture
running 1 test
all 81133 interleavings are fine!
test exhaustytest … ok
real 8.65s
cpu 8.16s (2.22s user + 5.94s sys)
rss 63.91mb
And the last small thing. Recall that our PBT minimized the first sequence it found …:
begin trace
0: increment
1: increment
0: unpause
1: unpause
1: unpause
0: unpause
0: unpause
1: unpause
0: unpause
0: increment
1: unpause
0: unpause
1: increment
0: unpause
0: unpause
1: unpause
0: unpause
thread ‘test_counter’ panicked at src/lib.rs:56:7:
assertion `left==right` failed
left: 4
right: 3
arbtest failed!
Seed: 0x4fd7ddff00000020
… down to just
begin trace
0: increment
1: increment
0: unpause
1: unpause
thread ‘test_counter’ panicked at src/lib.rs:57:7:
assertion `left==right` failed
left: 2
right: 1
arbtest failed!
Seed: 0x9c2a13a600000001
But we never implemented shrinking! How is this possible? Well, strictly speaking, this is out of
scope for this post. And I’ve already described this
elsewhere. And, at 32k, this is the
third-longest post on this blog. And it’s 3AM here in Lisbon right now. But of course I’ll explain!
The trick is the simplified hypothesis
approach. The
arbtest PBT library we in this post is based on a
familiar interface of a PRNG:
arbtest::arbtest(|rng| {
let random_int: usize=rng.int_in_range(0..=100)?;
let random_bool: bool=rng.arbitrary()?;
Ok(())
});
But there’s a twist! This is a finite PRNG. So, if you ask it to flip a coin it can give you
heads. And next time it might give you tails. But if you continue asking it for more, at some point
it’ll give you Err(OutOfEntropy).
That’s why all these ? and the outer loop of
while !rng.is_empty() {.
In other words, as soon as the test runs out of entropy, it short-circuits and completes. And that
means that by reducing the amount of entropy available the test becomes shorter, and this works
irrespective of how complex is the logic inside the test!
And “entropy” is a big scary word here, what actually happens is that the PRNG is just an &mut
&[u8] inside. That is, a slice of random bytes, which is shortened every time you ask for a random
number. And the shorter the initial slice, the simpler the test gets. Minimization can be this
simple!
You can find source code for this article at
https://github.com/matklad/properly-concurrent
>>> Read full article>>>
Copyright for syndicated content belongs to the linked Source : Hacker News – https://matklad.github.io/2024/07/05/properly-testing-concurrent-data-structures.html
Speaker Mike Johnson Retracts GOP’s Potential Repeal of the CHIPS Act: What It Means for America’s Tech Future