Parallélisme : travaux dirigés et pratiques

Table des matières

5. MPI

5.1. Introduction

MPI (The Message Passing Interface), conçue en 1993-94, est une norme (ou API - Application Programming Interface) définissant une bibliothèque de fonctions, utilisable avec les langages C, C++ et Fortran. Elle permet d'exploiter des ordinateurs distants ou multiprocesseurs par passage de messages (Wikipedia).

La technique de passage de message consiste à transmettre au travers du réseau les données à échanger. Initialement MPI a été conçu pour des systèmes à mémoire distribuée très populaires dans les années 80-90. MPI fut ensuite adapté pour les systèmes à mémoire partagée et fonctionne à présent pour des systèmes hybrides (distribué + partagé).

L'ensemble des fonctions peut être trouvé à cette adresse OpenMPI Doc.

MPI étant une API, il existe plusieurs implantations come MPICH ou OpenMPI (voir ce site pour un aperçu de l'architecture MPI).

5.2. Installation

Sous Ubuntu 24.04, il n'est plus possible d'utiliser la partie C++ de MPI. Deux possibilités s'offrent à vous :

5.2.1. Installation OpenMPI depuis le code source

Récupérez openmpi-4.1.5.tar.gz sur le site d'OpenMPI.

Décompressez, compilez le code source et installez les librairies :

tar -xvzf openmpi-4.1.5.tar.gz 
cd openmpi-4.1.5/
./configure --prefix=/usr/local/openmpi-4.1 --enable-mpi-cxx --disable-cuda
make -j4
sudo make install

Ici l'option --disable-cuda est censée supprimer la compilation de la partie CUDA mais semble ne pas fonctionner ou ne pas être reconnue en général.

Modifiez ensuite votre fichier .bashrc :

export PATH=/usr/local/openmpi-4.1/bin:\$PATH
export LD_LIBRARY_PATH=/usr/local/openmpi-4.1/lib:\$LD_LIBRARY_PATH

5.2.2. Utilisation de Docker

Commencez par installer Docker s'il n'est pas déjà installé :

sudo apt-get update
# ne pas oublier docker-buildx pour les versions récentes de docker
sudo apt-get install docker.io docker-buildx
sudo usermod -aG docker $USER
sudo systemctl start docker

Créez ensuite un répertoire dans lequel vous placez un fichier Dockerfile :

mkdir mpi
cd mpi
nano Dockerfile

Le fichier Dockerfile possède le contenu suivant. Il crée un utilisateur user et install OpenMPI ainsi que deux éditeurs en mode texte : nano et fte (commande sfte).

# Utiliser Ubuntu 20.04 comme image de base
FROM ubuntu:20.04

# Empêcher les invites interactives lors de l'installation des paquets
ARG DEBIAN_FRONTEND=noninteractive

# Mettre à jour le système et installer les dépendances nécessaires
# Utilisation de fte pour avoir l'éditeur de texte sfte plus sympathique
# que nano
RUN apt-get update && apt-get install -y \
    build-essential \
    openmpi-bin \
    openmpi-common \
    libopenmpi-dev \
    sudo \
    nano fte fte-console fte-terminal \
    && rm -rf /var/lib/apt/lists/*

# Créer un utilisateur non-root 'user'
RUN useradd -m -s /bin/bash user \
    && echo "user ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

# Passer à l'utilisateur 'user'
USER user

# Créer un répertoire de travail pour les programmes
WORKDIR /home/user

# Configurer les variables d'environnement MPI
ENV PATH="/usr/lib64/openmpi/bin:\$PATH" \
    LD_LIBRARY_PATH="/usr/lib64/openmpi/lib:\$LD_LIBRARY_PATH"

# L'utilisateur 'user' peut maintenant compiler et exécuter des programmes MPI
# Vous pouvez ajouter vos fichiers ou configurations ici, par exemple :
# COPY . /home/user/

# Commande par défaut (exemple pour démarrer un shell interactif)
CMD ["/bin/bash"]

Compilez ensuite le Dockerfile afin de créer le container MPI :

sudo docker build -t mpi_cpp .
sudo docker run -it mpi_cpp

Ou bien si on veut mapper (mettre en correspondance) le répertoire /home/richer/exchange de l'utilisateur richer sur la machine hôte vers le répertoire /home/user/exchange du container :

docker run -it --mount type=bind,source=/home/richer/exchange,target=/home/user/exchange mpi_cpp

5.3. Utiliser MPI

Il est nécessaire de modifier substantiellement son programme si on désire le paralléliser avec MPI car on exécute le même programme sur plusieurs machines/coeurs différents.

MPI est disponible pour Fortran, C et C++ et également avec Python (pyMPI).

Pour pouvoir disposer de MPI sur sa machine il faut installer les librairies suivantes sous Ubuntu :

sudo apt-get install libopenmpi-dev openmpi-bin openmpi-doc

Pour C et C++ on inclura le fichier mpi.h

Pour débuguer il est recommandé d'installer MPE (Multi-Processing Environment), difficile à trouver, qui doit être compilé lors de l'installation de Python Anaconda. On réalisera alors l'édition de liens avec -llmpe -lmpe

5.4. Communicateur

Les opérations MPI portent sur des communicateurs qui sont en quelque sorte des ports de communication inter processus. Le communicateur par défaut est COMM_WORLD qui comprend tous les processus actifs.

On peut créer de nouveaux communicateurs mais cela se révèle complexe.

Le modèle de communication est un modèle point à point basé sur deux opérations élémentaires send et receive qui sont ensuite déclinés de plusieurs manières différentes.

Le schéma classique d'utilisation de MPI consiste à :

  • initialiser les ressources MPI_Init
  • obtenir le nombre de processus lancés MPI_Comm_size
  • obtenir l'identifiant du processus courant MPI_Comm_rank
  • exécuter le code séquentiel
  • exécuter le code parallèle
  • libérer les ressources MPI_Finalize

Voici deux exemples simples qui affichent un message avec un temps de latence (instruction sleep) pour chaque thread / processeur. Le premier utilise des fonctions C, le second utilise des espaces de noms et des objets

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionnalities of MPI with C
  6. // ==================================================================
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. using namespace std;
  11. #include <mpi.h>
  12.  
  13. // ==============================================================
  14. // C version
  15. // ==============================================================
  16.  
  17. int main(int argc, char ** argv) {
  18.     int max_cpus, cpu_rank;
  19.     char cpu_name[MPI_MAX_PROCESSOR_NAME];
  20.     int length;
  21.  
  22.     // allocate resources
  23.     MPI_Init(&argc, &argv);
  24.    
  25.     // get number of programs running
  26.     MPI_Comm_size(MPI_COMM_WORLD, &max_cpus);
  27.     // get program identifier
  28.     MPI_Comm_rank(MPI_COMM_WORLD, &cpu_rank);
  29.     // get cpu name
  30.     MPI_Get_processor_name(cpu_name, &length);
  31.  
  32.     sleep(cpu_rank);
  33.    
  34.     cerr << "running on " << cpu_name << " with id=" << cpu_rank << "/";
  35.     cerr << max_cpus << endl;
  36.    
  37.     // free resources
  38.     MPI_Finalize();
  39.  
  40.     exit(EXIT_SUCCESS);
  41. }
  42.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionnalities of MPI with C++
  6. // ==================================================================
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. using namespace std;
  11. #include <mpi.h>
  12.  
  13. // ==================================================================
  14. // C++ version
  15. // ==================================================================
  16.  
  17. int main(int argc, char ** argv) {
  18.     int max_cpus, cpu_rank;
  19.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  20.     int length;
  21.  
  22.     // allocate resources
  23.     MPI::Init(argc, argv);
  24.    
  25.     // get number of programs running
  26.     max_cpus = MPI::COMM_WORLD.Get_size();
  27.     // get program identifier
  28.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  29.     // get cpu name
  30.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  31.     MPI::Get_processor_name(cpu_name, length);
  32.  
  33.     // sleep
  34.     sleep(cpu_rank);
  35.    
  36.     cerr << "running on " << cpu_name << " with id=" << cpu_rank << "/";
  37.     cerr << max_cpus << endl;
  38.    
  39.     // free resources
  40.     MPI::Finalize();
  41.  
  42.     exit(EXIT_SUCCESS);
  43. }
  44.  
  45.  

5.4.1. Compilation

Afin de compiler un programme MPI, on utilise le compilateur mpicc pour le C, mpic++ pour le C++ :

mpic++ -o exe  src.cpp -O2 ... 

5.4.2. Execution

Pour exécuter un programme compilé avec MPI, il faut utiliser mpirun (ou mpiexec) en spécifiant le nombre de processeurs (=processus,coeurs) grâce à l'option -n ou -np suivant les systèmes :

mpirun -n 4 ./c3_mpi_ex_1.exe

Voici le résultat à l'affichage du programme précédent si on l'exécute sur une seule machine Intel Core i3-2375M CPU @ 1.50GHz (Dual Core + HyperThreading, soit 4 threads) :

running on inspiron with id=0/4
running on inspiron with id=1/4
running on inspiron with id=2/4
running on inspiron with id=3/4

Voici un autre exemple qui lance 16 processus sur 4 noeuds en réservant 4 processus (coeurs) sur chacune des machines :

mpiexec -hosts h1:4,h2:4,h3:4,h4:4 -n 16 ./test

Sur un cluster on utilisera par exemple (-pe = parallel environment):

qsub -pe mpi 16 test_mpi.sh

5.5. Synchronisation

Pour obtenir une section critique pour l'affichage on utilise la fonction MPI_Barrier ou MPI::COMM_WORLD.Barrier() en C++ qui bloque l'appelant jusqu'à ce que tous les autres programmes aient appelé cette fonction :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionalities of MPI with C++
  6. // with synchronization for display
  7. // ==================================================================
  8. #include <iostream>
  9. #include <cstdlib>
  10. #include <unistd.h> // for sleep
  11. using namespace std;
  12. #include <mpi.h>
  13.  
  14. // ==================================================================
  15. // C++ version
  16. // ==================================================================
  17.  
  18. int main(int argc, char ** argv) {
  19.     int max_cpus, cpu_rank;
  20.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  21.     int length;
  22.  
  23.     // allocate resources
  24.     MPI::Init(argc, argv);
  25.    
  26.     // get number of programs running
  27.     max_cpus = MPI::COMM_WORLD.Get_size();
  28.     // get program identifier
  29.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  30.     // get cpu name
  31.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  32.     MPI::Get_processor_name(cpu_name, length);
  33.  
  34.     cout << "Hello, from " << cpu_rank << endl;
  35.     sleep( cpu_rank + 1 );
  36.     MPI::COMM_WORLD.Barrier();    
  37.     cout << "Bye, from " << cpu_rank << endl;
  38.    
  39.     // free resources
  40.     MPI::Finalize();
  41.  
  42.     exit(EXIT_SUCCESS);
  43. }
  44.  
  45.  

Dans l'exemple qui suit, les 6 processus commencent par travailler (travail remplacé ici par un sleep) en parallèle. Le premier (identifiant 0) suspend son exécution pendant 1 seconde, le second pendant 2 secondes, etc. Au final, après 6 secondes, l'ensemble des processus exécutent le code qui suit l'appel à MPI::COMM_WORLD.Barrier();.

mpirun -n 6 ./mpi_cpp_syncrho.exe
Hello, from 3
Hello, from 4
Hello, from 5
Hello, from 2
Hello, from 0
Hello, from 1
.... wait 6 seconds here until you finally get ....
Bye, from 0
Bye, from 5
Bye, from 3
Bye, from 4
Bye, from 1
Bye, from 2

5.6. Echange de donneés Send, Recv

Pour transmettre des données entre les processeurs, on utilise deux fonctions MPI_Send et MPI_Recv, ou éventuellement MPI_Sendrecv :

// envoi d'un processus vers un autre processus
int MPI_Send(void* data,
    int count,
    MPI_Datatype datatype,
    int destination,
    int tag,
    MPI_Comm communicator)

// réception d'un processus vers un autre processus
int MPI_Recv(void* data,
    int count,
    MPI_Datatype datatype,
    int source,
    int tag,
    MPI_Comm communicator,
    MPI_Status* status)

// envoi suivi d'une réception    
int MPI_Sendrecv(const void *sendbuf, 
    int sendcount, MPI_Datatype sendtype,
    int dest, int sendtag,
    void *recvbuf, int recvcount, MPI_Datatype recvtype,
    int source, int recvtag,
    MPI_Comm comm, MPI_Status *status)    
data
pointeur sur la donnée à envoyer ou recevoir
count
nombre d'occurrences
datatype
type de donnée (MPI_CHAR, MPI_INT, MPI_FLOAT, MPI::INT, MPI::FLOAT, ...)
source/destination
identifiant du processus qui reçoit ou envoie les données
communicator
canal de comunication, ex: COMM_WORLD
tag
identifiant de message compris entre 0 et 32767
status
le status lors de la réception des données

Pour la partie C++ on utilisera :

MPI::COMM_WORLD.Send(const void* buf, 
	int count, 
	MPI::Datatype& datatype,
	int dest, 
	int tag) const
	
void MPI::COMM_WORLD.Recv(void* buf, 
	int count, 
	MPI::Datatype& datatype,
	int source, 
	int tag, 
	MPI::Status* status) const
	
void MPI::COMM_WORLD::Sendrecv(const void* sendbuf, 
	int count,
	MPI::Datatype& datatype, 
	int dest, int sendtag,
	void* recvbuf, int recvcount,
	MPI::Datatype recvtype, int source, int recvtag,
	MPI::Status* status) const

Voici quelques exemples :

Dans le premier exemple, on utilise deux processeurs. Le maître (identifiant 0) et l'esclave (identifiant 1) :

 Maître   Esclave 
 Création d'un tableau et initialisation    
 Envoi de la taille →    
    Réception de la taille, création du tableau 
 Envoi du tableau →    
    Réception des données dans le tableau 
    Calcul de la somme des éléments 
    ← Envoi de la somme 
 Réception de la somme    
 Affichage de la somme    
Fonctionnement Exemple Send et Receive
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic send functionality, send array of float
  6. // from master (rank=0) to slave (rank=1)
  7. // ==================================================================
  8. #include <unistd.h>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <unistd.h> // for sleep
  12. using namespace std;
  13. #include <mpi.h>
  14.  
  15. // ==============================================================
  16. // C++ version
  17. // ==============================================================
  18.  
  19. int main(int argc, char ** argv) {
  20.     int max_cpus, cpu_rank, remote_cpu;
  21.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  22.     int length;
  23.     // data to send
  24.     float *data;
  25.     int data_size;
  26.  
  27.     // Initialize
  28.     MPI::Init(argc, argv);
  29.     max_cpus = MPI::COMM_WORLD.Get_size();
  30.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  31.     MPI::Status status;
  32.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  33.     MPI::Get_processor_name(cpu_name,length);
  34.  
  35.     cerr << cpu_rank << "/" << max_cpus << " on machine " << cpu_name << endl;
  36.    
  37.     // MASTER
  38.     if (cpu_rank == 0) {
  39.         // MASTER
  40.         // processor of id 0 (master) fills and sends the array
  41.         data_size = 100 + rand() % 100;
  42.         data = new float [data_size];
  43.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  44.        
  45.         int remote_cpu = 1;
  46.        
  47.         // 1- send length
  48.         cerr << cpu_rank << " [send]  data_size=" << data_size << endl;
  49.         MPI::COMM_WORLD.Send(&data_size, 1, MPI::INT, remote_cpu, 0);
  50.         // 2- send data
  51.         MPI::COMM_WORLD.Send(&data[0], data_size, MPI::FLOAT, remote_cpu, 0);      
  52.         // wait for other processor to compute and send back sum
  53.         float result = 0;
  54.  
  55.         MPI::COMM_WORLD.Recv(&result, 1, MPI::FLOAT, remote_cpu, MPI::ANY_TAG, status);
  56.         cerr << cpu_rank << " [recv] result=" << result << endl;
  57.         cerr << "result is " << result << " for length=" << data_size;
  58.         cerr << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  59.         delete [] data;
  60.        
  61.     } else if (cpu_rank == 1) {
  62.         // SLAVE
  63.         remote_cpu = 0;
  64.         // processor of id 1 will receive the array and compute the sum
  65.         // 1- we receive the array length and allocate space
  66.         MPI::COMM_WORLD.Recv(&data_size, 1, MPI::INT, remote_cpu, MPI::ANY_TAG, status);
  67.         cerr << cpu_rank << " [recv] data_size=" << data_size << endl;
  68.         data = new float[data_size];
  69.         // 2- we receive the data
  70.         MPI::COMM_WORLD.Recv(&data[0], data_size, MPI::FLOAT, remote_cpu, MPI::ANY_TAG, status);
  71.         // 3- compute sum
  72.         float sum = 0;
  73.         for (int i=0; i<data_size; ++i) {
  74.             sum += data[i];
  75.         }
  76.         // 4- send back result
  77.         cerr << cpu_rank << " [send]  sum=" << sum << endl;
  78.         MPI::COMM_WORLD.Send(&sum, 1, MPI::FLOAT, remote_cpu, 0);
  79.         delete [] data;
  80.        
  81.     } else {
  82.         // other processors (if any) won't do anything
  83.     }
  84.    
  85.     MPI::Finalize();
  86.  
  87.     exit(EXIT_SUCCESS);
  88. }
  89.  

Le deuxième exemple utilise sendrecv pour envoyer les données du tableau et attendre de recevoir la somme.

  1. #include <unistd.h>
  2. #include <iostream>
  3. #include <cstdlib>
  4. #include <unistd.h> // for sleep
  5. #include <sstream>
  6. using namespace std;
  7. #include <mpi.h>
  8.  
  9. int max_cpus, cpu_rank, remote_cpu;
  10. char cpu_name[MPI::MAX_PROCESSOR_NAME];
  11.    
  12. ostringstream output;
  13.  
  14. void message() {
  15.     cout << "cpu " << cpu_rank << "/" << max_cpus;
  16.     cout << " on " << cpu_name << ": ";
  17.     cout << output.str() << endl;
  18.     cout.flush();
  19.     output.str("");
  20. }
  21.  
  22. // ==============================================================
  23. // version C++
  24. // ==============================================================
  25.  
  26. int main(int argc, char ** argv) {
  27.     int length;
  28.     // data to send
  29.     float *data;
  30.     int data_length;
  31.  
  32.     MPI::Init(argc, argv);
  33.     max_cpus = MPI::COMM_WORLD.Get_size();
  34.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  35.    
  36.     MPI::Status status;
  37.        
  38.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  39.     MPI::Get_processor_name(cpu_name,length);
  40.    
  41.     message();
  42.    
  43.     if (cpu_rank == 0) {
  44.        
  45.         // processor of id 0 is filling and sending the array
  46.         data_length = 100 + rand() % 100;
  47.         data = new float [data_length];
  48.         for (int i=0; i<data_length; ++i) data[i] = i+1;
  49.        
  50.         int remote_cpu = 1;
  51.         // 1- send length
  52.        
  53.         output << "send data_length=" << data_length;
  54.         message();
  55.         MPI::COMM_WORLD.Send(&data_length, 1, MPI::INT, remote_cpu, 0);
  56.        
  57.         // 2- send data wait for other processor to compute and send back sum
  58.        
  59.         float result = 0;
  60.         MPI::COMM_WORLD.Sendrecv(
  61.             &data[0], data_length, MPI::FLOAT, remote_cpu, 0,
  62.             &result, 1, MPI::FLOAT, cpu_rank, MPI::ANY_TAG,
  63.             status);       
  64.        
  65.         output << "result is " << result << " for length=" << data_length;
  66.         message();
  67.         output << "result expected=" << (data_length * (data_length+1))/ 2  << endl;
  68.         message();
  69.        
  70.         delete [] data;
  71.        
  72.     } else if (cpu_rank == 1) {
  73.        
  74.         // processor of id 1 will receive the array and compute the sum
  75.         // 1- we receive the array length and allocate space
  76.         remote_cpu = 0;
  77.         MPI::COMM_WORLD.Recv(&data_length, 1, MPI::INT, remote_cpu, MPI::ANY_TAG, status);
  78.        
  79.         output << "receive data_length=" << data_length;
  80.         message();
  81.        
  82.         data = new float[data_length];
  83.        
  84.         // 2- we receive the data
  85.         MPI::COMM_WORLD.Recv(&data[0], data_length, MPI::FLOAT, remote_cpu, MPI::ANY_TAG, status);
  86.        
  87.         // 3- compute sum
  88.         float sum = 0;
  89.         for (int i=0; i<data_length; ++i) {
  90.             sum += data[i];
  91.         }
  92.  
  93.         // 4- send back result
  94.         MPI::COMM_WORLD.Send(&sum, 1, MPI::FLOAT, remote_cpu, 0);
  95.         delete [] data;
  96.        
  97.     } else {
  98.         // other processors won't do anything
  99.     }
  100.    
  101.     MPI::Finalize();
  102.  
  103.     exit(EXIT_SUCCESS);
  104. }
  105.  

Afin de simplifier l'utilisation de MPI on peut créer une interface (wrapper) : j'ai créé, pour ma part, EZMPI.

EZMPI est constitué d'une seule classe Process qui représente un processus et qui permet de récupérer lors de l'initialisation l'identifiant du processeur, le nombre de processus lancés, ... On dispose également de méthodes qui permettent d'envoyer et recevoir un élément (quel que soit le type) ou un tableau d'élements.

On dispose en outre d'un système de log qui permet de consulter le déroulement des échanges entre processus (réception et envoi de données).

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: interface and class to facilitate use of MPI
  7. // ==================================================================
  8. #ifndef EZ_MPI_H
  9. #define EZ_MPI_H
  10.  
  11. #include <string>
  12. #include <typeinfo>
  13. #include <sstream>
  14. #include <stdexcept>
  15. #include <cstdint>
  16. using namespace std;
  17. #include <time.h>
  18. #include <unistd.h>
  19. #include <mpi.h>
  20.  
  21. /**
  22.  * EZ MPI is a wrapper for MPI C++. It simplifies the use
  23.  * of MPI send, receive, gather, scatter functions.
  24.  * For the gather, scatter and reduce functions the processor
  25.  * of rank 0 is considered as the "master" that collects or
  26.  * sends data.
  27.  */
  28.  
  29. namespace ez {
  30.  
  31. namespace mpi {
  32.  
  33. /*
  34.  * Process Information.
  35.  * This class also acts as a logger so it can be used to print
  36.  * information.
  37.  */
  38. class Process {
  39. protected:
  40.     // maximum number of process working together
  41.     int m_max;
  42.     // identifier of current process, called rank for MPI
  43.     int m_id;
  44.     // identifier of remote process for send, receive, ...
  45.     int m_remote;
  46.     // status of last operation
  47.     MPI::Status m_status;
  48.     // message tag if needed (default is 0)
  49.     int m_message_tag;
  50.     // name of process
  51.     string m_name;
  52.     // Linux process identifier
  53.     int m_pid;
  54.     // verbose mode
  55.     bool m_verbose_flag;
  56.     // verbose mode
  57.     bool m_log_flag;
  58.     // main output stream for current processor
  59.     ostringstream log_stream;
  60.     // temporary output stream
  61.     ostringstream tmp_log;
  62.    
  63. private:
  64.     /**
  65.      * Find processor name
  66.      */
  67.     void find_cpu_name();
  68.    
  69.     /**
  70.      * record output of oss into general output
  71.      */
  72.     void append();
  73.    
  74.     /**
  75.      * initialize max_cpus, cpu_rank, processus id
  76.      */
  77.     void init();
  78.    
  79. public:
  80.            
  81.     /**
  82.      * Default constructor
  83.      */
  84.     Process(int argc, char *argv[], bool verbose=false);
  85.    
  86.     ~Process();
  87.    
  88.     /**
  89.      * Display output of each processor if verbose mode is on
  90.      */
  91.     void finalize();
  92.    
  93.     /**
  94.      * set verbose mode
  95.      */
  96.     void verbose(bool mode) {
  97.         m_verbose_flag = mode;
  98.     }
  99.    
  100.     /**
  101.      * set log mode
  102.      */
  103.     void log(bool mode) {
  104.         m_log_flag = mode;
  105.     }
  106.    
  107.    
  108.    
  109.     /**
  110.      * Get process identifier
  111.      */
  112.     int pid();
  113.    
  114.     /**
  115.      * Get processor identifier or rank
  116.      */
  117.     int id();
  118.    
  119.     /**
  120.      * Get number of processors used
  121.      */
  122.     int max();
  123.    
  124.     /**
  125.      * Get processor name
  126.      */
  127.     string name();
  128.    
  129.    
  130.     /**
  131.      * set remote processor identifier
  132.      */
  133.     void remote(int rmt);
  134.    
  135.     /**
  136.      * set message tag
  137.      * @param tag must be an integer between 0 and 32767
  138.      */
  139.     void tag(int tag);
  140.    
  141.     /**
  142.      * Return true if this processor is the processor of rank 0
  143.      * considered as the master.
  144.      */
  145.     bool is_master() {
  146.         return (m_id == 0);
  147.     }
  148.    
  149.     /**
  150.      * synchronize
  151.      */
  152.     void synchronize();
  153.    
  154.     /**
  155.      * Determine type of data T and convert it into MPI::Datatype.
  156.      * This function needs to be extended with other types.
  157.      */
  158.     template<class T>
  159.     MPI::Datatype get_type() {
  160.         if (typeid(T) == typeid(char)) {
  161.             return MPI::CHAR;
  162.         } else if (typeid(T) == typeid(int8_t)) {  
  163.             return MPI::CHAR;
  164.         } else if (typeid(T) == typeid(uint8_t)) { 
  165.             return MPI::CHAR;  
  166.         } else if (typeid(T) == typeid(int)) {
  167.             return MPI::INT;
  168.         } else if (typeid(T) == typeid(float)) {
  169.             return MPI::FLOAT;
  170.         } else if (typeid(T) == typeid(double)) {
  171.             return MPI::DOUBLE;
  172.         }
  173.         //throw std::runtime_error("unknown MPI::Datatype");
  174.         cout << "!!!!! unknown " << typeid(T).name() << endl;
  175.         return MPI::INT;
  176.     }
  177.    
  178.     /**
  179.      * Send one instance of data to remote_cpu
  180.      * @param v data to send
  181.      */
  182.     template<class T>
  183.     void send(T& v) {
  184.         MPI::Datatype data_type = get_type<T>();
  185.        
  186.         tmp_log << "send value=" << v << " to " << m_remote << endl;
  187.         flush();
  188.        
  189.         MPI::COMM_WORLD.Send(&v, 1, data_type, m_remote, m_message_tag);
  190.     }
  191.    
  192.     /**
  193.      * Send an array to remote_cpu
  194.      * @param arr address of the array
  195.      * @param size number of elements to send
  196.      */
  197.     template<class T>
  198.     void send(T *arr, int size) {
  199.         MPI::Datatype data_type = get_type<T>();
  200.        
  201.         tmp_log << "send array of size=" << size << " to " << m_remote << endl;
  202.         flush();
  203.                
  204.         MPI::COMM_WORLD.Send(&arr[0], size, data_type, m_remote, m_message_tag);
  205.     }
  206.    
  207.     /**
  208.      * Receive one instance of data from remote cpu
  209.      * @param v data to receive
  210.      */
  211.     template<class T>
  212.     void recv(T& v) {
  213.         MPI::Datatype data_type = get_type<T>();
  214.        
  215.         MPI::COMM_WORLD.Recv(&v, 1, data_type, m_remote,
  216.             (m_message_tag == 0) ? MPI::ANY_TAG : m_message_tag,
  217.                     m_status);
  218.            
  219.         tmp_log << "receive value=" << v << " from " << m_remote << endl;
  220.         flush();
  221.            
  222.     }
  223.    
  224.     /**
  225.      * Receive an array of given size
  226.      * @param arr pointer to address of the array
  227.      * @param size number of elements
  228.      */
  229.     template<class T>
  230.     void recv(T *arr, int size) {
  231.         MPI::Datatype data_type = get_type<T>();
  232.        
  233.         MPI::COMM_WORLD.Recv(&arr[0], size, data_type, m_remote,
  234.             (m_message_tag == 0) ? MPI::ANY_TAG : m_message_tag,
  235.                     m_status);
  236.            
  237.         tmp_log << "receive array of size=" << size << " from " << m_remote << endl;
  238.         flush();
  239.            
  240.     }
  241.    
  242.     /**
  243.      * Send array and receive value in return, this is an instance
  244.      * of the Sendrecv function.
  245.      * @param arr address of the array to send
  246.      * @param size size of the array to send
  247.      * @param value value to receive
  248.      */
  249.     template<class T, class U>
  250.     void sendrecv(T *array, int size, U& value) {
  251.         MPI::Datatype array_data_type = get_type<T>();
  252.         MPI::Datatype value_data_type = get_type<U>();
  253.        
  254.         tmp_log << "sendrecv/send array of size=" << size << endl;
  255.         flush();
  256.                
  257.         MPI::COMM_WORLD.Sendrecv(&array[0], size, array_data_type, m_remote, 0,
  258.             &value, 1, value_data_type, MPI::ANY_SOURCE, MPI::ANY_TAG,
  259.             m_status);
  260.            
  261.         tmp_log << "sendrecv/receive value=" << value << endl;
  262.         flush();
  263.            
  264.     }
  265.    
  266.     /**
  267.      * Perform reduction
  268.      * @param lcl_value local array used to perform reduction
  269.      * @param glb_value global data that will contain result
  270.      * @param op operation to perform (MPI::SUM, MPI::MAX, ...)
  271.      */
  272.     template<class T>
  273.     void reduce(T &lcl_value, T &glb_value, const MPI::Op& op) {
  274.         MPI::Datatype data_type = get_type<T>();
  275.        
  276.         MPI::COMM_WORLD.Reduce(&lcl_value,
  277.             &glb_value, 1, data_type, op, 0);
  278.            
  279.         tmp_log << "reduction gives value=" << glb_value << endl;
  280.         flush();
  281.  
  282.     }
  283.    
  284.     /**
  285.      * Perform gather operation
  286.      * @param lcl_array local array that is send to master process
  287.      * @param glb_array global array that will contain all local arrays
  288.      */
  289.     template<class T>
  290.     void gather(T *lcl_array, int size, T *glb_array) {
  291.         MPI::Datatype data_type = get_type<T>();
  292.        
  293.         MPI::COMM_WORLD.Gather(lcl_array, size, data_type,
  294.             glb_array, size, data_type, 0);
  295.            
  296.         tmp_log << "gather" << endl;
  297.         flush();
  298.  
  299.     }
  300.    
  301.     /**
  302.      * Perform scatter operation
  303.      * @param glb_array array of data that will be send by to all processors
  304.      * @param size size of the local array of data
  305.      * @param lcl_array local array of data
  306.      */
  307.     template<class T>
  308.     void scatter(T *glb_array, int size, T *lcl_array) {
  309.         MPI::Datatype data_type = get_type<T>();
  310.        
  311.         MPI::COMM_WORLD.Scatter(glb_array, size, data_type,
  312.             lcl_array, size, data_type, 0);
  313.            
  314.         tmp_log << "scatter" << endl;
  315.         flush();
  316.  
  317.     }
  318.    
  319.     typedef std::ostream& (*ManipFn)(std::ostream&);
  320.     typedef std::ios_base& (*FlagsFn)(std::ios_base&);
  321.    
  322.     void print(char v);
  323.     void print(int v);
  324.     void print(string s);
  325.     void print(float f);
  326.     void print(double d);
  327.    
  328.     template<class T> // int, double, strings, etc
  329.     Process& operator<<(const T& output) {
  330.         tmp_log << output;
  331.         return *this;
  332.     }
  333.  
  334.     // endl, flush, setw, setfill, etc.
  335.     Process& operator<<(ManipFn manip) {
  336.         manip(tmp_log);
  337.         if (manip == static_cast<ManipFn>(std::flush) || manip == static_cast<ManipFn>(std::endl)) {
  338.           this->flush();
  339.         }
  340.         return *this;
  341.     }
  342.  
  343.     // setiosflags, resetiosflags
  344.     Process& operator<<(FlagsFn manip) {
  345.         manip(tmp_log);
  346.         return *this;
  347.     }
  348.  
  349.     void flush();
  350.  
  351.     void logs(ostream& out);
  352.    
  353.     typedef void (*Code)(Process& p);
  354.    
  355.     void run(Code code) {
  356.         code(*this);
  357.     }
  358. };
  359.  
  360.  
  361.  
  362. } // end of namespace mpi
  363.  
  364. } // end of namespace ez
  365. #endif
  366.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: interface and class to facilitate use of MPI
  7. // ==================================================================
  8.  
  9. #include "ezmpi.h"
  10.  
  11. using namespace ez::mpi;
  12.  
  13. void Process::find_cpu_name() {
  14.     char name[MPI::MAX_PROCESSOR_NAME];
  15.     int length;
  16.    
  17.     memset(name, 0, MPI::MAX_PROCESSOR_NAME);
  18.     MPI::Get_processor_name(name,length);
  19.     m_name = name;
  20. }
  21.  
  22. void Process::init() {
  23.     m_max = MPI::COMM_WORLD.Get_size();
  24.     m_id = MPI::COMM_WORLD.Get_rank();
  25.     find_cpu_name();
  26.     m_pid = getpid();
  27.     if (m_verbose_flag) {
  28.         tmp_log << "pid=" << m_pid << ", id=" << m_id << endl;
  29.         flush();
  30.     }
  31.  
  32. }
  33.  
  34. Process::Process(int argc, char *argv[], bool verbose) {
  35.     m_remote = 0;
  36.     m_message_tag = 0;
  37.     m_verbose_flag = verbose;
  38.     m_log_flag = false;
  39.     MPI::Init(argc, argv);
  40.     init();
  41. }
  42.  
  43. void Process::logs(ostream& out) {
  44.     m_verbose_flag = m_log_flag = false;
  45.     if (m_id == 0) {
  46.         out.flush();
  47.         out << std::endl;
  48.         out << "====================" << std::endl;
  49.         out << "=== FINAL RESULT ===" << std::endl;
  50.         out << "====================" << std::endl;
  51.         out << "---------------------" << std::endl;
  52.         out << "CPU " << m_id << std::endl;
  53.         out << "---------------------" << std::endl;
  54.         out << log_stream.str();
  55.         out.flush();
  56.         remote( 1 );
  57.         int token = -255;
  58.         send( token );
  59.     } else {
  60.         remote( m_id - 1 );
  61.         int token;
  62.         recv(token);           
  63.         out << "---------------------" << std::endl;
  64.         out << "CPU " << m_id << std::endl;
  65.         out << "---------------------" << std::endl;
  66.         out << log_stream.str();
  67.         out.flush();
  68.         if (m_id < m_max - 1) {
  69.             remote( m_id + 1 );
  70.             token = -255;
  71.             send(token);
  72.         }
  73.        
  74.     }
  75.    
  76.    
  77. }
  78.    
  79. void Process::finalize() {
  80.     MPI::COMM_WORLD.Barrier();
  81.     MPI::Finalize();
  82. }
  83.  
  84. Process::~Process() {
  85.     finalize();
  86. }
  87.  
  88. int Process::pid() {
  89.     return m_pid;
  90. }
  91.  
  92. int Process::id() {
  93.     return m_id;
  94. }
  95.  
  96. int Process::max() {
  97.     return m_max;
  98. }
  99.    
  100. string Process::name() {
  101.     return m_name;
  102. }
  103.    
  104. void Process::tag(int tag) {
  105.     m_message_tag = tag;
  106. }
  107.  
  108. void Process::remote(int rmt) {
  109.     m_remote = rmt;
  110. }
  111.  
  112. void Process::synchronize() {
  113.     MPI::COMM_WORLD.Barrier();
  114. }
  115.  
  116. // --------------------------
  117. // output
  118. // --------------------------
  119.  
  120. const std::string currentDateTime() {
  121.     time_t     now = time(0);
  122.     struct tm  tstruct;
  123.     char       buf[80];
  124.     tstruct = *localtime(&now);
  125.     //strftime(buf, sizeof(buf), "%Y-%m-%d.%X [%s]", &tstruct);
  126.     strftime(buf, sizeof(buf), "%X", &tstruct);
  127.    
  128.     return buf;
  129. }
  130.  
  131. void Process::flush() {
  132.     string str = currentDateTime();
  133.     if (m_log_flag) {
  134.         log_stream << str << " cpu " << m_id << "/" << m_max << ": " << tmp_log.str();
  135.     }
  136.     if (m_verbose_flag) {
  137.         cerr << str << " cpu " << m_id << "/" << m_max << ": " << tmp_log.str();
  138.     }
  139.     tmp_log.str("");
  140. }
  141.  
  142. void Process::print(char v) {
  143.     tmp_log << v;
  144. }
  145.  
  146. void Process::print(int v) {
  147.     tmp_log << v;
  148. }
  149.  
  150. void Process::print(string v) {
  151.     tmp_log << v;
  152. }
  153.  
  154. void Process::print(float v) {
  155.     tmp_log << v;
  156. }
  157.  
  158. void Process::print(double v) {
  159.     tmp_log << v;
  160. }
  161.    
  162.  
  163.  

On peut alors réécrire les programes précédents de manière plus simple :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: Demonstrate basic send functionality, send array of float
  7. // from master (rank=0) to slave (rank=1)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. using namespace std;
  14. #include "ezmpi.h"
  15. using namespace ez::mpi;
  16.  
  17. /**
  18.  * run master and slaves
  19.  */
  20. void run(int argc, char *argv[]) {
  21.  
  22.     // data to send
  23.     float *data;
  24.     int data_size;
  25.  
  26.     // Create Process from ezmpi
  27.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  28.     Process p(argc, argv);
  29.      
  30.     // the master is the process with id() == 0
  31.     if (p.is_master()) {
  32.         // Code of master
  33.        
  34.         // set remote cpu identifier that will communicate with master
  35.         p.remote(1);
  36.        
  37.         // processor of id 0 (master) fills and sends the array
  38.         data_size = 100 + rand() % 100;
  39.         data = new float [data_size];
  40.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  41.        
  42.         // 1- send size of array to remote
  43.         p.send(data_size);
  44.         // 2- send data of array to remote
  45.         p.send(data, data_size);       
  46.        
  47.         // wait for other processor to compute and send back sum
  48.         float result = 0;
  49.         p.recv(result);
  50.        
  51.         p << "result is " << result << " for length=" << data_size;
  52.         p << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  53.        
  54.         delete [] data;    
  55.        
  56.     } else if (p.id() == 1) {
  57.         // Code for slave
  58.        
  59.         // tells to Process to send data to master processor
  60.         p.remote(0);
  61.        
  62.         // process of id 1 will receive the array and compute the sum
  63.         // 1- we receive the array length and allocate space
  64.         p.recv(data_size);
  65.    
  66.         data = new float[data_size];
  67.        
  68.         // 2- we receive the data
  69.         p.recv(data, data_size);
  70.        
  71.         // 3- compute sum
  72.         float sum = 0;
  73.         for (int i=0; i<data_size; ++i) {
  74.             sum += data[i];
  75.         }
  76.        
  77.         // 4- send back result
  78.         p.send(sum);
  79.        
  80.         delete [] data;
  81.        
  82.     } else {
  83.         // other process (if any) won't do anything
  84.         p << "is idle" << endl;
  85.     }
  86.    
  87.     sleep(1);
  88.    
  89.     p.logs(cout);
  90.    
  91.     // desctuctor of Process p will call the finalize method
  92.     // of MPI
  93. }  
  94.  
  95. /**
  96.  * main function
  97.  *
  98.  */
  99. int main(int argc, char ** argv) {
  100.     // call a function that contains code for master and slave
  101.     // in order to avoid problems of initialization and finalization
  102.     run(argc, argv);
  103.    
  104.     exit(EXIT_SUCCESS);
  105. }
  106.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: Demonstrate basic send functionality, send array of float
  7. // from master (rank=0) to slave (rank=1)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. #include <sstream>
  14. using namespace std;
  15. #include "ezmpi.h"
  16. using namespace ez::mpi;
  17.  
  18. /**
  19.  * run master and slaves
  20.  */
  21. void run(int argc, char *argv[]) {
  22.     // data to send
  23.     float *data;
  24.     int data_size;
  25.  
  26.     // Create Process from ezmpi
  27.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  28.     Process p(argc, argv);
  29.    
  30.     if (p.is_master()) {
  31.         // Code of master
  32.        
  33.         // set remote cpu identifier that will communicate with master
  34.         p.remote(1);
  35.        
  36.         // processor of id 0 (master) fills and sends the array
  37.         data_size = 100 + rand() % 100;
  38.         data = new float [data_size];
  39.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  40.        
  41.         // 1- send size of array to remote
  42.         p.send(data_size);
  43.        
  44.        
  45.         // 2- send data of array to remote and receive result
  46.         float result = 0;
  47.         p.sendrecv(data, data_size, result);   
  48.        
  49.         p << "result is " << result << " for length=" << data_size;
  50.         p << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  51.        
  52.         delete [] data;    
  53.        
  54.     } else if (p.id() == 1) {
  55.         // Code for slave
  56.        
  57.         // tells to Process to send data to master processor
  58.         p.remote(0);
  59.        
  60.         // process of id 1 will receive the array and compute the sum
  61.         // 1- we receive the array length and allocate space
  62.         p.recv(data_size);
  63.    
  64.         data = new float[data_size];
  65.        
  66.         // 2- we receive the data
  67.         p.recv(data, data_size);
  68.        
  69.         // 3- compute sum
  70.         float sum = 0;
  71.         for (int i=0; i<data_size; ++i) {
  72.             sum += data[i];
  73.         }
  74.        
  75.         // 4- send back result
  76.         p.send(sum);
  77.        
  78.         delete [] data;
  79.        
  80.     } else {
  81.         // other process (if any) won't do anything
  82.         p << "is idle" << endl;
  83.     }
  84.    
  85.     sleep(1);
  86.    
  87.     p.logs(cout);
  88.    
  89.     // desctuctor of Process p will call the finalize method
  90.     // of MPI
  91. }
  92.  
  93. // ==============================================================
  94. // version C++
  95. // ==============================================================
  96. int main(int argc, char ** argv) {
  97.     // call a function that contains code for master and slave
  98.     // in order to avoid problems of initialization and finalization
  99.     run(argc, argv);
  100.    
  101.     exit(EXIT_SUCCESS);
  102. }
  103.  

5.7. Réduction

Certains traitements comme la réduction ou le scan sont implantés spécifiquement pour MPI.

Par exemple pour la réduction :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: 20 Aug 2016
  4. // Purpose: Use of MPI::Reduce with function to integrate
  5. // ==================================================================
  6. #include <unistd.h>
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. #include <sstream>
  11. using namespace std;
  12. #include "ezmpi.h"
  13. using namespace ez::mpi;
  14.  
  15. /**
  16.  * function to integrate f(x)
  17.  */
  18. double f(double x) {
  19.     return x * x;
  20. }
  21.  
  22. /**
  23.  * integrate from
  24.  * @param a lower bound
  25.  * @param b upper bound
  26.  * @param n number of steps
  27.  */
  28. double integrate(double a, double b, int n) {
  29.     double dx = (b-a) / n;
  30.    
  31.     double sum = 0;
  32.     for (int i=1; i<n; ++i) {
  33.         sum += f(a + i * dx);
  34.     }
  35.     return sum * dx;
  36. }
  37.  
  38. /**
  39.  * run master and slaves
  40.  */
  41. void run(int argc, char *argv[]) {
  42.    
  43.     Process p(argc, argv);
  44.    
  45.     // data to send
  46.     double a = 1.0;
  47.     double b = 3.0;
  48.     int steps = 10000000;
  49.    
  50.     double x_range = (b - a) / p.max();
  51.     int steps_range = steps / p.max();
  52.    
  53.     double local_a = a + p.id() * x_range;
  54.     double local_b = local_a + x_range;
  55.     double local_result = integrate(local_a, local_b, steps_range);
  56.    
  57.     p << "integrate from " << local_a << " to " << local_b;
  58.     p << " during " << steps_range << " steps" << endl;
  59.     p << "local_result=" << local_result << endl;
  60.    
  61.     double final_result = 0;
  62.     p.synchronize();
  63.    
  64.     p.reduce(local_result, final_result, MPI::SUM);
  65.    
  66.     // expected result should be (9 - 1/3) = 8.6666
  67.     if (p.is_master()) {
  68.         p << "final result=" << final_result << endl;
  69.     }
  70.    
  71.     sleep(1);
  72.     p.logs(cout);
  73.    
  74. }
  75.  
  76. /**
  77.  * main function
  78.  */
  79. int main(int argc, char ** argv) {
  80.    
  81.     // call a function that contains code for master and slave
  82.     // in order to avoid problems of initialization and finalization
  83.     run(argc, argv);
  84.    
  85.    
  86.     exit(EXIT_SUCCESS);
  87. }
  88.  

On calcule l'intégrale de la fonction $f(x) = x^2$ entre $x=1$ et $x=3$. Pour cela on utilise la méthode des rectangles. On veut 10_000_000 de rectangles entre 1 et 3. Si on utilise 4 processeurs on aura donc $10\_000\_000 / 4 = 2\_500\_000$ itérations par processeur. Au final, le résultat de la somme de chaque partie de l'intégrale est renvoyé au maître qui en fait la somme.

---------------------
CPU 0
---------------------
2016-08-24.02:37:35  cpu 0/4: pid=22172, rank=0
2016-08-24.02:37:35  cpu 0/4: integrate from 1 to 1.5 during 2500000 steps
2016-08-24.02:37:35  cpu 0/4: local_result=0.791666
2016-08-24.02:37:35  cpu 0/4: reduce gives value=8.66666
2016-08-24.02:37:35  cpu 0/4: final result=8.66666
---------------------
CPU 1
---------------------
2016-08-24.02:37:35  cpu 1/4: pid=22173, rank=1
2016-08-24.02:37:35  cpu 1/4: integrate from 1.5 to 2 during 2500000 steps
2016-08-24.02:37:35  cpu 1/4: local_result=1.54167
2016-08-24.02:37:35  cpu 1/4: reduce gives value=0
---------------------
CPU 2
---------------------
2016-08-24.02:37:35  cpu 2/4: pid=22174, rank=2
2016-08-24.02:37:35  cpu 2/4: integrate from 2 to 2.5 during 2500000 steps
2016-08-24.02:37:35  cpu 2/4: local_result=2.54167
2016-08-24.02:37:35  cpu 2/4: reduce gives value=0
---------------------
CPU 3
---------------------
2016-08-24.02:37:35  cpu 3/4: pid=22175, rank=3
2016-08-24.02:37:35  cpu 3/4: integrate from 2.5 to 3 during 2500000 steps
2016-08-24.02:37:35  cpu 3/4: local_result=3.79167
2016-08-24.02:37:35  cpu 3/4: reduce gives value=0

5.8. Gather et scatter

MPI propose deux opérations inverses :

scatter gather

Voici un exemple pour lequel $K$ processus créent des tableaux de 10 entiers qui sont envoyés au master (processus de rang 0) :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: 20 Aug 2016
  4. // Purpose: Use of MPI::Scatter
  5. // The master process creates its own local data and they are send
  6. // to other process (of rank not equal to 0)
  7. // ==================================================================
  8. #include <unistd.h>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <unistd.h> // for sleep
  12. using namespace std;
  13. #include "ezmpi.h"
  14. using namespace ez::mpi;
  15.  
  16. /**
  17.  * run master and slaves
  18.  */
  19. void run(int argc, char *argv[]) {
  20.    
  21.     Process p(argc, argv);
  22.    
  23.     const int local_data_size = 10;
  24.     int *local_data;
  25.     int global_data_size = p.max() * local_data_size;
  26.     int *global_data = NULL;
  27.    
  28.     local_data = new int [ local_data_size ];
  29.     for (int i=0; i<local_data_size; ++i) {
  30.         local_data[i] = 0;
  31.     }
  32.        
  33.    
  34.     if (p.is_master()) {
  35.    
  36.         // master process will send data to others
  37.         global_data = new int [ global_data_size ];
  38.         for (int i=0; i<global_data_size; ++i) global_data[i] =  i+1;
  39.        
  40.         p << "global array = [";
  41.         for (int i=0; i<global_data_size; ++i) p << global_data[i] << " ";
  42.         p << "]" << endl;
  43.     }
  44.    
  45.     // Master sends data to all others
  46.     p.scatter(global_data, local_data_size, local_data);
  47.    
  48.     // Each process reports the data it has received           
  49.     p << "local array=[";
  50.     for (int i=0; i<local_data_size; ++i) {
  51.         p << local_data[i] << " ";
  52.     }
  53.     p << "]" << endl;
  54.  
  55.     sleep(1);
  56.    
  57.     p.logs(cout);
  58. }
  59.  
  60. // ==================================================================
  61. // C++ version
  62. // ==================================================================
  63. int main(int argc, char ** argv) {
  64.     // call a function that contains code for master and slave
  65.     // in order to avoid problems of initialization and finalization
  66.     run(argc, argv);
  67.    
  68.     exit(EXIT_SUCCESS);
  69. }
  70.  
\$ mpirun -n 4 ./ezmpi_scatter.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
14:15:05 cpu 0/4: global array = [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 ]
14:15:05 cpu 0/4: scatter
14:15:05 cpu 0/4: local array=[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 1
---------------------
14:15:05 cpu 1/4: scatter
14:15:05 cpu 1/4: local array=[11 12 13 14 15 16 17 18 19 20 ]
---------------------
CPU 2
---------------------
14:15:05 cpu 2/4: scatter
14:15:05 cpu 2/4: local array=[21 22 23 24 25 26 27 28 29 30 ]
---------------------
CPU 3
---------------------
14:15:05 cpu 3/4: scatter
14:15:05 cpu 3/4: local array=[31 32 33 34 35 36 37 38 39 40 ]
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: Aug 2020
  4. // Last modified: November 2020
  5. // Purpose: Use of MPI::Gather
  6. // Each process creates its own local data and they are all send
  7. // to the master process (of rank 0)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. #include <sstream>
  14. using namespace std;
  15. #include "ezmpi.h"
  16. using namespace ez::mpi;
  17.  
  18. /**
  19.  * run master and slaves
  20.  */
  21. void run(int argc, char *argv[]) {
  22.  
  23.     // data to send
  24.     float *data;
  25.     int data_size;
  26.  
  27.     // Create Process from ezmpi
  28.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  29.     Process p(argc, argv);
  30.    
  31.     const int local_data_size = 10;
  32.     int *local_data;
  33.     int global_data_size = 0;
  34.     int *global_data = NULL;
  35.    
  36.     // each processor creates its local data
  37.     local_data = new int [ local_data_size ];
  38.     for (int i=0; i<local_data_size; ++i) {
  39.         local_data[i] = (p.id() * 10) + i + 1;
  40.     }
  41.        
  42.     p << "local array=[";
  43.     for (int i=0; i<local_data_size; ++i) {
  44.         p << local_data[i] << " ";
  45.     }
  46.     p << "]" << endl;
  47.    
  48.     if (p.is_master()) {
  49.         // master process will gather data from others
  50.         global_data_size = p.max() * local_data_size ;
  51.         global_data = new int [ global_data_size ];
  52.         for (int i=0; i<global_data_size; ++i) global_data[i] = 0;
  53.     }
  54.    
  55.     p.gather(local_data, local_data_size, global_data);
  56.    
  57.     if (p.is_master()) {
  58.         p << "global array = [";
  59.         for (int i=0; i<global_data_size; ++i) {
  60.             p << global_data[i] << " ";
  61.         }
  62.         p << "]" << endl;;
  63.     }
  64.    
  65.     sleep(1);
  66.    
  67.     p.logs(cout);
  68. }
  69.    
  70. // ==================================================================
  71. // C++ version
  72. // ==================================================================
  73. int main(int argc, char ** argv) {
  74.     // call a function that contains code for master and slave
  75.     // in order to avoid problems of initialization and finalization
  76.     run(argc, argv);
  77.    
  78.     exit(EXIT_SUCCESS);
  79. }
  80.  
\$ mpirun -n 4 ./ezmpi_gather.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
14:19:02 cpu 0/4: local array=[1 2 3 4 5 6 7 8 9 10 ]
14:19:02 cpu 0/4: gather
14:19:02 cpu 0/4: global array = [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 ]
---------------------
CPU 1
---------------------
14:19:02 cpu 1/4: local array=[11 12 13 14 15 16 17 18 19 20 ]
14:19:02 cpu 1/4: gather
---------------------
CPU 2
---------------------
14:19:02 cpu 2/4: local array=[21 22 23 24 25 26 27 28 29 30 ]
14:19:02 cpu 2/4: gather
---------------------
CPU 3
---------------------
14:19:02 cpu 3/4: local array=[31 32 33 34 35 36 37 38 39 40 ]
14:19:02 cpu 3/4: gather

5.9. Broadcast (diffuser)

Enfin, l'opération broadcast envoie à tous les esclaves une copie d'un tableau détenu par le maître par exemple :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: Aug 2020
  4. // Last modified: November 2020
  5. // Purpose: Use of MPI::BCast
  6. // The master process creates its own local data and they are send
  7. // to other process (of rank not equal to 0)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. using namespace std;
  14. #include "ezmpi.h"
  15. using namespace ez::mpi;
  16.  
  17. /**
  18.  * run master and slaves
  19.  */
  20. void run(int argc, char *argv[]) {
  21.    
  22.     Process p(argc, argv);
  23.    
  24.  
  25.     int global_data_size = 100;
  26.     int *global_data = NULL;
  27.    
  28.     global_data = new int [ global_data_size ];
  29.     for (int i=0; i<global_data_size; ++i) {
  30.         global_data[i] = 0;
  31.     }
  32.        
  33.    
  34.     if (p.is_master()) {
  35.    
  36.         // master process will send data to others
  37.         for (int i=0; i<global_data_size; ++i) global_data[i] =  i+1;
  38.        
  39.         p << "global array = [";
  40.         for (int i=0; i<global_data_size; ++i) p << global_data[i] << " ";
  41.         p << "]" << endl;
  42.     }
  43.    
  44.     // Master sends data to all others
  45.     //p.synchronize();
  46.     p.broadcast(global_data, global_data_size, 0);
  47.    
  48.    
  49.     // Each process reports the data it has received           
  50.     p << "local copy array=[";
  51.     for (int i=0; i<global_data_size; ++i) {
  52.         p << global_data[i] << " ";
  53.     }
  54.     p << "]" << endl;
  55.  
  56.     sleep(1);
  57.    
  58.     p.logs(cout);
  59. }
  60.  
  61. // ==================================================================
  62. // C++ version
  63. // ==================================================================
  64. int main(int argc, char ** argv) {
  65.     // call a function that contains code for master and slave
  66.     // in order to avoid problems of initialization and finalization
  67.     run(argc, argv);
  68.    
  69.     exit(EXIT_SUCCESS);
  70. }
  71.  
\$ mpirun -n 4 ./ezmpi_broadcast.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
17:08:27 cpu 0/4: global array on master = [1 2 3 4 5 6 7 8 9 10 ]
17:08:27 cpu 0/4: broadcast
17:08:27 cpu 0/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 1
---------------------
17:08:27 cpu 1/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 1/4: broadcast
17:08:27 cpu 1/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 2
---------------------
17:08:27 cpu 2/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 2/4: broadcast
17:08:27 cpu 2/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 3
---------------------
17:08:27 cpu 3/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 3/4: broadcast
17:08:27 cpu 3/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]

5.10. Liens

5.11. Exercices

Exercice 5.1

Une suite de Syracuse est une suite d'entiers naturels définie de la manière suivante : on part d'un nombre entier strictement positif

  • s'il est pair, on le divise par 2
  • s'il est impair, on le multiplie par 3 et on ajoute 1

Cette suite possède la propriété de converger vers 1 après un certain nombre d'étapes.

Mettre en place une solution MPI avec trois instances :

  • le maître génére un nombre entier aléatoire $x$
  • si le nombre $x$ est pair, il l'envoie à l'esclave n°1 qui retourne $x/2$
  • si le nombre $x$ est impair, il l'envoie à l'esclave n°2 qui retourne $3x+1$
  • à chaque étape, le maître affiche la nouvelle valeur de $x$

Le maître comptera le nombre d'appels à l'esclave n°1 et l'esclave n°2.

Lorsque le maître recoît la valeur 1, il s'arrête et envoie un code d'arrêt aux esclaves (un nombre négatif par exemple). Puis il affiche le nombre d'appels à chaque esclave.

Exercice 5.2

Objectif : Écrire un programme MPI pour rechercher un élément dans un tableau.

Description :

  • créer 5 processus
  • générer un tableau d'un million d'entiers aléatoires sur le processus maître (processus de rang 0)
  • répartir le tableau entre les différents processus esclaves en utilisant la fonction MPI_Scatter
  • le maître génère un nombre aléatoire $x$ et demande à chaque esclave de rechercher le nombre d'occurrences de $x$ dans son tableau et de retourner le résultat
  • afficher sur le maître, le nombre d'occurrences par esclave