On the road to realitγ.

January 30, 2019

MPI-Parallelization in Julia: A 1D shallow water example.

Domain decomposition 101: Shared work is half the work.

MPI-Parallelization in Julia: A 1D shallow water example.

To lazy to read? Then directly to

Intro: Parallel what? A quick motivation for parallel computing.

I always wanted to learn some parallel computing. What I mean is not just taking someone's parallelized code and then hurray the cluster is completely blocked and your colleagues start to write angry emails. No, I really wanted to get my hands dirty, writing the parallelization myself. But, you know, there is always more important things to do or you just have a lazy weekend while the cluster is working for you. Then there was Julia, and as weather and climate models heavily rely on MPI, I thought why not using MPI for Julia (instead of Julia's im-built parallelisation approaches). I came across an amazing blog post by Claudio Bellei, who parallelised a diffusion equation solver. However, and please don't judge me on that, I find diffusion problems quite boring. I want to have (at least a tiny bit of) action - so I decided to parallelise the non-linear shallow water equations. These equations also have advection, allow for waves to propagate and can be combined with diffusion too. To keep it simple I dropped 2D and considered 1D instead.

Some parallel code is fairly easy to understand: Say you want to analyse dataset X, which contains a large number of chunks and whatever you analyse in one chunk is completely independent from another. Some people call this embarassingly parallel problems. That's not what I was aiming for. Many people know the butterfly effect of chaotic systems such as weather: A little change on one side of the problem and eventually you get a huge change even on the other side of the globe. This means that weather forecasting is not an embarassingly parallel problem, as every computation will eventually influence every other computation. This influence takes places in terms of communication: information needs to exchanged between the chunks executed in parallel. Not just once, but again and again and again.

Okay, let's go. First a short idea what the shallow water model (and how simple it actually is in 1D); second the idea of domain decomposition; and third how to use MPI for the communication between the sub-domains.

Discretizing the physics: The 1D shallow water model

We will use finite-differences on a staggered grid, where the velocity points sit in between the surface elevation points - to interpolate from one to the other we use a simple 2-point average.

These are the two functions that act as the only two operators needed for the 1D shallow water model: A gradient in the only spatial dimension x and an interpolation from one grid to the other.

Don't worry about the @boundscheck and the @inbounds, these are Julia-details to make the code run faster. With these functions, the function rhs then computes the right-hand side of the partial differential equations. Again don't worry about the @views - this is another Julia macro that you can ignore. First we compute from the surface elevation η the layer thickness h, we square the velocity u for the Bernoulli potential and perform some interpolations to get the variables on the correct grids. We include the viscous term as a 1D "stress tensor" in the Bernoulli potential such that in the momentum equations only experience the tendency of this new potential plus the wind forcing

For the continuity equation we evaluate it's tendency straight forward including one interpolation and one gradient. So far so good, these three functions work for the whole domain or even sub-domains, as long as the boundary conditions are contained within additional ghost-points to the left and right - but we will come back to that in a second.

The time integration is done using Runge-Kutta 4th order in a low-memory version where tendencies get summed-up on the go. Once you've written your own RK4 time integrator, that should be pretty easy to understand. This version requires a few copying back and forth between 3 different version of the prognostic variables u,u0 and u1. Using the ghost-point approach where the boundary conditions are contained within additional grid-points just outside the domain, we only have to make sure that before every evaluation of the right-hand side this ghost-point copy got executed, such that the gradient & interpolation functions do the right thing at the boundary, we will come to the details of that ghost_point! function later.

Share the work - domain decomposition

The idea of domain decomposition (and there is many other approaches to parallelisation - often highly dependent on the discretisation method) is to split the whole domain into a few smaller ones, such that one processor doesn't have to calculate the whole thing but only a part of it. In 1D, we simply divide the number of grid cells by the number of processes.

We number the sub-domains by 0,1,2,3... and let every process (numbered by its rank) know what it's left and right neighbour is. This already includes the boundary conditions, which we pick to be periodic. So process with rank 0 has neighbour n-1 to the left and 1 to the right, etc.

Once the domain is split up onto different processes, we simply use the ghost-point copy on every time step to copy values not within one array, but across different array on different processes, let's unwrap this idea:

MPI: An interface for communication between processes.

Basically every MPI code starts with MPI.Init() and a communicator object MPI.COMM_WORLD. In our case, this just contains the information of many processes there are and gives every processes a rank, i.e. 0,1,2,3  etc. As with MPI the same code is executed by every processor, every processor will define the rank and size constant in line 185,186. "size" will be the same on each, but rank will obviously differ.

Before the time integration starts, we take the prognostic variables and pad them with zeros (simply as the ghost-point copy will be executed right thereafter). The ghost_points! function varies depending on whether the code is executed with a single process or in a parallel fashion:

(sequential) Due to the periodic boundary conditions, we take the 2nd entry of the array and copy it into the last and similarly the 2nd last replaces the first entry. This way, computing the gradient (or interpolation) on the boundary includes the correct information of the boundary condition. This is done for both prognostic variables.

(parallel) This is exactly the same except that the 2nd entry of the array is copied into the last of the left neighbour (and not into the very same array). So line 90 and 91, each process sends the 2nd / the 2nd last value (concatentated into an array with length two for both variables) to the respective left / right neighbour, via MPI.Isend. The third argument of that function is an identifier that tags the message you send (the array in the first argument) so that the receiving process (2nd argument) unambiguously can identify the arriving message. So it makes sense to tag this message somehow with the rank number, but as each process is sending and receiving two messages, to avoid overlap we simply offset the tags by 10 for all messages that go to the left and by 20 for all messages for messages that go to the right. But to be fair, that's a pretty random choice.

Once every process has send off these two messages to its neighbours, we will also receive the messages from the same neighbours, which happens in lines 94 and 95. To be precise, the function MPI.Irecv! only requests a message and already points its content to the variables in the first argument (hence the !-notation of Julia). The message will be received subsequently, but as many messages might be en route at the same time this does not happen immediately. Hence, the MPI.Irecv! function gives an object back that can be used with MPI.Waitall! to check that really all messages have been received and only then continue.

Then we simply copy the content of the received messages back into the arrays - et voila our ghost-point copy across processes succeeded!

And finally assembling the pieces

Great, just one last thing: How do we receive all the data from all processes and gather them on one, for output/plotting purposes? MPI.Gather requests the variable in the first argument from all processes. The second argument specifies which process is actually requesting. Here, we identify rank 0 with the main process and the communicator comm is passed on as always. The result of the MPI.Gather function will be stored on the receiving processes, but is essentially a long array that simply concatenates all arrays from all processes. Hence, process 0 will rashape this array in the desired shape (that we will use later for plotting).

Finally, MPI.Barrier and MPI.Finalize() ends the communication between the processes and we continue with the plotting of the data to make sure that actually did the right thing. And as you can see in the little gif above, yeha it worked! If I were a proper computer scientist I would probably need to finish here with a little speed comparison, some weak or strong scaling, but I really wanted to keep it simple. I learned something from this little project, and that was definitely the main goal.