dpzmick.com

"Garbage Collection" for Rust Arc Pointers

If you don't know anything at all about realtime audio programming, you might want to read the first post in this pseudo-series, Audio Programming 101, or watch this talk from the Audio Developers Conference to get a little bit of background.

In short, there's a realtime thread that can never be blocked in any way. The realtime thread is responsible for sending all of the audio which an application will produce to an audio system, at exactly the right moments. If the realtime thread ever fails to generate the audio it needs to generate, bad things happen. That means locks, I/O, allocation are all off limits in the realtime thread.

Sending messages from non-realtime threads to the realtime thread is trickier than it might be in a "normal" application because we can't do these things. There are many, many techniques which can be used to work around this trickiness. This post is a discussion of one such method (presented in this cppcon talk) implemented in Rust.

Messaging between threads

Suppose we are developing a synthesizer which produces sounds when keys are pressed on a MIDI keyboard. The audio library calls a function we provide once ever 6 or so milliseconds to request a list of samples from us. The library calls our function with 2 arguments: 1) How many samples it wants 2) what key presses we need to handle. The callback function uses a precomputed list of samples to generate sounds every time it is called. To modify the properties of the sounds that are produced, the user edits settings with a user interface.

It would be painful (and incorrect) to attempt to handle UI events in the realtime thread, so we will run a UI thread to handle the UI events. Whenever the UI thread gets an event to handle, it needs to compute a new sample list, then send the list to the realtime thread.

