Skip to content

Parallel programming in Python: mpi4py (part 1)

In previous posts we have introduced the multiprocessing module which makes it possible to parallelize Python programs on shared memory systems. The limitation of the multiprocessing module is that it does not support parallelization over multiple compute nodes (i.e. on distributed memory systems). To overcome this limitation and enable cross-node parallelization, we can use MPI for Python, that is, the mpi4py module. This module provides an object-oriented interface that resembles the message passing interface (MPI),  and hence allows Python programs to exploit multiple processors on multiple compute nodes. The mpi4py module supports both point-to-point and collective communications for Python objects as well as buffer-like objects. This post will briefly introduce the use of the mpi4py module in communicating generic Python objects, via all-lowercase methods including send, recv, isend, irecv, bcast, scatter, gather, and reduce.

Basics of mpi4py

The mpi4py module has been installed on Beskow. To use mpi4py on Beskow, we need to load the module first

# on Beskow
module load mpi4py/3.0.2/py37

The mpi4py/3.0.2/py37 module will automatically load the anaconda/2019.03/py37 module, which is an open-source distribution of Python for scientific computing (see Anaconda). We recommend Python 3 because Python 2 is going to be retired soon. After loading the mpi4py module, we can check that the python3 command (which is needed to run  mpi4py code) is available in the PATH environment variable, using the which command

$ which python3
/pdc/vol/anaconda/2019.03/py37/bin/python3

Now we can write our first Python program with mpi4py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print('Hello from process {} out of {}'.format(rank, size))

To understand the code we need to know several concepts in MPI. In parallel programming with MPI, we need the so-called communicator, which is a group of processes that can talk to each other. To identify the processes with that group, each process is assigned a rank that is unique within the communicator. It also makes sense to know the total number of processes, which is often referred to as the size of the communicator.

In the above code, we defined the variable comm as the default communicator MPI.COMM_WORLD. You may recognize that MPI.COMM_WORLD in mpi4py corresponds to MPI_COMM_WORLD in MPI programs written in C. The rank of each process is retrieved via the Get_rank method of the communicator, and the size of the communicator is obtained by the Get_size method. Now that the rank and size are known, we can print a “Hello” message from each process.

The usual way to execute an mpi4py code in parallel is to use mpirun and python3, for example “mpirun -n 4 python3 hello.py” will run the code on 4 processes, assuming that the code is saved in a file named “hello.py”. On Beskow, however, the setup is different since the resources (compute nodes) are managed by the SLURM workload manager. We’ll need to first request one or more compute nodes using salloc, and then execute “python3 hello.py” via srun. For example, the output from running the code on 4 processes is as follows:

$ salloc --nodes 1 -t 00:10:00 -A <your-project-account>
salloc: Granted job allocation <your-job-id>

$ srun -n 4 python3 hello.py
Hello from process 0 out of 4
Hello from process 1 out of 4
Hello from process 2 out of 4
Hello from process 3 out of 4

Point-to-point communication

For point-to-point communication between Python objects, mpi4py provides the send and recv methods that are similar to those in MPI. An example of code for passing (which is usually referred to as “communicating”) a Python dictionary object between the master process (which has rank = 0) and the worker processes (that all have rank > 0) is given below.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# master process
if rank == 0:
    data = {'x': 1, 'y': 2.0}
    # master process sends data to worker processes by
    # going through the ranks of all worker processes
    for i in range(1, size):
        comm.send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)

# worker processes
else:
    # each worker process receives data from master process
    data = comm.recv(source=0, tag=rank)
    print('Process {} received data:'.format(rank), data)

In this example (which we will name “send-recv.py”), the dictionary object data is sent from the master process to each worker process. We check the value of rank to determine if a process is the master process. The for loop is only executed for the master process, and i (starting from 1) goes through the ranks of the worker processes. We can also use tag in the send and recv methods to distinguish between the messages if there are multiple communications between two processes. The output from running the example on 4 processes is as follows:

$ srun -n 4 python3 send-recv.py
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 1 received data: {'x': 1, 'y': 2.0}
Process 2 received data: {'x': 1, 'y': 2.0}
Process 3 received data: {'x': 1, 'y': 2.0}

Note, however, that the above example uses blocking communication methods, which means that the execution of code will not proceed until the communication is completed. This blocking behaviour is not always desirable in parallel programming; in some cases it is beneficial to use non-blocking communication methods. As shown in the code below, isend and irecv are non-blocking methods that immediately return Request objects, and we can use the wait method to manage the completion of the communication. If necessary, one can do some computations between comm.isend(...) and req.wait() to increase the efficiency by overlapping the communications and computations.

