Parallel computing

On our computers, we usually have multiple cores which can work in parallel. In this section, we will use several threads. Each thread will analyze a subset of the input data, and we will later aggregate the results.

Here is how it will work for n threads:

  • The number of lines (let's call it l) will be divided into n chunks. The goal is to give roughly l/n lines to each thread.
  • An MPSC (multiple producers, single consumer) channel will be created.
  • Each thread will compute the statistics on the lines it got and will put the map containing the result on the channel.
  • The main thread will aggregate the various statistics into a single map and return it.

We will use scoped threads to alleviate lifetime constraints.
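For instance, std::thread::scope() (stable since Rust 1.63) lets spawned threads borrow data from the enclosing function, something plain thread::spawn() forbids because the thread might outlive the borrow. A minimal illustration, unrelated to the exercise itself:

```rust
use std::thread;

// A scoped thread may borrow local data: thread::scope() guarantees that
// every thread spawned in it is joined before the scope returns, so the
// borrow cannot outlive the data it refers to.
fn sum_halves(numbers: &[i32]) -> i32 {
    let (left, right) = numbers.split_at(numbers.len() / 2);
    thread::scope(|s| {
        let a = s.spawn(|| left.iter().sum::<i32>());
        let b = s.spawn(|| right.iter().sum::<i32>());
        a.join().unwrap() + b.join().unwrap()
    })
}

fn main() {
    println!("{}", sum_halves(&[1, 2, 3, 4, 5])); // 15
}
```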

Creating the parallel version

Exercise 2.a: Create a function with the following signature which uses up to n threads to compute the number of occurrences of each character:

fn count_chars_parallel<S: AsRef<str> + Sync>(input: &[S], n: usize) ->
    HashMap<char, usize>

You will notice that we have an extra constraint on S: since objects of type S will be referenced from other threads (the ones we will be creating), it must be Sync in addition to being AsRef<str>.

Breakdown of the count_chars_parallel() function

This function must do the following things, in order.

First it must create a (tx, rx) pair corresponding to a MPSC channel.
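Creating the channel is a one-liner: std::sync::mpsc::channel() returns both halves as a tuple. A minimal round trip, for illustration only:

```rust
use std::sync::mpsc;

fn main() {
    // tx is the cloneable sending half, rx the unique receiving half.
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone(); // "multiple producers": any clone can send
    tx.send("from tx").unwrap();
    tx2.send("from tx2").unwrap();
    println!("{}", rx.recv().unwrap()); // from tx
    println!("{}", rx.recv().unwrap()); // from tx2
}
```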

It must then create a scope in which new threads will be spawned.

For each chunk of about l/n lines (only the last chunk may be shorter):

  • tx must be cloned into a new variable that will be captured by the new thread;
  • a new thread should be spawned from the current scope, capturing tx and the current chunk.

This thread will compute the characters' occurrences (using count_chars()) and send the result on the MPSC channel using its captured tx clone.
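The spawning pattern looks like this sketch, shown here on partial sums rather than character maps to keep it short (parallel_sum and its chunk_size parameter are our names, not part of the exercise):

```rust
use std::sync::mpsc;
use std::thread;

// Each thread processes one chunk and sends its partial result through
// its own clone of tx; the scope joins all threads before returning.
fn parallel_sum(data: &[i32], chunk_size: usize) -> i32 {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for chunk in data.chunks(chunk_size) {
            let tx = tx.clone(); // captured by the new thread
            s.spawn(move || tx.send(chunk.iter().sum::<i32>()).unwrap());
        }
    });
    drop(tx); // only the clones were used; the original must go too
    rx.iter().sum() // ends once all senders have been dropped
}

fn main() {
    println!("{}", parallel_sum(&[1, 2, 3, 4, 5, 6], 2)); // 21
}
```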

It is now time to close the thread scope and to concentrate on receiving data from the spawned threads. Since we are after the thread::scope() call, at this stage we know that all the analyses have been sent on the MPSC channel.

How does the reception part work?

  • rx.recv() will wait until data is available. As long as there is data available on the channel, it will return Ok(some_data).
  • If all senders are dead (tx and all its clones have been dropped) and all data has been read, rx.recv() will return Err(…).
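These two cases can be checked with a short experiment; drain() below (our name, for illustration) keeps calling recv() until it errors:

```rust
use std::sync::mpsc;

// Read values until recv() reports that all senders have been dropped.
fn drain(rx: mpsc::Receiver<i32>) -> Vec<i32> {
    let mut out = Vec::new();
    while let Ok(value) = rx.recv() {
        out.push(value); // Ok(...): data was, or became, available
    }
    out // we only get here once recv() has returned Err(...)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // no sender left: the buffered data, then Err(...)
    println!("{:?}", drain(rx)); // [1, 2]
}
```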

At this stage, all threads have terminated their work. We can read data from the MPSC channel through rx: we will get a series of Ok(…) containing data from the threads, then Err(…) signalling that everything has terminated. Right? No… We still have to get rid of the tx variable. We cloned it each time we created a thread, but as long as the original tx exists, the channel is never considered closed and rx.recv() will block forever instead of returning Err(…).

How do we get rid of tx? std::mem::drop(tx) is intended for exactly that usage: it consumes tx and destroys it immediately.

Now that we have done that, we can collect every map which comes from the MPSC channel. Since you already know how to do that, let us share the code to build the aggregated hash map and save some time:

    let mut freq = HashMap::default();
    while let Ok(fr) = rx.recv() {
        for (c, n) in fr {
            *freq.entry(c).or_default() += n;
        }
    }
Returning freq is the last thing to do.
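Putting all the steps together, one possible sketch of the whole function might look like this. Here count_chars() stands in for the sequential version from the previous exercise, and the chunk_size computation with div_ceil() is our choice, not the only option:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Stand-in for the sequential version from the previous exercise.
fn count_chars<S: AsRef<str>>(input: &[S]) -> HashMap<char, usize> {
    let mut freq = HashMap::new();
    for line in input {
        for c in line.as_ref().chars() {
            *freq.entry(c).or_default() += 1;
        }
    }
    freq
}

fn count_chars_parallel<S: AsRef<str> + Sync>(input: &[S], n: usize) -> HashMap<char, usize> {
    let (tx, rx) = mpsc::channel();
    // Round up so that at most n chunks cover all the lines.
    let chunk_size = input.len().div_ceil(n).max(1);
    thread::scope(|s| {
        for chunk in input.chunks(chunk_size) {
            let tx = tx.clone(); // one sender clone per thread
            s.spawn(move || tx.send(count_chars(chunk)).unwrap());
        }
    });
    drop(tx); // without this, the receive loop below would never end
    let mut freq = HashMap::default();
    while let Ok(fr) = rx.recv() {
        for (c, n) in fr {
            *freq.entry(c).or_default() += n;
        }
    }
    freq
}

fn main() {
    let freq = count_chars_parallel(&["hello", "world"], 2);
    println!("{:?}", freq.get(&'l')); // Some(3)
}
```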

Calling the parallel version

Exercise 2.b: Update your main() function so that it uses count_chars_parallel() with, for example, 4 threads, so that it processes the Moby Dick text faster.