Parallel Computing with MPI (Message Passing Interface)

MPI (Message Passing Interface) with Python and mpi4py

Problem Statement:

To demonstrate the various types of parallel computing operations.

  • How can I use MPI to parallelize Python code?

Category of the problem:

Technical: MPI Hands-On with mpi4py

Objectives:

  • Learn how to prepare an environment that includes mpi4py.

  • Learn the basics of writing MPI-parallelized code.

  • Explore point-to-point and collective MPI operations.

  • To achieve parallelism with the help of various methods in Python.

  • To understand how multithreading and multiprocessing work in Python.

Tool used:

MPI SDK, VS Code

Description:

• Comparing the processing time of various operations and understanding the basic use of the MPI libraries.

Environment setup:

The most annoying part of MPI is actually the setup. We will be testing everything on a local machine, but MPI is most useful in a fully distributed environment.

Global concept:

In an MPI environment, every process has an ID, which identifies who is sending a message and who is receiving it. There is a master and there are workers. The master's ID, or rank, is 0; it is responsible for orchestrating the global communication scenario. The other processes have ranks beginning at 1.

There are some general classes, methods and constants that are present in almost every implementation of MPI. Those are:

  • COMM_WORLD: the default communicator; it represents the global environment set up to exchange messages

  • Finalize(): shuts down the MPI system

  • Get_size(): returns the number of processes in the group

  • Get_rank(): returns the ID or rank of the current process (between 0 and size-1)
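Note that mpi4py initializes the MPI environment when the MPI module is imported and arranges for Finalize() to run automatically at interpreter exit, which is why the examples below never call these functions explicitly.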

Installation of the python MPI API (mpi4py)

There are multiple ways to install the Python interface to MPI; the simplest is to use pip:

pip install mpi4py

To verify that everything is OK, let's write a hello world program.

#!/usr/bin/env python3

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

print("Hello world from worker ", worker, " of ", size)

And the result, running on 4 processes (note that the print order is not deterministic), is:

Hello world from worker 1 of 4
Hello world from worker 2 of 4
Hello world from worker 3 of 4
Hello world from worker 0 of 4

2 – Point-to-Point Communication

In a communication process, we have at least two actors (although MPI also supports one-sided communication; we will come back to that later). In point-to-point communication, exactly two processes communicate with each other. MPI provides a really simple API for this: we have one source and one destination for our message.

Here are the Communicator methods used in Point to Point communication:

  • send: used by the sender to send a message to a specific process in the environment, identified by the dest parameter

  • recv: a blocking method used by the receiver to wait for a message from the process specified by the source parameter

Let's take a simple example: we have 5 computers and we want to create a pipeline where one computer computes something and gives the result to the next computer. We start with the value 2, and each computer takes the number from the computer with rank-1, doubles it, and sends it to the computer with rank+1. We must have the setup shown below.

[Figure: Example of point-to-point communication]

Let’s code that:

#!/usr/bin/env python3

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

initial = 2
if worker == 0:
    # first stage: start the pipeline
    print(size)
    comm.send(initial, dest=worker+1)
elif worker == size-1:
    # last stage: receive, double, and print the final result
    rec = comm.recv(source=worker-1)
    print(rec*2)
else:
    # intermediate stage: receive, double, and pass along
    rec = comm.recv(source=worker-1)
    comm.send(rec*2, dest=worker+1)
    print(worker, rec)
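To run the pipeline on 5 processes (assuming the script is saved as pipeline.py, a name chosen here for illustration):

$ mpiexec -n 5 python pipeline.py

Rank 0 prints the communicator size, each intermediate rank prints its rank and the value it received, and the last rank prints the final result: 2 doubled four times, i.e. 32.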

We have 8 computers and we want to create a data cleaning pipeline using those computers. What we want is to retrieve data from Twitter chunk after chunk, give each computer a role in the processing chain, and have the last computer save a perfectly cleaned, ready-to-use dataset for our ML or NLP task.

So we may have the following setup (a code skeleton follows the list):

  • The first worker, with rank 0, retrieves the data from the source, chunk after chunk, and sends each chunk to the next worker, with rank 1.

  • Worker 1 removes emojis, tokenizes the text, and sends it to worker 2.

  • Worker 2 cleans the text by removing links, abbreviations, numbers, …, and sends it to worker 3.

  • Worker 3 removes stopwords (if you don't know much about them, don't worry: they are not important in this exercise) and sends the result to worker 4.

  • Worker 4 opens the file called cleandataset.txt, appends the cleaned chunk to the file, and then closes it.

MPI is not a technology made for pipeline creation, but this example can help to show you the computer communication process.
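Here is a minimal skeleton of that pipeline, meant to be run on 5 processes; fetch_chunk and process are hypothetical stand-ins for the real retrieval and cleaning logic:

#!/usr/bin/env python3

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

N_CHUNKS = 3   # number of chunks to push through the pipeline
STOP = None    # sentinel telling each stage to shut down

def fetch_chunk(i):
    # stand-in for retrieving a chunk of raw tweets
    return "raw tweet chunk %d" % i

def process(stage, chunk):
    # stand-in for the remove-emojis / clean-text / remove-stopwords steps
    return chunk + " -> stage%d" % stage

if rank == 0:
    # producer: push every chunk into the pipeline, then the sentinel
    for i in range(N_CHUNKS):
        comm.send(fetch_chunk(i), dest=1)
    comm.send(STOP, dest=1)
elif rank < 4:
    # intermediate cleaning stages: receive, process, pass along
    while True:
        chunk = comm.recv(source=rank-1)
        comm.send(STOP if chunk is None else process(rank, chunk), dest=rank+1)
        if chunk is None:
            break
elif rank == 4:
    # final stage: append each cleaned chunk to the dataset file
    with open("cleandataset.txt", "a") as f:
        while True:
            chunk = comm.recv(source=3)
            if chunk is None:
                break
            f.write(chunk + "\n")

Run it with mpiexec -n 5 python pipeline_sketch.py (pipeline_sketch.py being whatever name you save it under); any extra ranks beyond 4 would simply idle.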

3 – Collective Communication

In collective communication, we have a group of workers that can either send information to or receive information from one worker or a group of workers. In this case, we have the following methods:

Broadcasting:

This is used when we want to send the same message to every worker, for example a learning rate for the computation of a gradient.

# Broadcast
from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

if worker == 0:
    # only the root prepares the data
    data = {"data": "for everyone", "something": [1,2,3,4,5]}
else:
    data = None

# bcast copies the root's data to every worker
data = comm.bcast(data, root=0)

print(worker, data)
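Running this on 4 processes (assuming the file is saved as bcast.py, a name used here for illustration):

$ mpiexec -n 4 python bcast.py

Every rank should print the same dictionary, since rank 0's copy is distributed to all workers.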

Reduce:

Like the reduce function found in many programming languages, this aggregates data from all processes. It is used in operations like finding the maximum of an array, summing an array, and so on. In our case, the receiver gets the data from every worker and aggregates it using a predefined or custom operation; predefined operations include MPI.SUM, MPI.PROD, MPI.MAX, MPI.MIN, …

# Reduction
from mpi4py import MPI

# custom reduction operation: multiply the two inputs
def reduce_func(a, b):
    return a * b

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

# reduce worker+1 rather than worker, so that rank 0 does not force
# the product to zero; the root receives the product of 1..size
data = comm.reduce(worker + 1, op=reduce_func, root=0)

print("worker", worker, "contributed", worker + 1)

if worker == 0:
    print("final result ", data)

Output (visualization):

Example 1: Writing Hello World

We'll start with the first example in mpi/example1, which is a simple Hello World code:

if __name__ == "__main__":

    print("Hello World!")

Acquire a copy of the example files for this lesson, and then run MPI Example 1:

$ git clone git@github.com:MolSSI-Education/parallel-programming.git
$ cd parallel-programming/examples/mpi4py/example1
$ python example1.py

# output
Hello World!

Getting Started with MPI:

MPI - mpiexec vs mpirun

MPI stands for 'Message Passing Interface' and is a message passing standard designed to work on a variety of parallel computing architectures. The MPI standard defines the syntax and semantics of a library of routines. There are a number of implementations of this standard, including OpenMPI, MPICH, and MS MPI.

Let’s try running this code on multiple processes. This is done using the mpiexec command. Many environments also provide an mpirun command, which usually - but not always - works the same way. Whenever possible, you should use mpiexec and not mpirun, in order to guarantee more consistent results.

Example 2:

Now edit example1.py so that it reports the size of the world communicator and the rank of each process:

from mpi4py import MPI

if __name__ == "__main__":

    world_comm = MPI.COMM_WORLD
    world_size = world_comm.Get_size()
    my_rank = world_comm.Get_rank()

    print("World Size: " + str(world_size) + "   " + "Rank: " + str(my_rank))

In the above code we first import mpi4py. Then, we get the communicator that spans all of the processes, which is called MPI.COMM_WORLD. The communicator's Get_size() function tells us the total number of processes within that communicator. Each of these processes is assigned a unique rank, which is an integer that ranges from 0 to world_size - 1. The rank of a process allows it to be identified whenever processes communicate with one another. For example, in some cases we might want rank 2 to send some information to rank 4, or we might want rank 0 to receive information from all of the other processes. Calling world_comm.Get_rank() returns the rank of the process that called it within world_comm.

Go ahead and run the code now:

$ mpiexec -n 4 python example1.py
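You should see output along these lines (the order of the lines may vary from run to run):

World Size: 4   Rank: 0
World Size: 4   Rank: 1
World Size: 4   Rank: 2
World Size: 4   Rank: 3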

As you can see, the world_comm.Get_size() function returns 4, which is the total number of ranks we told mpiexec to run with (through the -n argument). Each of the processes is assigned a rank in the range of 0 to 3.

You can also try rerunning with a different value for the -n mpiexec argument. For example:

$ mpiexec -n 2 python example1.py

Example 3:

Case 1: Worker ranks and world size

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

print("Hello world from worker ", worker, " of ", size)

# run with, e.g.: mpiexec -n 4 python example.py

$ mpiexec -n 10 python example.py
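With -n 10 you should see ten "Hello world from worker k of 10" lines, one per rank, again in no guaranteed order.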

Example 4:

Point-to-Point Communication

Case 2: Measuring the speed of various operations on an array

import numpy as np
from mpi4py import MPI

if __name__ == "__main__":

    # get basic information about the MPI communicator
    world_comm = MPI.COMM_WORLD
    world_size = world_comm.Get_size()
    my_rank = world_comm.Get_rank()

    N = 10000000

    # initialize a
    start_time = MPI.Wtime()
    a = np.ones( N )
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Initialize a time: " + str(end_time-start_time))

    # initialize b
    start_time = MPI.Wtime()
    b = np.zeros( N )
    for i in range( N ):
        b[i] = 1.0 + i
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Initialize b time: " + str(end_time-start_time))

    # add the two arrays
    start_time = MPI.Wtime()
    for i in range( N ):
        a[i] = a[i] + b[i]
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Add arrays time: " + str(end_time-start_time))

    # average the result
    start_time = MPI.Wtime()
    total = 0.0
    for i in range( N ):
        total += a[i]
    average = total / N
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Average result time: " + str(end_time-start_time))
        print("Average: " + str(average))

Beyond the cases above, MPI can achieve much more than what we have introduced here. There are concepts like one-sided communication, shared memory (interaction between processes and threads), parallel I/O, and much more that you can explore in the MPI documentation.