Since we can't lock, let's use a queue to send some sort of message between threads. The queue that we choose needs to have a few properties:

  • Must be a lock free queue
  • Must be able to preallocate all of its nodes (cant't allocate or free memory for a node on a push or pull)

I want to place messages on the heap so that they do not need to be copied as we move them around. If messages lives on the heap, we must ensure they are allocated and freed outside of the realtime thread (we can't call allocation functions on the realtime thread).

Reference counted garbage collection

It is totally fine to allocate on the UI thread, so when the UI thread handles an event it will compute a new list of samples and stick them into a freshly allocated block of memory. Then we will ship this message over to the realtime thread.

When the realtime thread takes ownership of the message, it will need to hold onto the data for some undefined period of time. But, when the realtime thread is done with the message, it cannot free it (because we can't allocate or deallocatein the realtime thread).

To solve this, let's run one more thread to clean up messages which are no longer being used by the realtime thread.

Whenever the UI thread allocates space for a message using standard allocators, it will wrap the message in a reference-counted pointer. It then will let the collector thread know it should start keeping an eye on the reference-counted pointer. The collector will store the pointer in a list. When the reference count falls to 1, the collector is the only thread with a reference, and it can safely free the memory. The pointer is sent to the realtime thread, then, when the realtime thread drops the message, the reference count will drop. Sometime later, the collector thread will observe the decreased reference count and free the message.

Here is a slideshow/animation demonstrating this process.

Tradeoffs

Let's consider the theoretical behavior of this approach. Note that anything I have to say should be taken with grain of salt; I haven't benchmarked anything, so I really have no evidence to support anything I'm claiming.

First, let's talk about when we would not want to use this approach.

If the realtime thread always consumes new messages in a predictable amount of time, we can preallocate a certain number of messages and just keep reusing the same blocks of memory. When the UI needs to send a message it can grab one of the preallocated messages and use it. Some predictable amount of time later, when the realtime thread is done with it, the message can be returned to the pool (by the realtime thread).

This is also a bad idea if the UI thread generates messages significantly faster than the realtime thread consumes them. It might be fine for the realtime thread to lag behind the UI thread (if it eventually catches up), but the GC pointer list is going to get quite large. If we do our GC scan frequently, we will be using a lot of cpu time scanning this list. If we slow the collector down, the list is going to keep growing, and so will our memory usage. In other words, it's a sticky situation. A modern computer can probably handle this load, but we should avoid generating more load than necessary so that other audio applications running at the same time can use as much time as they need.

Finally, if the realtime thread needs to send a message to the UI thread, it can't just allocate memory and toss it at the GC thread for cleanup later. We could still use the GC+queue method discussed here to send messages to the realtime thread, but we probably only have time to build one good messaging system (we want to make audio, not send messages back and forth!)

If none of the above are true, a simple GC thread with some reference counted pointers might be a nice way to avoid adding lots of complexity to a small system. It also saves us from the need for a custom allocation mechanism, lets us send messages of various and dynamic sizes, and frees us from the burden of strict capacity constraints. So, if we don't need something more clever, maybe this is a good thing to try out.

Finally, since we are using reference counting to manage memory, there will be some runtime cost to increment and decrement the reference counts. This isn't a big deal for us, in this case, because the performance is predictable (we won't be suddenly surprised by the non-deterministic reference count incrementing).

There are many other variations of this technique (some which involve extra threads, some which don't, some which reuse freed memory, etc). Regardless of the actual efficacy of this approach, it will be interesting to try to build one in Rust, so let's get started.

Let's make one

For the sake of these examples, let's assume that the built-in Rust mpsc channel is an appropriate lock free queue. It will be pretty easy to swap this with something different later, and, if we use the standard library, all of the examples will easily run in the Rust playground. We are also going to fake a bunch of the details of the audio library.

Fake audio library

Rust Playground Link.

We don't need to walk through this code, it just makes some threads and calls some empty functions. The important bits are the RealtimeThread::realtime_callback function and the UIThread::run functions. In this example, the realtime callback function says "I'm done!" to let the realtime thread shutdown, and the UI thread does nothing at all.

Here's the code:

use std::thread;

#[derive(PartialEq)]
enum CallbackStatus {
    Continue,
    Shutdown,
}

// "library" code starts here
type Samples = [f32; 64];

fn run_threads(mut rt: RealtimeThread, mut ui: UIThread) {
    let join_handle = thread::spawn(move || {
        println!("[ui] thread started");
        ui.run();
        println!("[ui] thread shutting down");
    });

    println!("[realtime] thread started");
    let mut output = [0.0; 64];
    while rt.realtime_callback(&mut output) != CallbackStatus::Shutdown { }
    println!("[realtime] thread shutting down");

    join_handle.join().unwrap();
}
// end of "library" code

/// A struct containing the realtime callback and all data owned by the realtime thread
struct RealtimeThread {
    // some members here eventually
}

impl RealtimeThread {
    fn new() -> Self { RealtimeThread{} }

    /// realtime callback, called to get the list of samples
    fn realtime_callback(&mut self, output_samples: &mut Samples) -> CallbackStatus {
        CallbackStatus::Shutdown
    }
}

/// A struct which runs the UI thread and contains all of the data owned by the UI thread
struct UIThread {
    // some members here eventually
}

impl UIThread {
    fn new() -> Self { UIThread{} }

    /// All of the UI thread code
    fn run(&mut self) {
        // do nothing!
    }
}

fn main() {
    let rt = RealtimeThread::new();
    let ui = UIThread::new();
    run_threads(rt, ui);
}

Output (one of many possible):

[realtime] thread started
[realtime] thread shutting down
[ui] thread started
[ui] thread shutting down

Sending Arcs between threads

Now that we have an "audio library," let's try to make some messages and pass them between threads.

The RealtimeThread struct will need to hold on to a list of samples which it will use to populate the output samples every time the callback is called. We want these samples to be heap allocated and reference counted, so we wrap them in an Arc. Finally, we want to leave the samples uninitialized until the UI thread sends us some, so we wrap the Arc<Samples> in an Option.

struct RealtimeThread {
  current_samples: Option<Arc<Samples>>,
}

Now that the realtime thread has a list of samples, we can fill in a bit of the body of the realtime callback function:

fn realtime_callback(&mut self, output_samples: &mut Samples) -> CallbackStatus {
    self.current_samples.as_ref().map(|samples| {
        // samples: &Arc<[f32; 64]>
        output_samples.copy_from_slice(samples.as_ref())
    });

    CallbackStatus::Continue
}

The function copy_from_slice will memcpy the samples we are holding onto into the buffer provided by the audio library.

Moving over to the UI thread, first, we need to be able to compute a list of samples to compute. Here is a function that computes 64 samples along a sine wave with a given peak amplitude:

/// computes the samples needed for on cycle of a sine wave
/// the volume parameter sets the audible volume of sound produced
fn compute_samples(&self, volume: f32) -> Samples {
    assert!(volume >= 0.0);
    assert!(volume <= 1.0);

    // we need to populate 64 samples with 1 cycle of a sine wave (arbitrary choice)
    let constant_factor = (1.0/64.0) * 2.0 * f32::consts::PI;
    let mut samples = [0.0; 64];
    for i in 0..64 {
        samples[i] = (constant_factor * i as f32).sin() * volume;
    }

    samples
}

The UI thread will generate some fake events, and compute samples for these events:

/// All of the UI thread code
fn run(&mut self) {
    // create 5 "ui events"
    for i in 0..5 {
        let volume = i as f32 / 10.0;
        let samples = Arc::new(self.compute_samples(volume));

        // send the samples to the other thread
    }

    // tell the other thread to shutdown
}

Now that we've done all of that, we need to send the samples between threads.

Message type

As discussed previously, we will create the Arc on the UI thread, then send it to the realtime thread.

enum Message {
    NewSamples(Arc<Samples>),
    Shutdown,
}

Remember when I said that we would make a bunch of assumptions about the mpsc queues? Here's where I'm going to do that. We are going to assume that this queue follows all the properties we need a realtime queue to follow. For a quick reminder, those are:

  • No locks
  • No allocation (or deallocation) in the realtime thread.

To send messages between the threads, we will use mpcs::sync_channel to create a synchronous channel (queue). This channel is bounded, so a sender cannot add a new message to the queue unless there is currently space available. We are going to set the buffer size to zero. From the docs:

Note that a buffer size of 0 is valid, in which case this [channel] becomes "rendezvous channel" where each send will not return until a recv is paired with it.

This "channel" will have two ends; one which can send messages and one which can receive messages. Lets create both of them in the main method. The send side will be called tx (for transmit) and the receive side is called rx. Whenever a message is placed on tx it will become available on rx.

Then, we let each of our threads take ownership of the appropriate channel. We give rx to the RealtimeThread, because it will receive messages, and tx to the UIThread, because it will be sending them.

fn main() {
    let (tx, rx) = mpsc::sync_channel(0);
    let rt = RealtimeThread::new(rx);
    let ui = UIThread::new(tx);
    run_threads(rt, ui);
}

Then, modify both thread structs and both new functions.

struct RealtimeThread {
    current_samples: Option<Arc<Samples>>,
    incoming:        mpsc::Receiver<Message>,
}

// ...

struct UIThread {
    outgoing: mpsc::SyncSender<Message>,
}

// changes to new omitted

Now, let's get our threads sending messages, starting with the UI thread. If any sends fails, something has gone horribly wrong, so its fine to unwrap the result of these sends.

/// All of the UI thread code
fn run(&mut self) {
    // create 10 "ui events"
    for i in 0..10 {
        let volume = i as f32 / 10.0;
        let samples = Arc::new(self.compute_samples(volume));

        // send the samples to the other thread
        println!("[ui] sending new samples. Second sample: {}", samples[1]);
        self.outgoing.send(Message::NewSamples(samples)).unwrap();
    }

    // tell the other thread to shutdown
    self.outgoing.send(Message::Shutdown).unwrap();
}

In the realtime thread, we check if there is a new message on the queue. If there is, handle it. If not, just keep doing what we were doing.

/// realtime callback, called to get the list of samples
fn realtime_callback(&mut self, output_samples: &mut Samples) -> CallbackStatus {
    match self.incoming.try_recv() {
        // we've received a messaged
        Ok(message) => match message {
            Message::NewSamples(samples) => {
                println!("[realtime] received new samples. Second sample: {}", samples[1]);
                self.current_samples = Some(samples)
            },

            // If we got a shutdown message, shutdown the realtime thread
            Message::Shutdown => return CallbackStatus::Shutdown
        },

        // if we didn't receive anything, just keep sending samples
        Err(_) => ()
    }

    // copy our current samples into the output buffer
    self.current_samples.as_ref().map(|samples| {
        // samples: &Arc<[f32; 64>
        output_samples.copy_from_slice(samples.as_ref())
    });

    CallbackStatus::Continue
}

I've used a println! here only for the sake of demonstration. You shouldn't ever do this in real realtime code (because print statements usually allocate!)

Here is a link to this code in the Rust playground. It might timeout if you try running it. If you see any messages about timeout, don't worry, just try running the code again.

Here is an example output:

[realtime] thread started
[ui] thread started
[ui] sending new samples. Second sample: 0
[realtime] received new samples. Second sample: 0
[ui] sending new samples. Second sample: 0.009801715
[realtime] received new samples. Second sample: 0.009801715
[ui] sending new samples. Second sample: 0.01960343
[realtime] received new samples. Second sample: 0.01960343
[ui] sending new samples. Second sample: 0.029405143
[realtime] received new samples. Second sample: 0.029405143
[ui] sending new samples. Second sample: 0.03920686
[realtime] received new samples. Second sample: 0.03920686
[realtime] thread shutting down
[ui] thread shutting down

Problems?

The last example seems to do the right thing, let's take a look at what the realtime callback does when it receives a new set of samples.

// ...
Message::NewSamples(samples) => {
    self.current_samples = Some(samples)
},
// ...

What happens to the old array of samples? Rust will insert a call to drop here, because the old value has just gone out of scope. Something like this (in pseudo-Rust) sort of shows what is going on.

// ...
Message::NewSamples(samples) => {
    let tmp = Some(samples);
    mem::swap(self.current_samples, tmp);
    drop(tmp);
},
// ...

When an Arc gets drop=ped, what happens? Let's refer to the docs for =drop.

This will decrement the strong reference count. If the strong reference count becomes zero and the only other references are Weak<T> ones, drops the inner value.

In this case, the inner value is some heap allocated memory, so calling drop will deallocate that memory (since no one else is holding any references). This is a problem! We can't let our realtime callback perform memory allocation.

Build the GC

We now need to build the GC that I promised we would build, to clean up after us, outside of the realtime thread. Sneak peak, once the GC is implemented, all we have to change is UIThread::run, in a very small way:

/// All of the UI thread code
fn run(&mut self) {
    let mut gc = GC::new(); // + NEW LINE

    // create 5 "ui events"
    for i in 0..5 {
        let volume = i as f32 / 5.0;
        let samples = Arc::new(self.compute_samples(volume));
        self.collector.track(samples.clone()); // + NEW LINE

        // send the samples to the other thread
        println!("[ui] sending new samples. Second sample: {}", samples[1]);
        self.outgoing.send(Message::NewSamples(samples)).unwrap();
    }

    // tell the other thread to shutdown
    self.outgoing.send(Message::Shutdown).unwrap();
}

With that in mind, lets sketch out the interface for the Garbage Collector.

/// A garbage collector for Arc<T> pointers
struct GC<T> {
    // ...
}

impl<T> GC<T> {
    /// Construct a new garbage collector and start the collection thread
    fn new() -> Self {
        // ...
    }

    /// Instruct the garbage collector to monitor this Arc<T>
    /// When no references remain, the collector will `drop` the value
    fn track(&mut self, t: Arc<T>) {
        // ...
    }
}

First think about the track method. All this method needs to do is move it's argument into some list (vector) of pointers. We will keep this vector in the GC thread struct so that each of the references will live until the GC thread is shut down or until the GC drops them.

struct GC<T> {
    pool: Vec<Arc<T>>,
}

impl<T> GC<T> {
    // ...

    pub fn track(&mut self, t: Arc<T>) {
        self.pool.push(t);
    }
}

Now lets think about the garbage collection logic. Since we have a Vec<Arc<T>>, we will want to iterate over it, removing any elements which meet (or fail) a condition. We can use Vec::retain to do this. Something like the following might work:

pool.retain(|e| {
    if /* has more than one reference */ {
        return true
    } else {
        return false
    }
})

Looking at the Arc docs, there are a few ways we can figure out if the Arc has only one remaining reference:

  • Attempt to consume the Arc with Arc::try_unwrap, if this fails, we know that it has more than one reference. Unforunately, this method requires moving the Arc out of the vector, which is not ideal if we want to use Vec::retain.
  • Arc::strong_count - this is currently marked as unstable. Looks like what we might want to use though.
  • Arc::get_mut could possibly be used the same way we would use Arc::try_unwrap, without moving the Arc containing in the vector unless we want to remove it.

We don't have lots of options, so I'm going to go ahead and use Arc::strong_count. This is (for now) the most natural way to solve the problem:

pool.retain(|e: Arc<_>| {
    if Arc::strong_count(&e) {
        return true
    } else {
        return false
    }
})

Let's move on to new. The new method needs to start new thread which will run the pool.retain thing every once and a while. We also need to hold on to a thread handle so that we can eventually join the thread. The join handle is wrapped in an Option, we will see why quite a bit later.

/// A garbage collector for Arc<T> pointers
struct GC<T> {
    pool: Vec<Arc<T>>,
    thread: Option<thread::JoinHandle<()>>,
}

impl<T> GC<T> {
    // private. cleans up any dead pointers in a pool
    fn cleanup(pool: &mut Vec<Arc<T>>) {
        pool.retain(|e: &Arc<_>| {
            if Arc::strong_count(&e) > 1 {
                return true
            } else {
                return false
            }
        });
    }

    pub fn new() -> Self {
        let pool = Vec::new();

        // create a closure which will become a new thread
        let gc = || {
            loop {
                GC::cleanup(&mut pool);

                // wait for 100 milliseconds, then scan again
                let sleep = std::time::Duration::from_millis(100);
                thread::sleep(sleep);
            }
        };

        // spawns a new thread and returns a handle to the thread
        let gc_thread = thread::spawn(gc);

        GC {
            pool:   pool,
            thread: Some(gc_thread),
        }
    }

    pub fn track(&mut self, t: Arc<T>) {
        self.pool.push(t);
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(0);
    let rt = RealtimeThread::new(rx);
    let ui = UIThread::new(tx);
    run_threads(rt, ui);
}

We've written a bunch of new code, better make sure it compiles (Rust playground):

error[E0277]: the trait bound `T: std::marker::Send` is not satisfied
   --> <anon>:154:25
    |
154 |         let gc_thread = thread::spawn(gc);
    |                         ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `T`
    |
    = help: consider adding a `where T: std::marker::Send` bound
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<T>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<std::sync::Arc<T>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<std::sync::Arc<T>>`
    = note: required because it appears within the type `std::vec::Vec<std::sync::Arc<T>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&mut std::vec::Vec<std::sync::Arc<T>>`
    = note: required because it appears within the type `[closure@<anon>:143:18: 151:10 pool:&mut std::vec::Vec<std::sync::Arc<T>>]`
    = note: required by `std::thread::spawn`

error[E0277]: the trait bound `T: std::marker::Sync` is not satisfied
   --> <anon>:154:25
    |
154 |         let gc_thread = thread::spawn(gc);
    |                         ^^^^^^^^^^^^^ the trait `std::marker::Sync` is not implemented for `T`
    |
    = help: consider adding a `where T: std::marker::Sync` bound
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<T>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<std::sync::Arc<T>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<std::sync::Arc<T>>`
    = note: required because it appears within the type `std::vec::Vec<std::sync::Arc<T>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&mut std::vec::Vec<std::sync::Arc<T>>`
    = note: required because it appears within the type `[closure@<anon>:143:18: 151:10 pool:&mut std::vec::Vec<std::sync::Arc<T>>]`
    = note: required by `std::thread::spawn`

error: aborting due to 2 previous errors

Oops, this isn't good. This error makes it feel sort of like Rust hates us, but the compiler is actually doing us a massive favor.

In Rust, there are a few thread safety "marker traits" called Send and Sync. The compiler is telling us that our generic type T doesn't implement either of them.

Put very loosely, if something implements Send, it is safe to send it between threads. Sync is considerably more subtle and quite difficult to wrap your head around, but we can sort of say that, if something implements Sync, we can access the same instance of it from multiple threads. For more info, you can read this blog post, but you shouldn't need any more than what I've given to get through the rest of my post.

So anyway, Rust is telling us that we have a thread safety problem, but we haven't guaranteed that we can safely copy and access values of our type T between the garbage collector thread and any other threads.

I know that T must be Send, because it has to be sent between threads, so let's go ahead and add that restriction:

/// A garbage collector for Arc<T> pointers
struct GC<T: Send> {
    pool: Vec<Arc<T>>,
    thread: Option<thread::JoinHandle<()>>,
}

impl<T: Send> GC<T> {
// ....

Rust playground link

Hoorary, the Send error is gone! Unfortunately, we still have the issue with Sync. Let's look more closely at the error we are getting:

error[E0277]: the trait bound `T: std::marker::Sync` is not satisfied
   --> <anon>:154:25
    |
154 |         let gc_thread = thread::spawn(gc);
    |                         ^^^^^^^^^^^^^ the trait `std::marker::Sync` is not implemented for `T`
    |
    = help: consider adding a `where T: std::marker::Sync` bound
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<T>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<std::sync::Arc<T>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<std::sync::Arc<T>>`
    = note: required because it appears within the type `std::vec::Vec<std::sync::Arc<T>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&mut std::vec::Vec<std::sync::Arc<T>>`
    = note: required because it appears within the type `[closure@<anon>:143:18: 151:10 pool:&mut std::vec::Vec<std::sync::Arc<T>>]`
    = note: required by `std::thread::spawn`

error: aborting due to previous error

This error is really confusing, and my solution for it is not going to be much better, but stick with me.

The origin of this error is the Arc<T>. If we want an Arc<T> to implement Send, the T contained in it must implement BOTH Send and Sync. It makes sense that T would need to implement Send, but why does T need to be Sync? Basically, this is because the data the Arc<T> is holding will be shared by anyone who can access the Arc<T>. An Arc can be =clone=ed at any time, so, if we are allowed to pass it to other threads, it must also be safe for multiple threads to access the underlying data at the same time.

We could add the Sync constraint to our type T to resolve this problem, but does this really make any sense? Nowhere in our application will a message be accessible by more than one thread at a time.

When the UI thread creates a new message, it immediately surrenders all access to the underlying data, by moving the value into the channel. Once the realtime thread has the data, it will be the only thread that actually accesses the data until the data needs to be freed. The GC also is holding a reference to data, but it will never actually touch the data in any way, until it frees it. When the GC thread frees the memory holding the data, we know that there will be no other references to the memory in the program.

I might be wrong about this (please let me know if I am), but I think that we don't actually need the type T to be Sync. The compiler will never let us get away with this (because it doesn't know all of these properties) but we can let it know that it should trust us, with a new struct:

struct TrustMe<T> {
    pub inner: T
}

unsafe impl<T> Send for TrustMe<T> {}

This will tell the compiler "yes, this thing is Send", even when it actually isn't, so the implementation of the trait Send is unsafe.

Now, we can create a Send=able =TrustMe<Arc<T>>, and the compiler will trust us when we share these =Arc<T>=s between threads.

Now, lets add this to our GC:

/// A garbage collector for Arc<T> pointers
struct GC<T: Send> {
    pool: Vec<TrustMe<Arc<T>>>,
    thread: Option<thread::JoinHandle<()>>,
}

impl<T: Send> GC<T> {
    // private. cleans up any dead pointers in a pool
    fn cleanup(pool: &mut Vec<TrustMe<Arc<T>>>) {
        pool.retain(|e: &TrustMe<Arc<_>>| {
            if Arc::strong_count(&e.inner) > 1 {
                return true
            } else {
                return false
            }
        });
    }

    pub fn new() -> Self {
        let mut pool = Vec::new();

        // create a closure which will become a new thread
        let gc = || {
            loop {
                GC::cleanup(&mut pool);

                // wait for 100 milliseconds, then scan again
                let sleep = std::time::Duration::from_millis(100);
                thread::sleep(sleep);
            }
        };

        // spawns a new thread and returns a handle to the thread
        let gc_thread = thread::spawn(gc);

        GC {
            pool:   pool,
            thread: Some(gc_thread),
        }
    }

    pub fn track(&mut self, t: Arc<T>) {
        let t = TrustMe { inner: t };
        self.pool.push(t);
    }
}

Rust playground link

When we try to compile this, we get YET ANOTHER compiler error. This time, the compiler is whining at us with "the parameter type T may not live long enough". This error message is frustrating, but, we are using Rust because we want to be very careful with memory safety, so lets try to keep going.

The new thread that we have created could run until the termination of the program, so any data which the thread might be holding onto also must be able to live until the termination of the program.

The compiler is telling us that we need to add a "lifetime specifier" to our type T. In this case, it is telling us that the lifetime of any T which is managed by the GC must be 'static. The 'static lifetime indicates that values of type T + 'static might live for the entire duration of the program.

This might seem excessive, but, it is not possible for the compiler to determine when in the program our thread will terminate (if it could we would have solved the halting problem), so the maximum lifetime MUST potentially be the entire duration of the program. Note that, this doesn't mean that all the values stored in the GC will necessarily live for the entire lifetime of the program (if they did, we wouldn't be cleaning up garbage). This condition just means that they might live that long.

Anyway, we can now add the + 'static specifier the compiler has asked us to add, and try to compile this one more time.

/// A garbage collector for Arc<T> pointers
struct GC<T: Send + 'static> {

// ...

impl<T: Send + 'static> GC<T> {

// ...

GUESS WHAT IT DIDN'T WORK.

error[E0373]: closure may outlive the current function, but it borrows `pool`, which is owned by the current function
   --> <anon>:149:18
    |
149 |         let gc = || {
    |                  ^^ may outlive borrowed value `pool`
150 |             loop {
151 |                 GC::cleanup(&mut pool);
    |                                  ---- `pool` is borrowed here
    |
help: to force the closure to take ownership of `pool` (and any other referenced variables), use the `move` keyword, as shown:
    |         let gc = move || {

error: aborting due to previous error

Once again, this is a good thing, I promise! Now, the compiler is trying to tell us that the vector named pool is being accessed from two different places. The compiler wants us to have the new thread take ownership of the vector, but this highlights an interesting problem. We need to allow both the GC thread, and any other non-realtime thread, to access the vector, at the same time.

The compiler has prevented us from accessing the same data from multiple threads.

To solve this, we can just wrap the vector in a Mutex and an Arc. The Arc allows us to create one instance of the vector on the heap, and the Mutex makes sure that only one thread can access the heap allocated vector at any given time.

Here are most of the changes:

// introduce some news type aliases to make life a little bit easier
type TrustedArc<T> = TrustMe<Arc<T>>;
type ArcPool<T> = Vec<TrustedArc<T>>;

/// A garbage collector for Arc<T> pointers
struct GC<T: Send + 'static> {
    pool: Mutex<Arc<ArcPool<T>>>,
    thread: Option<thread::JoinHandle<()>>,
}

// ...

impl<T: Send + 'static> GC<T> {
    // ...
    pub fn new() -> Self {
        let pool = Arc::new(Mutex::new(Vec::new()));

        // create a copy of the pool. The GC thread will own this clone
        // and the reference count will be incremented by one
        let thread_arc_copy = pool.clone();

        // create a closure which will become a new thread
        let gc = move || {
            loop {
                // lock the mutex, then let go of it.
                // If we hold the mutex, the UI thread will be blocked every time it asks the
                // collector to track something.
                {
                    let mut pool = thread_arc_copy.lock().unwrap();
                    GC::cleanup(&mut pool);
                }

                // wait for a bit, then scan again
                let sleep = std::time::Duration::from_millis(5);
                thread::sleep(sleep);

            }
        };

        // ....
    }

    pub fn track(&mut self, t: Arc<T>) {
        let t = TrustMe { inner: t };
        let mut pool = self.pool.lock().unwrap();
        pool.push(t);
    }
}

We can finally compile this! Here's a link to the Rust playground. Note that you will need to make sure you compile with the "Nightly" channel.

There are only a few things left to do.

Start and Stop the GC

The GC thread that we have created will never terminate.

Ideally, when the GC goes out of scope, it will shut down the GC thread and clean up any tracked memory (if it can). Any Arc which can't be freed when the GC is shut down will not be freed, but (this is important) the reference count will drop by one. Now, if one of the previously tracked Arc s goes out of scope, it will be freed on whatever thread drops it (this could be the realtime thread!)

So, as long as the realtime thread keeps running, we must keep the GC thread running.

First, edit main:

fn main() {
    // start the collector
    let collector = GC::new();

    // create the channels
    let (tx, rx) = mpsc::sync_channel(0);

    // set up both of the threads
    let rt = RealtimeThread::new(rx);
    let ui = UIThread::new(tx);

    // start the threads
    run_threads(rt, ui);

    // GC thread will be shutdown here, where the GC goes out of scope
}

Then, edit the UIThread struct appropriately.

struct UIThread {
    outgoing: mpsc::SyncSender<Message>,
    collector: GC<Samples>
}

impl UIThread {
    fn new(outgoing: mpsc::SyncSender<Message>, collector: GC<Samples>) -> Self {
        UIThread { outgoing: outgoing, collector: collector }
    }

    // ...
}

Next, update the UIThread::run method:

/// All of the UI thread code
fn run(&mut self) {
    // create 5 "ui events"
    for i in 0..5 {
        let volume = i as f32 / 5.0;
        let samples = Arc::new(self.compute_samples(volume));

        // tell the GC thread to track our list of samples
        self.collector.track(samples.clone());

        // send the samples to the other thread
        println!("[ui] sending new samples. Second sample: {}", samples[1]);
        self.outgoing.send(Message::NewSamples(samples)).unwrap();
    }

    // tell the other thread to shutdown
    self.outgoing.send(Message::Shutdown).unwrap();
}

Drop the GC

Rust will make sure that Drop is called when the struct goes out of scope. This gives us a change to shut down the GC thread. We also set up a shared atomic boolean to indicate when the GC thread should shut down.

Here is most of that:

/// A garbage collector for Arc<T> pointers
struct GC<T: Send + 'static> {
    pool: Arc<Mutex<ArcPool<T>>>,
    thread: Option<thread::JoinHandle<()>>,
    running: Arc<AtomicBool>,
}

// initialize the running flag to false in GC::new

// ....

impl<T: Send + 'static> Drop for GC<T> {
    fn drop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
        match self.thread.take() {
            Some(t) => t.join().unwrap(),
            None    => ()
        };
    }
}

And, here's the Rust playground link. You may have some trouble getting this to run (timeouts occur), but I promise it works sometimes.

Example output:

[realtime] thread started
[ui] thread started
[ui] sending new samples. Second sample: 0
[ui] sending new samples. Second sample: 0.01960343
[realtime] received new samples. Second sample: 0
[ui] sending new samples. Second sample: 0.03920686
[realtime] received new samples. Second sample: 0.01960343
[realtime] received new samples. Second sample: 0.03920686
[ui] thread shutting down
[realtime] thread shutting down

Proof

Let's add some logging so we can see when things are getting freed:

// private. cleans up any dead pointers in a pool
fn cleanup(pool: &mut Vec<TrustMe<Arc<T>>>) {
    pool.retain(|e: &TrustMe<Arc<_>>| {
        if Arc::strong_count(&e.inner) > 1 {
            return true
        } else {
            println!("[gc] dropping a value!");
            return false
        }
    });
}

The completed code lives at this Rust playground link.

Example Output:

[realtime] thread started
[ui] thread started
[ui] sending new samples. Second sample: 0
[realtime] received new samples. Second sample: 0
[ui] sending new samples. Second sample: 0.01960343
[realtime] received new samples. Second sample: 0.01960343
[gc] dropping a value!
[ui] sending new samples. Second sample: 0.03920686
[realtime] received new samples. Second sample: 0.03920686
[gc] dropping a value!
[ui] sending new samples. Second sample: 0.058810286
[realtime] received new samples. Second sample: 0.058810286
[gc] dropping a value!
[ui] sending new samples. Second sample: 0.07841372
[realtime] received new samples. Second sample: 0.07841372
[gc] dropping a value!
[ui] thread shutting down
[realtime] thread shutting down

Conclusion

We did it!

For me, this post exemplifies the reasons I am so excited about Rust. The realtime audio world places us into a world where many programming languages are simply not usable. Languages with runtimes that may behave unpredictably cannot meet the extremely strict requirements we must meet for correct realtime operation. Rust allows us to meet all of those requirements and gives us some nice abstractions.

On top of that, the Rust compiler meticulously checks for thread safety violations and memory safety violations. While writing this post, some of the issues the compiler threw at me ('static, for example), are issues I never considered. The compiler caught me and told me "no," so I had to think about what was actually going on.

These checks are absolutely irritating, and sometimes we might want to work around them (like we did with TrustMe). I'm glad to be exposed to potential issues, even if I have to work around the compiler sometimes (so far).

If you made it this far, thank you for reading. I hope you've learned something interesting (maybe even useful).

Discussion on reddit.

home switch-color-mode