This lesson is in the early stages of development (Alpha version)

HPC Parallelisation For Novices

Bonus session: Distributing computations among computers

Overview

Teaching: 45 min
Exercises: 10 min
Questions
  • What is the message passing interface (MPI)?

  • How do I exploit parallelism using the message passing interface (MPI)?

Objectives
  • Explain how message passing allows performing computations in more than 1 computer at the same time.

  • Observe the effects of parallel execution of commands with a simple hostname call.

  • Measure the run time of parallel and MPI version of the implementation.

Lola Lazy is now confident enough to work with the batch system of the cluster. She now turns her attention to the problem at hand, i.e. estimating the value of Pi to very high precision.

One of her more experienced colleagues has suggested to her, to use the Message Passing Interface (in short: MPI) for that matter. As she has no prior knowledge in the field, accepting this advice is as good as trying some other technique on her own. She first explores the documentation of MPI a bit to get a feeling about the philosophy behind this approach.

Message Passing Interface

A long time before we had smart phones or tablets, compute clusters were already around and consisted of interconnected computers that had merely enough memory to show the first two frames of a movie (2*1920*1080*4 Bytes = 16 MB). However, scientific problems back than were equally demanding more and more memory than today. To overcome the lack of available hardware memory, specialists from academia and industry came about with the idea to consider the memory of several interconnected compute nodes as one. Given a standardized software that synchronizes the various states of memory between the client/slave nodes during the execution of driver application through the network interfaces. With this performing large calculations that required more memory than each individual cluster node can offer was possible. Moreover, this technique by passing messages (hence Message Passing Interface or MPI) on memory updates in a controlled fashion allowed to write parallel programs that were capable of running on a diverse set of cluster architectures.

Schematic View of a Compute Cluster with 4 nodes (12 cores each)

Lola becomes curious. She wants to experiment with this parallelization technique a bit. For this, she would like to print the name of the node where a specific driver application is run.

$ cat call_hostname.sh
#!/bin/bash

mpirun hostname
$ sbatch -n 4 -o call_hostname.out -e call_hostname.err < call_hostname.sh

The log file that is filled by the command above, contains the following lines after finishing the job:

n01
n01
n01
n01

The output makes her wonder. Apparently, the command was cloned and executed on the same host 4 times. If she increases the number of processors to a number larger than the number of CPU cores each of here nodes has, this might change and the distributed nature of mpirun will reveal itself.

$ cat call_hostname.sh
#!/bin/bash

mpirun hostname
$ sbatch -n 16 -o call_hostname.out -e call_hostname.err < call_hostname.sh
n01
n01
n01
n01
n01
n01
n01
n01
n01
n01
n01
n02
n01
n02
n02
n02

Execution of `mpirun hostname` on a Compute Cluster with 4 nodes (12 cores
each)

As the figure above shows, 12 instances of hostname were called on n01 and 4 more on n02. Strange though, that the last 5 lines are not ordered correctly. Upon showing this result to her colleague, the latter explains: even though, the hostname command is run in parallel across the 2 nodes that are used here, the output of her 16 hostname calls need to be merged into one output file (that she called call_hostname.out) at the end. This synchronization performed by the mpirun application is not guaranteed to happen in an ordered fashion (how could it be as the commands were issued in parallel). Her colleague explains, that the hostname application itself is not aware of MPI in a way that it is not parallelized with it. Thus, the mpirun driver simply accesses the nodes that it is allowed to run on by the batch system and launches the hostname application. After that, mpirun collects the output of the executed commands at completion and writes it to the defined output file call_hostname.out.

Like a reflex, Lola asks how to write these MPI programs. Her colleague points out that she needs to program the languages that MPI supports, such as FORTRAN, C, C++, Python and many more. As Lola is most confident with Python, her colleague wants to give her a head start using mpi4py and provides a minimal example. This example is analogous to what Lola just played with. This Python script called print_hostname.py prints the number of the current MPI rank (i.e. the unique id of the execution thread within one mpirun invocation), the total number of MPI ranks available and the hostname this rank is currently run on.

$ cat print_hostname.py
from mpi4py import MPI

def print_hostname():
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    hname = MPI.Get_processor_name()

    print("this is rank = %2i (total: %2i) running on %s" % (rank,size,hname))

    comm.Barrier()


if __name__ == '__main__':

    print_hostname()

$ cat submit_16_print_hostname.sh
#!/bin/bash

mpirun python3 print_hostname.py
$ sbatch -n 16 -o call_hostname.out -e call_hostname.err < submit_16_print_hostname.sh
this is 16/16 running on n02
this is 15/16 running on n02
this is 13/16 running on n02
this is 14/16 running on n02
this is  3/16 running on n01
this is  5/16 running on n01
this is 11/16 running on n01
this is  1/16 running on n01
this is  7/16 running on n01
this is  2/16 running on n01
this is  4/16 running on n01
this is  6/16 running on n01
this is  8/16 running on n01
this is  9/16 running on n01
this is 10/16 running on n01
this is 12/16 running on n01

Again, the unordered output is visible. Now, the relation between the rank and the parameters -n to submit command becomes more clear. -n defines how many processors the current invocation of mpirun requires. If -n 16 is defined, the rank can run from 0 to 15.

Does mpirun really execute commands in parallel?

Launch the command date 16 times across your cluster. What do you observe? Play around with the precision of date through its flags (+%N for example) and study the distribution of the results.

Upgrade print_hostname.py and print the time-of-day as well

Open the print_hostname.py script with your editor and use the python3 datetime module to print the time of day next to the host name and rank number.

