This lesson is in the early stages of development (Alpha version)

Higher levels of parallelism

Overview

Teaching: 30 min
Exercises: 15 min
Questions
• What were the key changes when using the multiprocessing library?

• How could this be implemented with dask?

• How does a conversion using dask high level API compare?

Objectives
• Use a high level description of your algorithm.

• Measure the runtime of the dask implementation.

Lola looks over the code she has just written and asks her office mate to review it. The two of them sit down in front of the screen and go through the code again.

#!/usr/bin/env python3
import sys

import numpy as np

from multiprocessing import cpu_count, Pool

# NOTE: every worker process starts from this same seed,
# so the random streams of the partitions are not independent.
np.random.seed(2017)

def inside_circle(total_count):
    # draw random points in the unit square
    x = np.float32(np.random.uniform(size=total_count))
    y = np.float32(np.random.uniform(size=total_count))

    # count the points whose distance from the origin is at most 1
    radii = np.sqrt(x*x + y*y)
    filtered = np.where(radii <= 1.0)
    count = len(radii[filtered])

    return count

def estimate_pi(n_samples, n_cores):
    # one equally sized partition per core (any remainder is dropped)
    partitions = []
    for i in range(n_cores):
        partitions.append(n_samples // n_cores)

    # the pool distributes the partitions over the worker processes
    with Pool(processes=n_cores) as pool:
        counts = pool.map(inside_circle, partitions)

    total_count = sum(partitions)
    return (4.0 * sum(counts) / total_count)

if __name__=='__main__':

    ncores = cpu_count()
    n_samples = 10000
    if len(sys.argv) > 1:
        n_samples = int(sys.argv[1])

    partitions = [ int(n_samples/ncores) for item in range(ncores)]

    sizeof = np.dtype(np.float32).itemsize

    my_pi = estimate_pi(n_samples,ncores)

    print("[parallel version] required memory %.3f MB" % (n_samples*sizeof*3/(1024*1024)))
    print("[using %3i cores ] pi is %f from %i samples" % (ncores,my_pi,n_samples))

Lola’s office mate observes that:

• multiprocessing is part of the python3 standard library (apart from numpy, anybody else who wants to use this software needs only python3)

• Lola uses implicit parallelisation by using multiprocessing.Pool, i.e. she doesn’t need to manage her workers/threads/cores in any way

• it’s nice that Lola can reuse parts of her serial implementation

• doing the calculation of the partitions looks quite error prone

• larger portions of the code appear dependent on ncores

• using multiprocessing limits Lola to using only one machine at a time

Lola agrees with these observations, and both conclude that an alternative implementation using higher level abstractions of the underlying hardware might be a good idea.
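The partition calculation that the office mate flagged as error prone can be isolated in a small helper. The following is only a sketch: the name `split_samples` is hypothetical and not part of Lola's code. It spreads the remainder over the first partitions, so that no samples are silently dropped when the sample count is not divisible by the number of cores.

```python
def split_samples(n_samples, n_parts):
    """Split n_samples into n_parts near-equal integer partitions."""
    if n_parts < 1:
        raise ValueError("n_parts must be at least 1")
    base, remainder = divmod(n_samples, n_parts)
    # the first `remainder` partitions receive one extra sample
    return [base + 1 if i < remainder else base for i in range(n_parts)]


if __name__ == "__main__":
    print(split_samples(10000, 3))
```

With such a helper, the sum of the partitions always equals the requested number of samples, whatever the core count.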

Another day, Lola discovers a library named dask (see the dask documentation for details) that not only promises high performance but also mirrors much of the numpy API, so that she has to apply only minimal changes to her code. The library can be installed on Lola’s cluster with pip3.

She now sets out to study the documentation of dask and comes up with the following code:

#!/usr/bin/env python3
import sys

import dask.array as da
import numpy as np

np.random.seed(2017)
da.random.seed(2017)

def inside_circle(total_count, chunk_size = -1):
    # chunked containers of random numbers
    x = da.random.uniform(size=(total_count),
                          chunks=(chunk_size))

    y = da.random.uniform(size=(total_count),
                          chunks=(chunk_size))

    # these operations are recorded by dask, not executed yet
    radii = da.sqrt(x*x + y*y)
    filtered = da.where(radii <= 1.0)

    # converting to a numpy array triggers the computation
    indices = np.array(filtered[0])
    count = len(indices)

    return count

def estimate_pi(total_count, chunk_size=-1):

    count = inside_circle(total_count, chunk_size)
    return (4.0 * count / total_count)

def main():
    n_samples = 10000
    if len(sys.argv) > 1:
        n_samples = int(sys.argv[1])

    # chunk sizes must be integers
    chunksize = max(1, n_samples // 10)
    if len(sys.argv) > 2:
        chunksize = int(sys.argv[2])

    my_pi = estimate_pi(n_samples, chunksize)
    sizeof = np.dtype(np.float32).itemsize
    print("[parallel version] required memory %.3f MB" % (n_samples*sizeof*3/(1024*1024)))
    print("[using dask lib  ] pi is %f from %i samples" % (my_pi,n_samples))

if __name__=='__main__':
    main()

This implementation can now be put to work. At this point, a paradigm shift has been introduced silently. Lola’s office mate makes her aware of this. It is a subtle change compared to using the multiprocessing library, but it is there.

In this example, the containers for the random numbers have become smart. This is visible only through the chunks argument passed to the da.random.uniform function. A flat container that just holds numbers in memory would not be responsible for chunking the data. But dask offers us a container that is.

Behind the scenes, the dask framework connects containers (da.array here) with functions (operator*, operator+, da.sqrt, da.where). The framework then infers which functions can act on which data independently. From this, the dask library can execute the program with whatever degree of parallelism is needed.
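To make the idea of independent chunks concrete, here is a toy sketch in plain Python. It is not how dask works internally; it only mimics the structure: each chunk is processed by the same function without looking at any other chunk, and the partial counts are then reduced with a sum. The names `count_inside` and `estimate_pi_chunked` are hypothetical, and threads are used only for illustration (pure Python code gains little speed from them).

```python
import random
from concurrent.futures import ThreadPoolExecutor


def count_inside(chunk_id, chunk_size, seed=2017):
    """Count random points of one chunk that fall inside the unit circle."""
    rng = random.Random(seed + chunk_id)  # separate generator per chunk
    hits = 0
    for _ in range(chunk_size):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            hits += 1
    return hits


def estimate_pi_chunked(total_count, chunk_size, max_workers=4):
    """Map count_inside over all chunks, then reduce the counts with sum."""
    n_chunks, rest = divmod(total_count, chunk_size)
    sizes = [chunk_size] * n_chunks + ([rest] if rest else [])
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        counts = pool.map(count_inside, range(len(sizes)), sizes)
        return 4.0 * sum(counts) / total_count


if __name__ == "__main__":
    print(estimate_pi_chunked(100000, 10000))
```

Because no chunk depends on another, the number of workers can change without changing the result; that independence is what a framework can exploit.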

All this automation comes at a price: the dask implementation is about 2x slower than the pure multiprocessing one. But there must be something Lola has gained. The answer will become evident when we dive into more details of the dask ecosystem, as dask is HPC-ready.

$ pip3 install --user distributed bokeh

When consulting the dask.distributed documentation, Lola recognizes that she needs to adapt her code to work with dask.distributed.

#!/usr/bin/env python3
import sys

import dask.array as da
import numpy as np
from dask.distributed import Client

np.random.seed(2017)
da.random.seed(2017)

def inside_circle(total_count, chunk_size = -1):
    # chunked containers of random numbers
    x = da.random.uniform(size=(total_count),
                          chunks=(chunk_size))

    y = da.random.uniform(size=(total_count),
                          chunks=(chunk_size))

    # these operations are recorded by dask, not executed yet
    radii = da.sqrt(x*x + y*y)
    filtered = da.where(radii <= 1.0)

    # converting to a numpy array triggers the computation
    indices = np.array(filtered[0])
    count = len(indices)

    return count

def estimate_pi(total_count, chunk_size=-1):

    count = inside_circle(total_count, chunk_size)
    return (4.0 * count / total_count)

def main():
    n_samples = 10000
    if len(sys.argv) > 1:
        n_samples = int(sys.argv[1])

    # chunk sizes must be integers
    chunksize = max(1, n_samples // 10)
    if len(sys.argv) > 2:
        chunksize = int(sys.argv[2])

    # connect to the running dask-scheduler; later computations use it
    client = Client("tcp://192.168.178.25:8786")
    my_pi = estimate_pi(n_samples, chunksize)
    sizeof = np.dtype(np.float32).itemsize
    print("[parallel version] required memory %.3f MB" % (n_samples*sizeof*3/(1024*1024)))
    print("[distributed dask] pi is %f from %i samples" % (my_pi,n_samples))

if __name__=='__main__':
    main()

Following the advice from the dask documentation, she has to do some manual work first, before she can launch the dask processes.

First, she needs to start the dask-scheduler.

$ dask-scheduler > scheduler.log 2>&1 &
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://192.168.178.25:8786
distributed.scheduler - INFO -       bokeh at:                     :8787
distributed.scheduler - INFO - Local Directory: /tmp/user/1000/scheduler-k05cez
distributed.scheduler - INFO - -----------------------------------------------

Then, she starts one worker for testing:

$ dask-worker 192.168.178.25:8786 > worker.log 2>&1 &

Lola notices that she has to connect the two processes through the address 192.168.178.25:8786. After doing all of this, she runs her script again.

Something has changed. She receives the result much more quickly now. Is that reproducible? Lola measures and observes that the runtime of dask.distributed is very close to the runtime of the multiprocessing implementation.
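A single measurement can be misleading, so it helps to repeat it. The helper below is a minimal sketch of such a measurement using only the standard library; the name `best_runtime` is hypothetical, and taking the best of several runs is just one common convention.

```python
import time


def best_runtime(func, *args, repeats=3):
    """Return (best wall-clock seconds, last result) over several runs."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = func(*args)
        elapsed = time.perf_counter() - start
        best = min(best, elapsed)
    return best, result


if __name__ == "__main__":
    seconds, value = best_runtime(sum, range(1_000_000))
    print("best of 3 runs: %.4f s (result %i)" % (seconds, value))
```

In Lola's scripts, a call such as `best_runtime(estimate_pi, n_samples, chunksize)` would time the dask variants in the same way.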

By chance, she talks to her office mate about this. They discover that there is more waiting for them. She opens the URL 192.168.178.25:8787 in her browser and sees an interesting dashboard.

In a cluster environment, this is a powerful feature. Scaling the application has just become manageable. So let’s get real and scale across multiple nodes on the cluster. For this, we start the central dask-scheduler on the login node. This process only handles network traffic and hence should not consume many resources (which is still worth monitoring).

$ dask-scheduler > scheduler.log 2>&1 &

Note that we send this process into the background immediately and route its output, including all errors, into scheduler.log. The contents of that file should look like this (if not, there is a problem):

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://1.1.1.42:8786
distributed.scheduler - INFO -       bokeh at:        1.1.1.42:8787
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-xp31e5sl
distributed.scheduler - INFO - -----------------------------------------------

Subsequently, we have to start a worker on a cluster node and connect it to the scheduler by means of the IP address:

$ cat worker.sh
#!/bin/bash
#SBATCH --exclusive
#SBATCH -t 01:00:00

dask-worker tcp://1.1.1.42:8786

$ sbatch -o worker1.log  worker.sh

Now, we have to update the address of the scheduler inside our dask python script:

client = Client("tcp://1.1.1.42:8786")

As Lola observes, all parts of this dask system are connected by a single point, i.e. the IP address of the dask-scheduler. Lola can now run her dask scripts from the node where the dask-scheduler was started.
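Since the scheduler address is the single point of connection, hard-coding it in every script is fragile. One possible way around this is sketched below: the address is read from an environment variable with a fallback. The helper name `scheduler_address` and the variable name `LOLA_DASK_SCHEDULER` are made up for this illustration and are not part of dask.

```python
import os

# fallback: the address reported in scheduler.log in this lesson
DEFAULT_SCHEDULER = "tcp://1.1.1.42:8786"


def scheduler_address(env=os.environ):
    """Return the scheduler address, preferring the environment variable."""
    address = env.get("LOLA_DASK_SCHEDULER", DEFAULT_SCHEDULER)
    if "://" not in address:
        address = "tcp://" + address  # accept a bare host:port as well
    return address


if __name__ == "__main__":
    print(scheduler_address())
```

The script would then create its client with `Client(scheduler_address())` and no longer needs editing when the scheduler moves to another node.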

She will notice that the dashboard at 1.1.1.42:8787 is now filled with moving boxes. Her application runs. But, how about adding another node?

$ sbatch -o worker2.log  worker.sh

She is curious whether her code can use both workers.

Lola smiles while looking at the dashboard. This was, after all, very easy to set up. She has now reached a precision boundary for estimating pi that no other employee has reached.

Key Points

• The implementation using multiprocessing used python standard library components (very portable).

• The dask library offers parallelisation using constructs that are very numpy-like.

• To port to dask, only the import statements and the container construction need to be changed.

• The advantage of these changes lies in the capability to scale the job to larger machines (test locally, scale globally).

• At the heart of the ease of use lie ‘standardized’ building blocks for algorithms using the map-reduce paradigm.

• Amdahl’s law still holds.
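As a reminder of what the last point means in numbers, the sketch below evaluates Amdahl's law, speedup = 1 / ((1 - p) + p / n), where p is the fraction of the runtime that can run in parallel and n is the number of workers. The function name and the example value p = 0.9 are illustrative assumptions, not measurements from Lola's code.

```python
def amdahl_speedup(parallel_fraction, n_workers):
    """Upper bound on the speedup when only part of the runtime is parallel."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must lie in [0, 1]")
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_workers)


if __name__ == "__main__":
    for n in (1, 2, 8, 64):
        print("%3i workers: speedup <= %.2f" % (n, amdahl_speedup(0.9, n)))
```

However many workers are added, the speedup stays below 1 / (1 - p); for p = 0.9 that limit is 10.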