if rank == 0:
    data {'x': 1, 'y': 2.0}
    for i in range(1, size):
        req = comm.isend(data, dest=i, tag=i)
        req.wait()
        print('Process {} sent data:'.format(rank), data)

else:
    req = comm.irecv(source=0, tag=rank)
    data = req.wait()
    print('Process {} received data:'.format(rank), data)

Collective communication

In parallel programming, it is often useful to perform what is known as collective communication, for example, broadcasting a Python object from the master process to all the worker processes. The example code below broadcasts a Numpy array using the bcast method.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    data = np.arange(4.0)
else:
    data = None

data = comm.bcast(data, root=0)

if rank == 0:
    print('Process {} broadcast data:'.format(rank), data)
else:
    print('Process {} received data:'.format(rank), data)

The output from running the above code (broadcast.py) on 4 processes is as follows:

$ srun -n 4 python3 broadcast.py
Process 0 broadcast data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]

If one needs to divide a task and distribute the sub-tasks to the processes, the scatter method will be useful. However note that it is not possible to distribute more elements than the number of processors. If one has a big list or array, it is necessary to make slices of the list or array before calling the scatter method. The code below is an example of scattering a Numpy array, which is converted into a list of array slices before the scatter method is called.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
    data = np.arange(15.0)

    # determine the size of each sub-task
    ave, res = divmod(data.size, nprocs)
    counts = [ave + 1 if p < res else ave for p in range(nprocs)]

    # determine the starting and ending indices of each sub-task
    starts = [sum(counts[:p]) for p in range(nprocs)]
    ends = [sum(counts[:p+1]) for p in range(nprocs)]

    # converts data into a list of arrays 
    data = [data[starts[p]:ends[p]] for p in range(nprocs)]
else:
    data = None

data = comm.scatter(data, root=0)

print('Process {} has data:'.format(rank), data)

The output from running the above code (scatter-array.py) on 4 processes is as follows.

$ srun -n 4 python3 scatter-array.py
Process 0 has data: [0. 1. 2. 3.]
Process 1 has data: [4. 5. 6. 7.]
Process 2 has data: [8. 9. 10. 11.]
Process 3 has data: [12. 13. 14.]

Since there are 15 elements in total, we can see that each of the first 3 processes received 4 elements, and that the last process received 3 elements.

The gather method does the opposite to scatter. If each process has an element, one can use gather to collect them into a list of elements on the master process. The example code below uses scatter and gather to compute π in parallel.

from mpi4py import MPI
import time
import math

t0 = time.time()

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

# number of integration steps
nsteps = 10000000
# step size
dx = 1.0 / nsteps

if rank == 0:
    # determine the size of each sub-task
    ave, res = divmod(nsteps, nprocs)
    counts = [ave + 1 if p < res else ave for p in range(nprocs)]

    # determine the starting and ending indices of each sub-task
    starts = [sum(counts[:p]) for p in range(nprocs)]
    ends = [sum(counts[:p+1]) for p in range(nprocs)]

    # save the starting and ending indices in data  
    data = [(starts[p], ends[p]) for p in range(nprocs)]
else:
    data = None

data = comm.scatter(data, root=0)

# compute partial contribution to pi on each process
partial_pi = 0.0
for i in range(data[0], data[1]):
    x = (i + 0.5) * dx
    partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx

partial_pi = comm.gather(partial_pi, root=0)

if rank == 0:
    print('pi computed in {:.3f} sec'.format(time.time() - t0))
    print('error is {}'.format(abs(sum(partial_pi) - math.pi)))

In addition to the gather method which collects the elements into a list, one can also use the reduce method to collect the results. The last five lines of the above example can be rewritten as follows, where “op=MPI.SUM” requires that pi is obtained as the sum of partial_pi from all processes.

pi = comm.reduce(partial_pi, op=MPI.SUM, root=0)

if rank == 0:
    print('pi computed in {:.3f} sec'.format(time.time() - t0))
    print('error is {}'.format(abs(pi - math.pi)))

Summary

We have briefly introduced the mpi4py module and the ways it enables communication of Python objects via all-lowercase methods.

  • Get_rank and Get_size provide the rank of the process and the size of the communicator (total number of processes), respectively.
  • send and recv are blocking point-to-point communication methods, while isend and irecv methods are non-blocking methods.
  • bcast, scatter, gather and reduce are collective communication methods.

You may read the MPI for Python tutorial page for more information about the mpi4py module.