To finalize this day’s work, Lola wants to tackle distributed memory parallelization using the Message Passing Interface (MPI). For this, she uses the mpi4py library that is pre-installed on her cluster. She again starts from the serial implementation. At first, she expands the include statements a bit.

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

These 4 lines will be very instrumental throughout the entire MPI program. The entire MPI software stack builds upon the notion of a communicator. Here, we see the MPI.COMM_WORLD communicator by which all processes that are created talk to each other. We will use it as a hub to initiate communications among all participating processes. Subsequently, we ask comm how many participants are connected by calling comm.Get_size(). Then we’ll ask the communicator, what rank the current process is comm.Get_rank(). And with this, Lola has entered the dungeon of MPI.

Every Line Is Running in Parallel!

As discussed in the previous section, a call to ` mpirun

` will do the following: - `mpirun` will obtain a list of available nodes from the scheduler - mpirun will then `ssh` onto these nodes for you and instantiate a local mpirun there - this local mpirun will execute `` in parallel to all the others and call every line of it from top to bottom - only if your program reaches a statement of the form `comm.do_something(...)`, your program will start communicating through the mpi library with the other mpi processes; this communication can entail point-to-point data transfers or collective data transfers (that's why it's called 'message passing' because MPI does nothing else than provide mechanism to send messages around the cluster), depending on the type of communication, the MPI library might make your program wait until the all message passing has been completed In case you want to do something only on one rank specifically, you can do that by: ~~~~~ if rank == 0: print("Hello World") ~~~~~ {: .language-python }

Pushing the implementation further, the list of partitions needs to be established similar to what was done in the parallel implementation above. Also a list for the results is created and all items are initialized to 0.

if rank == 0:
    partitions = [ int(n_samples/size) for item in range(size)]
    counts = [ int(0) ] *size
else:
    partitions = None
    counts = None

In this example, you can see how the lists are only created on one rank for now (rank 0 to be precise). At this, point the contents of partitions and counts reside on rank 0 only. They now have to send to all other participating ranks.

partition_item = comm.scatter(partitions, root=0)
count_item = comm.scatter(counts, root=0)

Note how the input variable is partitions (aka a list of values) and the output variable is named partition_item. This is because, mpi4py returns only one item (namely the one item in partitions matching the rank of the current process, i.e. partitions[rank]) rather than the full list. Now, the actual work can be done.

count_item = inside_circle(partition_item)

This is the known function call from the serial implementation. After this, the results have to be communicated back again.

counts = comm.gather(count_item, root=0)

The logic from above is reverted now. A single item is used as input, aka count_item, and the result counts is a list again. In order to compute pi from this, the following operations should be restricted to rank=0 in order to minimize redundant operations:

if rank == 0:
    my_pi = 4.0 * sum(counts) / sum(partitions)

And that’s it. The complete script the looks like this:

#!/usr/bin/env python3
import sys
from mpi4py import MPI
import numpy as np

np.random.seed(2017)

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()


def inside_circle(total_count):

    hname = MPI.Get_processor_name()

    x = np.float32(np.random.uniform(size=total_count))
    y = np.float32(np.random.uniform(size=total_count))

    radii = np.sqrt(x*x + y*y)

    count = len(radii[np.where(radii<=1.0)])

    return count

def estimate_pi(n_samples):

    counts = inside_circle(n_samples)
    return (4.0 * sum(counts) / total_count)

if __name__=='__main__':

    n_samples = 10000
    if len(sys.argv) > 1:
        n_samples = int(sys.argv[1])

    if rank == 0:
        partitions = [ int(n_samples/size) for item in range(size)]
        counts = [ int(0) ] *size
    else:
        partitions = None
        counts = None

    partition_item = comm.scatter(partitions, root=0)
    count_item = comm.scatter(counts, root=0)

    count_item = inside_circle(partition_item)

    counts = comm.gather(count_item, root=0)
    if rank == 0:
        my_pi = 4.0 * sum(counts) / sum(partitions)
        sizeof = np.dtype(np.float32).itemsize
        print("[     mpi version] required memory %.3f MB" % (n_samples*sizeof*3/(1024*1024)))
        print("[using %3i cores ] pi is %f from %i samples" % (size,my_pi,n_samples))

Now, Lola can submit her first MPI job.

$ cat submit_mpi_numpi.sh
#!/bin/bash

mpirun python3 mpi_numpi.py 1000000000
$ sbatch -n 48 -o mpi_numpi.out -e mpi_numpi.err < submit_mpi_numpi.sh

The output file mpi_numpi.out yields the following lines:

[     mpi version] required memory 11444.092 MB
[using  48 cores ] pi is 3.141679 from 1000000000 samples

real    0m6.546s
user    0m36.436s
sys     0m8.445s

Note here, that we are now free to scale this application to hundreds of cores if we wanted to. We are only restricted by the size of our compute cluster. Before finishing the day, Lola looks at the run time that her MPI job consumed. 6.5 seconds for a job that ran on four times as much cores as here parallel implementation before (which took 12s for the same configuration). That is quite an achievement of the day!

Use the batch system!

Launch the serial and parallel version of the pi_estimate using the batch system.

Don’t Stress the Network

The MPI implementation given above transmits only the pi estimate per rank to the main program. Rewrite the program so that each rank generates the random numbers and sends them back to rank 0.

Submit the job and look at the time it took. What do you observe? Why did the run time change?

Key Points

  • The MPI driver mpirun sends compute jobs to a set of allocated computers.

  • The MPI software then executes these jobs on the remote hosts and synchronizes their state/memory.

  • The print_hostname.py infers the hostname of the current machine. If run in parallel with mpirun, it prints several different host names.

  • MPI can be used to split the random sampling into components and have several nodes generate random numbers and report back only the pi estimate of this partition.