Parallélisme : travaux dirigés et pratiques

Table des matières

5. MPI

5.1. Introduction

MPI (The Message Passing Interface), conçue en 1993-94, est une norme (ou API - Application Programming Interface) définissant une bibliothèque de fonctions, utilisable avec les langages C, C++ et Fortran. Elle permet d'exploiter des ordinateurs distants ou multiprocesseurs par passage de messages (Wikipedia).

La technique de passage de message consiste à transmettre au travers du réseau les données à échanger. Initialement MPI a été conçu pour des systèmes à mémoire distribuée très populaires dans les années 80-90. MPI fut ensuite adapté pour les systèmes à mémoire partagée et fonctionne à présent pour des systèmes hybrides (distribué + partagé).

L'ensemble des fonctions peut être trouvé à cette adresse OpenMPI Doc.

MPI étant une API, il existe plusieurs implantations come MPICH ou OpenMPI (voir ce site pour un aperçu de l'architecture MPI).

5.2. Installation

Sous Ubuntu 24.04, il n'est plus possible d'utiliser la partie C++ de MPI. Deux possibilités s'offrent à vous :

5.2.1. Installation OpenMPI depuis le code source

Récupérez openmpi-4.1.8.tar.gz sur le site d'OpenMPI. Préférer la version 4.1 à la dernière version (5.X) car les fichiers sont moins volumineux.

Décompressez, compilez le code source et installez les librairies, il s'agit d'une installation locale dans votre home directory :

> tar -xvzf openmpi-4.1.8.tar.gz
> cd openmpi-4.1.8/

# pour installation gobale (tous les utilisateurs)
> ./configure prefix=/usr/local/openmpi-4.1.8 --disable-debug-symbols --enable-mpi-cxx
> make -j$(nproc)
> sudo make install

# ou alors en local (pour l'utilisateur courant)
> ./configure prefix=\$HOME/openmpi-4.1.8 --disable-debug-symbols --enable-mpi-cxx
> make -j$(nproc)
> make install

Modifiez ensuite votre fichier .bashrc :

# pour installation globale
export PATH=/usr/local/openmpi-4.1.8/bin:\$PATH
export LD_LIBRARY_PATH=/usr/local/openmpi-4.1.8/lib:\$LD_LIBRARY_PATH
	
# ou alors pour installation locale
export PATH=\$HOME/openmpi-4.1.8/bin:\$PATH
export LD_LIBRARY_PATH=\$HOME/openmpi-4.1.8/lib:\$LD_LIBRARY_PATH

et faire un source du .bashrc :

source .bashrc

# vérifier que mpi fonctionne :
> mpirun --version
mpirun (Open MPI) 4.1.8

Report bugs to http://www.open-mpi.org/community/help/

5.2.2. Utilisation de Docker

Commencez par installer Docker s'il n'est pas déjà installé :

> sudo apt-get update
# ne pas oublier docker-buildx pour les versions récentes de docker
> sudo apt-get install docker.io docker-buildx
> sudo usermod -aG docker $USER
> sudo systemctl start docker

Créez ensuite un répertoire dans lequel vous placez un fichier Dockerfile :

> mkdir mpi
> cd mpi
> nano Dockerfile

Le fichier Dockerfile possède le contenu suivant. Il crée un utilisateur user et install OpenMPI ainsi que deux éditeurs en mode texte : nano et fte (commande sfte).

# Utiliser Ubuntu 20.04 comme image de base
FROM ubuntu:20.04

# Empêcher les invites interactives lors de l'installation des paquets
ARG DEBIAN_FRONTEND=noninteractive

# Mettre à jour le système et installer les dépendances nécessaires
# Utilisation de fte pour avoir l'éditeur de texte sfte plus sympathique
# que nano
RUN apt-get update && apt-get install -y \
    build-essential \
    openmpi-bin \
    openmpi-common \
    libopenmpi-dev \
    sudo \
    nano fte fte-console fte-terminal \
    && rm -rf /var/lib/apt/lists/*

# Créer un utilisateur non-root 'user'
RUN useradd -m -s /bin/bash user \
    && echo "user ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

# Passer à l'utilisateur 'user'
USER user

# Créer un répertoire de travail pour les programmes
WORKDIR /home/user

# Configurer les variables d'environnement MPI
ENV PATH="/usr/lib64/openmpi/bin:\$PATH" \
    LD_LIBRARY_PATH="/usr/lib64/openmpi/lib:\$LD_LIBRARY_PATH"

# L'utilisateur 'user' peut maintenant compiler et exécuter des programmes MPI
# Vous pouvez ajouter vos fichiers ou configurations ici, par exemple :
# COPY . /home/user/

# Commande par défaut (exemple pour démarrer un shell interactif)
CMD ["/bin/bash"]

Compilez ensuite le Dockerfile afin de créer le container MPI :

> sudo docker build -t mpi_cpp .
> sudo docker run -it mpi_cpp

Ou bien si on veut mapper (mettre en correspondance) le répertoire /home/richer/exchange de l'utilisateur richer sur la machine hôte vers le répertoire /home/user/exchange du container :

> docker run -it --mount type=bind,source=/home/richer/exchange,target=/home/user/exchange mpi_cpp

5.3. Utiliser MPI

Il est nécessaire de modifier substantiellement son programme si on désire le paralléliser avec MPI car on exécute le même programme sur plusieurs machines/coeurs différents.

MPI est disponible pour Fortran, C et C++ et également avec Python (pyMPI).

Pour pouvoir disposer de MPI sur sa machine il faut installer les librairies suivantes sous Ubuntu :

> sudo apt-get install libopenmpi-dev openmpi-bin openmpi-doc

Pour C et C++ on inclura le fichier mpi.h

5.3.1. Débogage

Pour débuguer il est recommandé d'installer MPE (Multi-Processing Environment), difficile à trouver, qui doit être compilé lors de l'installation de Python Anaconda. On réalisera alors l'édition de liens avec -llmpe -lmpe

On peut également utiliser la commande suivante (ici dans le cas de deux programmes avec gdb):

> mpirun -n 2 xterm -e gdb ./a.exe

Deux terminaux sont alors ouverts et il faut lancer l'exécution du programme dans le débogueur avec run.

5.4. Communicateur

Les opérations MPI portent sur des communicateurs qui sont en quelque sorte des ports de communication inter processus. Le communicateur par défaut est COMM_WORLD qui comprend tous les processus actifs.

On peut créer de nouveaux communicateurs mais cela se révèle complexe.

Le modèle de communication est un modèle point à point basé sur deux opérations élémentaires send et receive qui sont ensuite déclinés de plusieurs manières différentes.

Le schéma classique d'utilisation de MPI consiste à :

  • initialiser les ressources MPI_Init
  • obtenir le nombre de processus lancés MPI_Comm_size
  • obtenir l'identifiant du processus courant MPI_Comm_rank
  • exécuter le code séquentiel
  • exécuter le code parallèle
  • libérer les ressources MPI_Finalize

Voici deux exemples simples qui affichent un message avec un temps de latence (instruction sleep) pour chaque thread / processeur. Le premier utilise des fonctions C, le second utilise des espaces de noms et des objets

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionnalities of MPI with C
  6. // ==================================================================
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. using namespace std;
  11. #include <mpi.h>
  12.  
  13. // ==============================================================
  14. // C version
  15. // ==============================================================
  16.  
  17. int main(int argc, char ** argv) {
  18.     // maximum number of CPUs
  19.     int max_cpus;
  20.     // cpu identifier (called rank)
  21.     int cpu_rank;
  22.     // C-string to store the name of the host
  23.     char cpu_name[MPI_MAX_PROCESSOR_NAME];
  24.     // length of the C-string
  25.     int length;
  26.  
  27.  
  28.     // initialization also using command line parameters
  29.     MPI_Init(&argc, &argv);
  30.    
  31.     // get number of programs running
  32.     MPI_Comm_size(MPI_COMM_WORLD, &max_cpus);
  33.  
  34.     // get program identifier
  35.     MPI_Comm_rank(MPI_COMM_WORLD, &cpu_rank);
  36.  
  37.     // get cpu name
  38.     MPI_Get_processor_name(cpu_name, &length);
  39.  
  40.     sleep(cpu_rank);
  41.    
  42.     cerr << "running on " << cpu_name << " with id=" << cpu_rank << "/";
  43.     cerr << max_cpus << endl;
  44.    
  45.     // free resources, don't forget !
  46.     MPI_Finalize();
  47.  
  48.     exit(EXIT_SUCCESS);
  49. }
  50.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionnalities of MPI with C++
  6. // ==================================================================
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. using namespace std;
  11. #include <mpi.h>
  12.  
  13. // ==================================================================
  14. // C++ version
  15. // ==================================================================
  16.  
  17. int main(int argc, char ** argv) {
  18.     // maximum number of CPUs
  19.     int max_cpus;
  20.     // cpu identifier (called rank)
  21.     int cpu_rank;
  22.     // C-string to store name of the host
  23.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  24.     // length of the C-string
  25.     int length;
  26.  
  27.     // initialization also using command line parameters
  28.     MPI::Init(argc, argv);
  29.    
  30.     // get number of programs running
  31.     max_cpus = MPI::COMM_WORLD.Get_size();
  32.    
  33.     // get program identifier
  34.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  35.    
  36.     // get cpu name
  37.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  38.     MPI::Get_processor_name(cpu_name, length);
  39.  
  40.     // sleep
  41.     sleep(cpu_rank);
  42.    
  43.     cerr << "running on " << cpu_name << " with id=" << cpu_rank << "/";
  44.     cerr << max_cpus << endl;
  45.    
  46.     // free resources, don't forget !
  47.     MPI::Finalize();
  48.  
  49.     exit(EXIT_SUCCESS);
  50. }
  51.  
  52.  

5.4.1. Compilation

Afin de compiler un programme MPI, on utilise le compilateur mpicc pour le C, mpic++ pour le C++ :

# pour le code source C
> mpicc -o exe  src.c -O3 ... 
# ou alors pour les sources en C++ :
> mpic++ -o exe  src.cpp -O3 ... 
# ou alors si cela ne fonctionne pas :
> mpicxx -o exe  src.cpp -O3 ... 

5.4.2. Execution

Pour exécuter un programme compilé avec MPI, il faut utiliser mpirun (ou mpiexec) en spécifiant le nombre de processeurs (=processus,coeurs) grâce à l'option -n ou -np suivant les systèmes :

> mpirun -n 4 ./c3_mpi_ex_1.exe

Voici le résultat à l'affichage du programme précédent si on l'exécute sur une seule machine Intel Core i3-2375M CPU @ 1.50GHz (Dual Core + HyperThreading, soit 4 threads) :

running on inspiron with id=0/4
running on inspiron with id=1/4
running on inspiron with id=2/4
running on inspiron with id=3/4

Voici un autre exemple qui lance 16 processus sur 4 noeuds (h1 à h4) en réservant 4 processus (coeurs) sur chacune des machines :

> mpiexec -hosts h1:4,h2:4,h3:4,h4:4 -n 16 ./test

Référez vous à la section 5.10 pour voir réellement comment faire.

Sur un cluster on utilisera par exemple (-pe = parallel environment):

> qsub -pe mpi 16 test_mpi.sh

5.5. Synchronisation

Pour obtenir une section critique pour l'affichage on utilise la fonction MPI_Barrier ou MPI::COMM_WORLD.Barrier() en C++ qui bloque l'appelant jusqu'à ce que tous les autres programmes aient appelé cette fonction :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic functionalities of MPI with C++
  6. // with synchronization for display
  7. // ==================================================================
  8. #include <iostream>
  9. #include <cstdlib>
  10. #include <unistd.h> // for sleep
  11. using namespace std;
  12. #include <mpi.h>
  13.  
  14. // ==================================================================
  15. // C++ version
  16. // ==================================================================
  17.  
  18. int main(int argc, char ** argv) {
  19.     // maximum number of CPUs
  20.     int max_cpus;
  21.     // cpu identifier (called rank)
  22.     int cpu_rank;
  23.     // C-string to store the name of the host
  24.     char cpu_name[MPI_MAX_PROCESSOR_NAME];
  25.     // length of the C-string
  26.     int length;
  27.  
  28.  
  29.     // initialization also using command line parameters
  30.     MPI_Init(&argc, &argv);
  31.    
  32.     // get number of programs running
  33.     max_cpus = MPI::COMM_WORLD.Get_size();
  34.    
  35.     // get program identifier
  36.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  37.    
  38.     // get cpu name
  39.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  40.     MPI::Get_processor_name(cpu_name, length);
  41.  
  42.     cout << "Hello, from " << cpu_rank << endl;
  43.    
  44.     sleep( cpu_rank + 1 );
  45.    
  46.     MPI::COMM_WORLD.Barrier();    
  47.     cout << "Bye, from " << cpu_rank << endl;
  48.    
  49.     // free resources, don't forget !
  50.     MPI::Finalize();
  51.  
  52.     exit(EXIT_SUCCESS);
  53. }
  54.  
  55.  

Dans l'exemple qui suit, les 6 processus commencent par travailler (travail remplacé ici par un sleep) en parallèle. Le premier (identifiant 0) suspend son exécution pendant 1 seconde, le second pendant 2 secondes, etc. Au final, après 6 secondes, l'ensemble des processus exécutent le code qui suit l'appel à MPI::COMM_WORLD.Barrier();.

> mpirun -n 6 ./mpi_cpp_syncrho.exe
Hello, from 3
Hello, from 4
Hello, from 5
Hello, from 2
Hello, from 0
Hello, from 1
.... wait 6 seconds here until you finally get ....
Bye, from 0
Bye, from 5
Bye, from 3
Bye, from 4
Bye, from 1
Bye, from 2

5.6. Echange de donneés Send, Recv

Pour transmettre des données entre les processeurs, on utilise deux fonctions MPI_Send et MPI_Recv, ou éventuellement MPI_Sendrecv :

// envoi d'un processus vers un autre processus
int MPI_Send(void* data,
    int count,
    MPI_Datatype datatype,
    int destination,
    int tag,
    MPI_Comm communicator)

// réception d'un processus vers un autre processus
int MPI_Recv(void* data,
    int count,
    MPI_Datatype datatype,
    int source,
    int tag,
    MPI_Comm communicator,
    MPI_Status* status)

// envoi suivi d'une réception    
int MPI_Sendrecv(const void *sendbuf, 
    int sendcount, MPI_Datatype sendtype,
    int dest, int sendtag,
    void *recvbuf, int recvcount, MPI_Datatype recvtype,
    int source, int recvtag,
    MPI_Comm comm, MPI_Status *status)    
data
pointeur sur la donnée à envoyer ou recevoir
count
nombre d'occurrences
datatype
type de donnée (MPI_CHAR, MPI_INT, MPI_FLOAT, MPI::INT, MPI::FLOAT, ...)
source/destination
identifiant du processus qui reçoit ou envoie les données
communicator
canal de comunication, ex: COMM_WORLD
tag
identifiant de message compris entre 0 et 32767
status
le status lors de la réception des données

Pour la partie C++ on utilisera :

MPI::COMM_WORLD.Send(const void* buf, 
	int count, 
	MPI::Datatype& datatype,
	int dest, 
	int tag) const
	
void MPI::COMM_WORLD.Recv(void* buf, 
	int count, 
	MPI::Datatype& datatype,
	int source, 
	int tag, 
	MPI::Status* status) const
	
void MPI::COMM_WORLD::Sendrecv(const void* sendbuf, 
	int count,
	MPI::Datatype& datatype, 
	int dest, int sendtag,
	void* recvbuf, int recvcount,
	MPI::Datatype recvtype, int source, int recvtag,
	MPI::Status* status) const

Voici quelques exemples :

Dans le premier exemple, on utilise deux processeurs. Le maître (identifiant 0) et l'esclave (identifiant 1) :

 Maître   Esclave 
 Création d'un tableau et initialisation    
 Envoi de la taille →    
    Réception de la taille, création du tableau 
 Envoi du tableau →    
    Réception des données dans le tableau 
    Calcul de la somme des éléments 
    ← Envoi de la somme 
 Réception de la somme    
 Affichage de la somme    
Fonctionnement Exemple Send et Receive
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2016
  5. // Purpose: Demonstrate basic send functionality, send array of float
  6. // from master (rank=0) to slave (rank=1)
  7. // ==================================================================
  8. #include <unistd.h>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <unistd.h> // for sleep
  12. using namespace std;
  13. #include <mpi.h>
  14.  
  15. // ==============================================================
  16. // C++ version
  17. // ==============================================================
  18.  
  19. int main(int argc, char ** argv) {
  20.     // maximum number of CPUs
  21.     int max_cpus;
  22.     // cpu identifier (called rank)
  23.     int cpu_rank;
  24.     // C-string to store name of the host
  25.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  26.     // length of the C-string
  27.     int length;
  28.  
  29.     // initialization also using command line parameters
  30.     MPI::Init(argc, argv);
  31.  
  32.     // get number of programs running
  33.     max_cpus = MPI::COMM_WORLD.Get_size();
  34.    
  35.     // get program identifier
  36.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  37.    
  38.     // get cpu name
  39.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  40.     MPI::Get_processor_name(cpu_name, length);
  41.  
  42.    
  43.     cerr << cpu_rank << "/" << max_cpus << " on machine " << cpu_name << endl;
  44.    
  45.     // number of elements of the array 'data' that will be allocated
  46.     int data_size;
  47.     // array of float created by the master and sent to the slave(s)
  48.     float *data;
  49.     // identifier (rank) of remove processor (master or slave)
  50.     int remote_cpu;
  51.    
  52.     // Status for communication
  53.     MPI::Status status;
  54.  
  55.     // MASTER
  56.     if (cpu_rank == 0) {
  57.         // MASTER
  58.         // processor of id 0 (master) fills and sends the array
  59.         data_size = 100 + rand() % 100;
  60.         data = new float [data_size];
  61.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  62.        
  63.         int remote_cpu = 1;
  64.        
  65.         // 1- send length
  66.         cerr << cpu_rank << " [send]  data_size=" << data_size << endl;
  67.         MPI::COMM_WORLD.Send(&data_size, 1, MPI::INT, remote_cpu, 0);
  68.         // 2- send data
  69.         MPI::COMM_WORLD.Send(&data[0], data_size, MPI::FLOAT, remote_cpu, 0);      
  70.         // wait for other processor to compute and send back sum
  71.         float result = 0;
  72.  
  73.         MPI::COMM_WORLD.Recv(&result, 1, MPI::FLOAT, remote_cpu, MPI::ANY_TAG, status);
  74.         cerr << cpu_rank << " [recv] result=" << result << endl;
  75.         cerr << "result is " << result << " for length=" << data_size;
  76.         cerr << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  77.         delete [] data;
  78.        
  79.     } else if (cpu_rank == 1) {
  80.         // SLAVE
  81.         // processor of id 1 will receive the array and compute the sum
  82.         // 1- we receive the array length and allocate space
  83.         remote_cpu = 0; // master
  84.         MPI::COMM_WORLD.Recv(&data_size, 1, MPI::INT, remote_cpu, MPI::ANY_TAG, status);
  85.         cerr << cpu_rank << " [recv] data_size=" << data_size << endl;
  86.         data = new float[data_size];
  87.         // 2- we receive the data
  88.         MPI::COMM_WORLD.Recv(&data[0], data_size, MPI::FLOAT, remote_cpu, MPI::ANY_TAG, status);
  89.         // 3- compute sum
  90.         float sum = 0;
  91.         for (int i=0; i<data_size; ++i) {
  92.             sum += data[i];
  93.         }
  94.         // 4- send back result
  95.         cerr << cpu_rank << " [send]  sum=" << sum << endl;
  96.         MPI::COMM_WORLD.Send(&sum, 1, MPI::FLOAT, remote_cpu, 0);
  97.         delete [] data;
  98.        
  99.     } else {
  100.         // other processors (if any) won't do anything
  101.     }
  102.    
  103.     MPI::Finalize();
  104.  
  105.     exit(EXIT_SUCCESS);
  106. }
  107.  

Le deuxième exemple utilise sendrecv pour échanger une donné entre maître et esclave.

Attention la fonction Sendrecv permet uniquement d'échanger deux données de même type et de même nombre d'occurrence.

On ne peut pas, par exemple, envoyer un tableau de données et attendre la somme en retour.

  1. #include <unistd.h>
  2. #include <iostream>
  3. #include <cstdlib>
  4. #include <unistd.h> // for sleep
  5. #include <sstream>
  6. #include <numeric>
  7. using namespace std;
  8. #include <mpi.h>
  9.  
  10.  
  11. // ==============================================================
  12. // version C++
  13. // ==============================================================
  14.  
  15. int main(int argc, char ** argv) {
  16.     // maximum number of CPUs
  17.     int max_cpus;
  18.     // cpu identifier (called rank)
  19.     int cpu_rank;
  20.     // C-string to store name of the host
  21.     char cpu_name[MPI::MAX_PROCESSOR_NAME];
  22.     // length of the C-string
  23.     int length;
  24.  
  25.     // initialization also using command line parameters
  26.     MPI::Init(argc, argv);
  27.  
  28.     // get number of programs running
  29.     max_cpus = MPI::COMM_WORLD.Get_size();
  30.    
  31.     // get program identifier
  32.     cpu_rank = MPI::COMM_WORLD.Get_rank();
  33.    
  34.     // get cpu name
  35.     memset(cpu_name, 0, MPI::MAX_PROCESSOR_NAME);
  36.     MPI::Get_processor_name(cpu_name, length);
  37.  
  38.     cerr << cpu_rank << "/" << max_cpus << " on machine " << cpu_name << endl;
  39.    
  40.     // data to exchange
  41.     int send_data;
  42.     int recv_data;
  43.    
  44.     // identifier (rank) of remove processor (master or slave)
  45.     int remote_cpu, source_cpu;
  46.    
  47.     // Status for communication
  48.     MPI::Status status;
  49.     int TAG_0 = 0;
  50.  
  51.     if (cpu_rank == 0) {
  52.        
  53.         remote_cpu = 1;
  54.         send_data = 11111;
  55.  
  56.     } else if (cpu_rank == 1) {
  57.        
  58.         remote_cpu = 0;
  59.         send_data = 22222;
  60.        
  61.     }
  62.  
  63.     MPI::COMM_WORLD.Sendrecv(
  64.         &send_data,   // Send buffer
  65.         1,            // Number of elements to send
  66.         MPI::INT,     // Datatype of send elements
  67.         remote_cpu,   // Destination rank
  68.         0,            // Send tag
  69.         &recv_data,   // Receive buffer
  70.         1,            // Number of elements to receive
  71.         MPI::INT,     // Datatype of receive elements
  72.         remote_cpu,   // Source rank
  73.         0,            // Receive tag
  74.         status        // Status object
  75.     );
  76.    
  77.    
  78.     if (cpu_rank == 0) {
  79.        
  80.         cerr << "Master: recv_data=" << recv_data << endl;
  81.        
  82.     } else if (cpu_rank == 1) {
  83.        
  84.         cerr << "Slave: recv_data=" << recv_data << endl;
  85.  
  86.     }
  87.     MPI::Finalize();
  88.  
  89.     exit(EXIT_SUCCESS);
  90. }
  91.  

Afin de simplifier l'utilisation de MPI on peut créer une interface (wrapper) : j'ai créé, pour ma part, EZMPI.

EZMPI est constitué d'une seule classe Process qui représente un processus et qui permet de récupérer lors de l'initialisation l'identifiant du processeur, le nombre de processus lancés, ... On dispose également de méthodes qui permettent d'envoyer et recevoir un élément (quel que soit le type) ou un tableau d'élements.

On dispose en outre d'un système de log qui permet de consulter le déroulement des échanges entre processus (réception et envoi de données).

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: interface and class to facilitate use of MPI
  7. // ==================================================================
  8. #ifndef EZ_MPI_H
  9. #define EZ_MPI_H
  10.  
  11. #include <string>
  12. #include <typeinfo>
  13. #include <sstream>
  14. #include <stdexcept>
  15. #include <cstdint>
  16. using namespace std;
  17. #include <time.h>
  18. #include <unistd.h>
  19. #include <mpi.h>
  20.  
  21. /**
  22.  * EZ MPI is a wrapper for MPI C++. It simplifies the use
  23.  * of MPI send, receive, gather, scatter functions.
  24.  * For the gather, scatter and reduce functions the processor
  25.  * of rank 0 is considered as the "master" that collects or
  26.  * sends data.
  27.  */
  28.  
  29. namespace ez {
  30.  
  31. namespace mpi {
  32.  
  33. /*
  34.  * Process Information.
  35.  * This class also acts as a logger so it can be used to print
  36.  * information.
  37.  */
  38. class Process {
  39. protected:
  40.     // maximum number of process working together
  41.     int m_max;
  42.     // identifier of current process, called rank for MPI
  43.     int m_id;
  44.     // identifier of remote process for send, receive, ...
  45.     int m_remote;
  46.     // status of last operation
  47.     MPI::Status m_status;
  48.     // message tag if needed (default is 0)
  49.     int m_message_tag;
  50.     // name of process
  51.     string m_name;
  52.     // Linux process identifier
  53.     int m_pid;
  54.     // verbose mode
  55.     bool m_verbose_flag;
  56.     // verbose mode
  57.     bool m_log_flag;
  58.     // main output stream for current processor
  59.     ostringstream log_stream;
  60.     // temporary output stream
  61.     ostringstream tmp_log;
  62.    
  63. private:
  64.     /**
  65.      * Find processor name
  66.      */
  67.     void find_cpu_name();
  68.    
  69.     /**
  70.      * record output of oss into general output
  71.      */
  72.     void append();
  73.    
  74.     /**
  75.      * initialize max_cpus, cpu_rank, processus id
  76.      */
  77.     void init();
  78.    
  79. public:
  80.            
  81.     /**
  82.      * Default constructor
  83.      */
  84.     Process(int argc, char *argv[], bool verbose=false);
  85.    
  86.     ~Process();
  87.    
  88.     /**
  89.      * Display output of each processor if verbose mode is on
  90.      */
  91.     void finalize();
  92.    
  93.     /**
  94.      * set verbose mode
  95.      */
  96.     void verbose(bool mode) {
  97.         m_verbose_flag = mode;
  98.     }
  99.    
  100.     /**
  101.      * set log mode
  102.      */
  103.     void log(bool mode) {
  104.         m_log_flag = mode;
  105.     }
  106.    
  107.    
  108.    
  109.     /**
  110.      * Get process identifier
  111.      */
  112.     int pid();
  113.    
  114.     /**
  115.      * Get processor identifier or rank
  116.      */
  117.     int id();
  118.    
  119.     /**
  120.      * Get number of processors used
  121.      */
  122.     int max();
  123.    
  124.     /**
  125.      * Get processor name
  126.      */
  127.     string name();
  128.    
  129.    
  130.     /**
  131.      * set remote processor identifier
  132.      */
  133.     void remote(int rmt);
  134.    
  135.     /**
  136.      * set message tag
  137.      * @param tag must be an integer between 0 and 32767
  138.      */
  139.     void tag(int tag);
  140.    
  141.     /**
  142.      * Return true if this processor is the processor of rank 0
  143.      * considered as the master.
  144.      */
  145.     bool is_master() {
  146.         return (m_id == 0);
  147.     }
  148.    
  149.     /**
  150.      * synchronize
  151.      */
  152.     void synchronize();
  153.    
  154.     /**
  155.      * Determine type of data T and convert it into MPI::Datatype.
  156.      * This function needs to be extended with other types.
  157.      */
  158.     template<class T>
  159.     MPI::Datatype get_type() {
  160.         if (typeid(T) == typeid(char)) {
  161.             return MPI::CHAR;
  162.         } else if (typeid(T) == typeid(int8_t)) {  
  163.             return MPI::CHAR;
  164.         } else if (typeid(T) == typeid(uint8_t)) { 
  165.             return MPI::CHAR;  
  166.         } else if (typeid(T) == typeid(int)) {
  167.             return MPI::INT;
  168.         } else if (typeid(T) == typeid(float)) {
  169.             return MPI::FLOAT;
  170.         } else if (typeid(T) == typeid(double)) {
  171.             return MPI::DOUBLE;
  172.         }
  173.         //throw std::runtime_error("unknown MPI::Datatype");
  174.         cout << "!!!!! unknown " << typeid(T).name() << endl;
  175.         return MPI::INT;
  176.     }
  177.    
  178.     /**
  179.      * Send one instance of data to remote_cpu
  180.      * @param v data to send
  181.      */
  182.     template<class T>
  183.     void send(T& v) {
  184.         MPI::Datatype data_type = get_type<T>();
  185.        
  186.         tmp_log << "send value=" << v << " to " << m_remote << endl;
  187.         flush();
  188.        
  189.         MPI::COMM_WORLD.Send(&v, 1, data_type, m_remote, m_message_tag);
  190.     }
  191.    
  192.     /**
  193.      * Send an array to remote_cpu
  194.      * @param arr address of the array
  195.      * @param size number of elements to send
  196.      */
  197.     template<class T>
  198.     void send(T *arr, int size) {
  199.         MPI::Datatype data_type = get_type<T>();
  200.        
  201.         tmp_log << "send array of size=" << size << " to " << m_remote << endl;
  202.         flush();
  203.                
  204.         MPI::COMM_WORLD.Send(&arr[0], size, data_type, m_remote, m_message_tag);
  205.     }
  206.    
  207.     /**
  208.      * Receive one instance of data from remote cpu
  209.      * @param v data to receive
  210.      */
  211.     template<class T>
  212.     void recv(T& v) {
  213.         MPI::Datatype data_type = get_type<T>();
  214.        
  215.         MPI::COMM_WORLD.Recv(&v, 1, data_type, m_remote,
  216.             (m_message_tag == 0) ? MPI::ANY_TAG : m_message_tag,
  217.                     m_status);
  218.            
  219.         tmp_log << "receive value=" << v << " from " << m_remote << endl;
  220.         flush();
  221.            
  222.     }
  223.    
  224.     /**
  225.      * Receive an array of given size
  226.      * @param arr pointer to address of the array
  227.      * @param size number of elements
  228.      */
  229.     template<class T>
  230.     void recv(T *arr, int size) {
  231.         MPI::Datatype data_type = get_type<T>();
  232.        
  233.         MPI::COMM_WORLD.Recv(&arr[0], size, data_type, m_remote,
  234.             (m_message_tag == 0) ? MPI::ANY_TAG : m_message_tag,
  235.                     m_status);
  236.            
  237.         tmp_log << "receive array of size=" << size << " from " << m_remote << endl;
  238.         flush();
  239.            
  240.     }
  241.    
  242.     /**
  243.      * Send array and receive value in return, this is an instance
  244.      * of the Sendrecv function.
  245.      * @param arr address of the array to send
  246.      * @param size size of the array to send
  247.      * @param value value to receive
  248.      */
  249.     template<class T, class U>
  250.     void sendrecv(T *array, int size, U& value) {
  251.         MPI::Datatype array_data_type = get_type<T>();
  252.         MPI::Datatype value_data_type = get_type<U>();
  253.        
  254.         tmp_log << "sendrecv/send array of size=" << size << endl;
  255.         flush();
  256.                
  257.         MPI::COMM_WORLD.Sendrecv(&array[0], size, array_data_type, m_remote, 0,
  258.             &value, 1, value_data_type, MPI::ANY_SOURCE, MPI::ANY_TAG,
  259.             m_status);
  260.            
  261.         tmp_log << "sendrecv/receive value=" << value << endl;
  262.         flush();
  263.            
  264.     }
  265.    
  266.     /**
  267.      * Perform reduction
  268.      * @param lcl_value local array used to perform reduction
  269.      * @param glb_value global data that will contain result
  270.      * @param op operation to perform (MPI::SUM, MPI::MAX, ...)
  271.      */
  272.     template<class T>
  273.     void reduce(T &lcl_value, T &glb_value, const MPI::Op& op) {
  274.         MPI::Datatype data_type = get_type<T>();
  275.        
  276.         MPI::COMM_WORLD.Reduce(&lcl_value,
  277.             &glb_value, 1, data_type, op, 0);
  278.            
  279.         tmp_log << "reduction gives value=" << glb_value << endl;
  280.         flush();
  281.  
  282.     }
  283.    
  284.     /**
  285.      * Perform gather operation
  286.      * @param lcl_array local array that is send to master process
  287.      * @param glb_array global array that will contain all local arrays
  288.      */
  289.     template<class T>
  290.     void gather(T *lcl_array, int size, T *glb_array) {
  291.         MPI::Datatype data_type = get_type<T>();
  292.        
  293.         MPI::COMM_WORLD.Gather(lcl_array, size, data_type,
  294.             glb_array, size, data_type, 0);
  295.            
  296.         tmp_log << "gather" << endl;
  297.         flush();
  298.  
  299.     }
  300.    
  301.     /**
  302.      * Perform scatter operation
  303.      * @param glb_array array of data that will be send by to all processors
  304.      * @param size size of the local array of data
  305.      * @param lcl_array local array of data
  306.      */
  307.     template<class T>
  308.     void scatter(T *glb_array, int size, T *lcl_array) {
  309.         MPI::Datatype data_type = get_type<T>();
  310.        
  311.         MPI::COMM_WORLD.Scatter(glb_array, size, data_type,
  312.             lcl_array, size, data_type, 0);
  313.            
  314.         tmp_log << "scatter" << endl;
  315.         flush();
  316.  
  317.     }
  318.    
  319.     typedef std::ostream& (*ManipFn)(std::ostream&);
  320.     typedef std::ios_base& (*FlagsFn)(std::ios_base&);
  321.    
  322.     void print(char v);
  323.     void print(int v);
  324.     void print(string s);
  325.     void print(float f);
  326.     void print(double d);
  327.    
  328.     template<class T> // int, double, strings, etc
  329.     Process& operator<<(const T& output) {
  330.         tmp_log << output;
  331.         return *this;
  332.     }
  333.  
  334.     // endl, flush, setw, setfill, etc.
  335.     Process& operator<<(ManipFn manip) {
  336.         manip(tmp_log);
  337.         if (manip == static_cast<ManipFn>(std::flush) || manip == static_cast<ManipFn>(std::endl)) {
  338.           this->flush();
  339.         }
  340.         return *this;
  341.     }
  342.  
  343.     // setiosflags, resetiosflags
  344.     Process& operator<<(FlagsFn manip) {
  345.         manip(tmp_log);
  346.         return *this;
  347.     }
  348.  
  349.     void flush();
  350.  
  351.     void logs(ostream& out);
  352.    
  353.     typedef void (*Code)(Process& p);
  354.    
  355.     void run(Code code) {
  356.         code(*this);
  357.     }
  358. };
  359.  
  360.  
  361.  
  362. } // end of namespace mpi
  363.  
  364. } // end of namespace ez
  365. #endif
  366.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: interface and class to facilitate use of MPI
  7. // ==================================================================
  8.  
  9. #include "ezmpi.h"
  10.  
  11. using namespace ez::mpi;
  12.  
  13. void Process::find_cpu_name() {
  14.     char name[MPI::MAX_PROCESSOR_NAME];
  15.     int length;
  16.    
  17.     memset(name, 0, MPI::MAX_PROCESSOR_NAME);
  18.     MPI::Get_processor_name(name,length);
  19.     m_name = name;
  20. }
  21.  
  22. void Process::init() {
  23.     m_max = MPI::COMM_WORLD.Get_size();
  24.     m_id = MPI::COMM_WORLD.Get_rank();
  25.     find_cpu_name();
  26.     m_pid = getpid();
  27.     if (m_verbose_flag) {
  28.         tmp_log << "pid=" << m_pid << ", id=" << m_id << endl;
  29.         flush();
  30.     }
  31.  
  32. }
  33.  
  34. Process::Process(int argc, char *argv[], bool verbose) {
  35.     m_remote = 0;
  36.     m_message_tag = 0;
  37.     m_verbose_flag = verbose;
  38.     m_log_flag = false;
  39.     MPI::Init(argc, argv);
  40.     init();
  41. }
  42.  
  43. void Process::logs(ostream& out) {
  44.     m_verbose_flag = m_log_flag = false;
  45.     if (m_id == 0) {
  46.         out.flush();
  47.         out << std::endl;
  48.         out << "====================" << std::endl;
  49.         out << "=== FINAL RESULT ===" << std::endl;
  50.         out << "====================" << std::endl;
  51.         out << "---------------------" << std::endl;
  52.         out << "CPU " << m_id << std::endl;
  53.         out << "---------------------" << std::endl;
  54.         out << log_stream.str();
  55.         out.flush();
  56.         remote( 1 );
  57.         int token = -255;
  58.         send( token );
  59.     } else {
  60.         remote( m_id - 1 );
  61.         int token;
  62.         recv(token);           
  63.         out << "---------------------" << std::endl;
  64.         out << "CPU " << m_id << std::endl;
  65.         out << "---------------------" << std::endl;
  66.         out << log_stream.str();
  67.         out.flush();
  68.         if (m_id < m_max - 1) {
  69.             remote( m_id + 1 );
  70.             token = -255;
  71.             send(token);
  72.         }
  73.        
  74.     }
  75.    
  76.    
  77. }
  78.    
  79. void Process::finalize() {
  80.     MPI::COMM_WORLD.Barrier();
  81.     MPI::Finalize();
  82. }
  83.  
  84. Process::~Process() {
  85.     finalize();
  86. }
  87.  
  88. int Process::pid() {
  89.     return m_pid;
  90. }
  91.  
  92. int Process::id() {
  93.     return m_id;
  94. }
  95.  
  96. int Process::max() {
  97.     return m_max;
  98. }
  99.    
  100. string Process::name() {
  101.     return m_name;
  102. }
  103.    
  104. void Process::tag(int tag) {
  105.     m_message_tag = tag;
  106. }
  107.  
  108. void Process::remote(int rmt) {
  109.     m_remote = rmt;
  110. }
  111.  
  112. void Process::synchronize() {
  113.     MPI::COMM_WORLD.Barrier();
  114. }
  115.  
  116. // --------------------------
  117. // output
  118. // --------------------------
  119.  
  120. const std::string currentDateTime() {
  121.     time_t     now = time(0);
  122.     struct tm  tstruct;
  123.     char       buf[80];
  124.     tstruct = *localtime(&now);
  125.     //strftime(buf, sizeof(buf), "%Y-%m-%d.%X [%s]", &tstruct);
  126.     strftime(buf, sizeof(buf), "%X", &tstruct);
  127.    
  128.     return buf;
  129. }
  130.  
  131. void Process::flush() {
  132.     string str = currentDateTime();
  133.     if (m_log_flag) {
  134.         log_stream << str << " cpu " << m_id << "/" << m_max << ": " << tmp_log.str();
  135.     }
  136.     if (m_verbose_flag) {
  137.         cerr << str << " cpu " << m_id << "/" << m_max << ": " << tmp_log.str();
  138.     }
  139.     tmp_log.str("");
  140. }
  141.  
  142. void Process::print(char v) {
  143.     tmp_log << v;
  144. }
  145.  
  146. void Process::print(int v) {
  147.     tmp_log << v;
  148. }
  149.  
  150. void Process::print(string v) {
  151.     tmp_log << v;
  152. }
  153.  
  154. void Process::print(float v) {
  155.     tmp_log << v;
  156. }
  157.  
  158. void Process::print(double v) {
  159.     tmp_log << v;
  160. }
  161.    
  162.  
  163.  

On peut alors réécrire les programes précédents de manière plus simple :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: Demonstrate basic send functionality, send array of float
  7. // from master (rank=0) to slave (rank=1)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. using namespace std;
  14. #include "ezmpi.h"
  15. using namespace ez::mpi;
  16.  
  17. /**
  18.  * run master and slaves
  19.  */
  20. void run(int argc, char *argv[]) {
  21.  
  22.     // data to send
  23.     float *data;
  24.     int data_size;
  25.  
  26.     // Create Process from ezmpi
  27.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  28.     Process p(argc, argv);
  29.      
  30.     // the master is the process with id() == 0
  31.     if (p.is_master()) {
  32.         // Code of master
  33.        
  34.         // set remote cpu identifier that will communicate with master
  35.         p.remote(1);
  36.        
  37.         // processor of id 0 (master) fills and sends the array
  38.         data_size = 100 + rand() % 100;
  39.         data = new float [data_size];
  40.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  41.        
  42.         // 1- send size of array to remote
  43.         p.send(data_size);
  44.         // 2- send data of array to remote
  45.         p.send(data, data_size);       
  46.        
  47.         // wait for other processor to compute and send back sum
  48.         float result = 0;
  49.         p.recv(result);
  50.        
  51.         p << "result is " << result << " for length=" << data_size;
  52.         p << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  53.        
  54.         delete [] data;    
  55.        
  56.     } else if (p.id() == 1) {
  57.         // Code for slave
  58.        
  59.         // tells to Process to send data to master processor
  60.         p.remote(0);
  61.        
  62.         // process of id 1 will receive the array and compute the sum
  63.         // 1- we receive the array length and allocate space
  64.         p.recv(data_size);
  65.    
  66.         data = new float[data_size];
  67.        
  68.         // 2- we receive the data
  69.         p.recv(data, data_size);
  70.        
  71.         // 3- compute sum
  72.         float sum = 0;
  73.         for (int i=0; i<data_size; ++i) {
  74.             sum += data[i];
  75.         }
  76.        
  77.         // 4- send back result
  78.         p.send(sum);
  79.        
  80.         delete [] data;
  81.        
  82.     } else {
  83.         // other process (if any) won't do anything
  84.         p << "is idle" << endl;
  85.     }
  86.    
  87.     sleep(1);
  88.    
  89.     p.logs(cout);
  90.    
  91.     // desctuctor of Process p will call the finalize method
  92.     // of MPI
  93. }  
  94.  
  95. /**
  96.  * main function
  97.  *
  98.  */
  99. int main(int argc, char ** argv) {
  100.     // call a function that contains code for master and slave
  101.     // in order to avoid problems of initialization and finalization
  102.     run(argc, argv);
  103.    
  104.     exit(EXIT_SUCCESS);
  105. }
  106.  
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Email: jean-michel.richer@univ-angers.fr
  4. // Date: Aug 2020
  5. // Last modified: November 2020
  6. // Purpose: Demonstrate basic send functionality, send array of float
  7. // from master (rank=0) to slave (rank=1)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. #include <sstream>
  14. using namespace std;
  15. #include "ezmpi.h"
  16. using namespace ez::mpi;
  17.  
  18. /**
  19.  * run master and slaves
  20.  */
  21. void run(int argc, char *argv[]) {
  22.     // data to send
  23.     float *data;
  24.     int data_size;
  25.  
  26.     // Create Process from ezmpi
  27.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  28.     Process p(argc, argv);
  29.    
  30.     if (p.is_master()) {
  31.         // Code of master
  32.        
  33.         // set remote cpu identifier that will communicate with master
  34.         p.remote(1);
  35.        
  36.         // processor of id 0 (master) fills and sends the array
  37.         data_size = 100 + rand() % 100;
  38.         data = new float [data_size];
  39.         for (int i=0; i<data_size; ++i) data[i] = i+1;
  40.        
  41.         // 1- send size of array to remote
  42.         p.send(data_size);
  43.        
  44.        
  45.         // 2- send data of array to remote and receive result
  46.         float result = 0;
  47.         p.sendrecv(data, data_size, result);   
  48.        
  49.         p << "result is " << result << " for length=" << data_size;
  50.         p << ", expected=" << (data_size * (data_size+1))/ 2  << endl;
  51.        
  52.         delete [] data;    
  53.        
  54.     } else if (p.id() == 1) {
  55.         // Code for slave
  56.        
  57.         // tells to Process to send data to master processor
  58.         p.remote(0);
  59.        
  60.         // process of id 1 will receive the array and compute the sum
  61.         // 1- we receive the array length and allocate space
  62.         p.recv(data_size);
  63.    
  64.         data = new float[data_size];
  65.        
  66.         // 2- we receive the data
  67.         p.recv(data, data_size);
  68.        
  69.         // 3- compute sum
  70.         float sum = 0;
  71.         for (int i=0; i<data_size; ++i) {
  72.             sum += data[i];
  73.         }
  74.        
  75.         // 4- send back result
  76.         p.send(sum);
  77.        
  78.         delete [] data;
  79.        
  80.     } else {
  81.         // other process (if any) won't do anything
  82.         p << "is idle" << endl;
  83.     }
  84.    
  85.     sleep(1);
  86.    
  87.     p.logs(cout);
  88.    
  89.     // desctuctor of Process p will call the finalize method
  90.     // of MPI
  91. }
  92.  
  93. // ==============================================================
  94. // version C++
  95. // ==============================================================
  96. int main(int argc, char ** argv) {
  97.     // call a function that contains code for master and slave
  98.     // in order to avoid problems of initialization and finalization
  99.     run(argc, argv);
  100.    
  101.     exit(EXIT_SUCCESS);
  102. }
  103.  

5.7. Réduction

Certains traitements comme la réduction ou le scan sont implantés spécifiquement pour MPI.

Par exemple pour la réduction :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: 20 Aug 2016
  4. // Purpose: Use of MPI::Reduce with function to integrate
  5. // ==================================================================
  6. #include <unistd.h>
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <unistd.h> // for sleep
  10. #include <sstream>
  11. using namespace std;
  12. #include "ezmpi.h"
  13. using namespace ez::mpi;
  14.  
  15. /**
  16.  * function to integrate f(x)
  17.  */
  18. double f(double x) {
  19.     return x * x;
  20. }
  21.  
  22. /**
  23.  * integrate from
  24.  * @param a lower bound
  25.  * @param b upper bound
  26.  * @param n number of steps
  27.  */
  28. double integrate(double a, double b, int n) {
  29.     double dx = (b-a) / n;
  30.    
  31.     double sum = 0;
  32.     for (int i=1; i<n; ++i) {
  33.         sum += f(a + i * dx);
  34.     }
  35.     return sum * dx;
  36. }
  37.  
  38. /**
  39.  * run master and slaves
  40.  */
  41. void run(int argc, char *argv[]) {
  42.    
  43.     Process p(argc, argv);
  44.    
  45.     // data to send
  46.     double a = 1.0;
  47.     double b = 3.0;
  48.     int steps = 10000000;
  49.    
  50.     double x_range = (b - a) / p.max();
  51.     int steps_range = steps / p.max();
  52.    
  53.     double local_a = a + p.id() * x_range;
  54.     double local_b = local_a + x_range;
  55.     double local_result = integrate(local_a, local_b, steps_range);
  56.    
  57.     p << "integrate from " << local_a << " to " << local_b;
  58.     p << " during " << steps_range << " steps" << endl;
  59.     p << "local_result=" << local_result << endl;
  60.    
  61.     double final_result = 0;
  62.     p.synchronize();
  63.    
  64.     p.reduce(local_result, final_result, MPI::SUM);
  65.    
  66.     // expected result should be (9 - 1/3) = 8.6666
  67.     if (p.is_master()) {
  68.         p << "final result=" << final_result << endl;
  69.     }
  70.    
  71.     sleep(1);
  72.     p.logs(cout);
  73.    
  74. }
  75.  
  76. /**
  77.  * main function
  78.  */
  79. int main(int argc, char ** argv) {
  80.    
  81.     // call a function that contains code for master and slave
  82.     // in order to avoid problems of initialization and finalization
  83.     run(argc, argv);
  84.    
  85.    
  86.     exit(EXIT_SUCCESS);
  87. }
  88.  

On calcule l'intégrale de la fonction $f(x) = x^2$ entre $x=1$ et $x=3$. Pour cela on utilise la méthode des rectangles. On veut 10_000_000 de rectangles entre 1 et 3. Si on utilise 4 processeurs on aura donc $10\_000\_000 / 4 = 2\_500\_000$ itérations par processeur. Au final, le résultat de la somme de chaque partie de l'intégrale est renvoyé au maître qui en fait la somme.

---------------------
CPU 0
---------------------
2016-08-24.02:37:35  cpu 0/4: pid=22172, rank=0
2016-08-24.02:37:35  cpu 0/4: integrate from 1 to 1.5 during 2500000 steps
2016-08-24.02:37:35  cpu 0/4: local_result=0.791666
2016-08-24.02:37:35  cpu 0/4: reduce gives value=8.66666
2016-08-24.02:37:35  cpu 0/4: final result=8.66666
---------------------
CPU 1
---------------------
2016-08-24.02:37:35  cpu 1/4: pid=22173, rank=1
2016-08-24.02:37:35  cpu 1/4: integrate from 1.5 to 2 during 2500000 steps
2016-08-24.02:37:35  cpu 1/4: local_result=1.54167
2016-08-24.02:37:35  cpu 1/4: reduce gives value=0
---------------------
CPU 2
---------------------
2016-08-24.02:37:35  cpu 2/4: pid=22174, rank=2
2016-08-24.02:37:35  cpu 2/4: integrate from 2 to 2.5 during 2500000 steps
2016-08-24.02:37:35  cpu 2/4: local_result=2.54167
2016-08-24.02:37:35  cpu 2/4: reduce gives value=0
---------------------
CPU 3
---------------------
2016-08-24.02:37:35  cpu 3/4: pid=22175, rank=3
2016-08-24.02:37:35  cpu 3/4: integrate from 2.5 to 3 during 2500000 steps
2016-08-24.02:37:35  cpu 3/4: local_result=3.79167
2016-08-24.02:37:35  cpu 3/4: reduce gives value=0

5.8. Gather et scatter

MPI propose deux opérations inverses :

scatter gather

Voici un exemple pour lequel $K$ processus créent des tableaux de 10 entiers qui sont envoyés au master (processus de rang 0) :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: 20 Aug 2016
  4. // Purpose: Use of MPI::Scatter
  5. // The master process creates its own local data and they are send
  6. // to other process (of rank not equal to 0)
  7. // ==================================================================
  8. #include <unistd.h>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <unistd.h> // for sleep
  12. using namespace std;
  13. #include "ezmpi.h"
  14. using namespace ez::mpi;
  15.  
  16. /**
  17.  * run master and slaves
  18.  */
  19. void run(int argc, char *argv[]) {
  20.    
  21.     Process p(argc, argv);
  22.    
  23.     const int local_data_size = 10;
  24.     int *local_data;
  25.     int global_data_size = p.max() * local_data_size;
  26.     int *global_data = NULL;
  27.    
  28.     local_data = new int [ local_data_size ];
  29.     for (int i=0; i<local_data_size; ++i) {
  30.         local_data[i] = 0;
  31.     }
  32.        
  33.    
  34.     if (p.is_master()) {
  35.    
  36.         // master process will send data to others
  37.         global_data = new int [ global_data_size ];
  38.         for (int i=0; i<global_data_size; ++i) global_data[i] =  i+1;
  39.        
  40.         p << "global array = [";
  41.         for (int i=0; i<global_data_size; ++i) p << global_data[i] << " ";
  42.         p << "]" << endl;
  43.     }
  44.    
  45.     // Master sends data to all others
  46.     p.scatter(global_data, local_data_size, local_data);
  47.    
  48.     // Each process reports the data it has received           
  49.     p << "local array=[";
  50.     for (int i=0; i<local_data_size; ++i) {
  51.         p << local_data[i] << " ";
  52.     }
  53.     p << "]" << endl;
  54.  
  55.     sleep(1);
  56.    
  57.     p.logs(cout);
  58. }
  59.  
  60. // ==================================================================
  61. // C++ version
  62. // ==================================================================
  63. int main(int argc, char ** argv) {
  64.     // call a function that contains code for master and slave
  65.     // in order to avoid problems of initialization and finalization
  66.     run(argc, argv);
  67.    
  68.     exit(EXIT_SUCCESS);
  69. }
  70.  
> mpirun -n 4 ./ezmpi_scatter.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
14:15:05 cpu 0/4: global array = [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 ]
14:15:05 cpu 0/4: scatter
14:15:05 cpu 0/4: local array=[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 1
---------------------
14:15:05 cpu 1/4: scatter
14:15:05 cpu 1/4: local array=[11 12 13 14 15 16 17 18 19 20 ]
---------------------
CPU 2
---------------------
14:15:05 cpu 2/4: scatter
14:15:05 cpu 2/4: local array=[21 22 23 24 25 26 27 28 29 30 ]
---------------------
CPU 3
---------------------
14:15:05 cpu 3/4: scatter
14:15:05 cpu 3/4: local array=[31 32 33 34 35 36 37 38 39 40 ]
  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: Aug 2020
  4. // Last modified: November 2020
  5. // Purpose: Use of MPI::Gather
  6. // Each process creates its own local data and they are all send
  7. // to the master process (of rank 0)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. #include <sstream>
  14. using namespace std;
  15. #include "ezmpi.h"
  16. using namespace ez::mpi;
  17.  
  18. /**
  19.  * run master and slaves
  20.  */
  21. void run(int argc, char *argv[]) {
  22.  
  23.     // data to send
  24.     float *data;
  25.     int data_size;
  26.  
  27.     // Create Process from ezmpi
  28.     // that retrives cpu rank (id), # cpus (max), cpu name (name)
  29.     Process p(argc, argv);
  30.    
  31.     const int local_data_size = 10;
  32.     int *local_data;
  33.     int global_data_size = 0;
  34.     int *global_data = NULL;
  35.    
  36.     // each processor creates its local data
  37.     local_data = new int [ local_data_size ];
  38.     for (int i=0; i<local_data_size; ++i) {
  39.         local_data[i] = (p.id() * 10) + i + 1;
  40.     }
  41.        
  42.     p << "local array=[";
  43.     for (int i=0; i<local_data_size; ++i) {
  44.         p << local_data[i] << " ";
  45.     }
  46.     p << "]" << endl;
  47.    
  48.     if (p.is_master()) {
  49.         // master process will gather data from others
  50.         global_data_size = p.max() * local_data_size ;
  51.         global_data = new int [ global_data_size ];
  52.         for (int i=0; i<global_data_size; ++i) global_data[i] = 0;
  53.     }
  54.    
  55.     p.gather(local_data, local_data_size, global_data);
  56.    
  57.     if (p.is_master()) {
  58.         p << "global array = [";
  59.         for (int i=0; i<global_data_size; ++i) {
  60.             p << global_data[i] << " ";
  61.         }
  62.         p << "]" << endl;;
  63.     }
  64.    
  65.     sleep(1);
  66.    
  67.     p.logs(cout);
  68. }
  69.    
  70. // ==================================================================
  71. // C++ version
  72. // ==================================================================
  73. int main(int argc, char ** argv) {
  74.     // call a function that contains code for master and slave
  75.     // in order to avoid problems of initialization and finalization
  76.     run(argc, argv);
  77.    
  78.     exit(EXIT_SUCCESS);
  79. }
  80.  
> mpirun -n 4 ./ezmpi_gather.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
14:19:02 cpu 0/4: local array=[1 2 3 4 5 6 7 8 9 10 ]
14:19:02 cpu 0/4: gather
14:19:02 cpu 0/4: global array = [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 ]
---------------------
CPU 1
---------------------
14:19:02 cpu 1/4: local array=[11 12 13 14 15 16 17 18 19 20 ]
14:19:02 cpu 1/4: gather
---------------------
CPU 2
---------------------
14:19:02 cpu 2/4: local array=[21 22 23 24 25 26 27 28 29 30 ]
14:19:02 cpu 2/4: gather
---------------------
CPU 3
---------------------
14:19:02 cpu 3/4: local array=[31 32 33 34 35 36 37 38 39 40 ]
14:19:02 cpu 3/4: gather

5.9. Broadcast (diffuser)

Enfin, l'opération broadcast envoie à tous les esclaves une copie d'un tableau détenu par le maître par exemple :

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: Aug 2020
  4. // Last modified: November 2020
  5. // Purpose: Use of MPI::BCast
  6. // The master process creates its own local data and they are send
  7. // to other process (of rank not equal to 0)
  8. // ==================================================================
  9. #include <unistd.h>
  10. #include <iostream>
  11. #include <cstdlib>
  12. #include <unistd.h> // for sleep
  13. using namespace std;
  14. #include "ezmpi.h"
  15. using namespace ez::mpi;
  16.  
  17. /**
  18.  * run master and slaves
  19.  */
  20. void run(int argc, char *argv[]) {
  21.    
  22.     Process p(argc, argv);
  23.    
  24.  
  25.     int global_data_size = 100;
  26.     int *global_data = NULL;
  27.    
  28.     global_data = new int [ global_data_size ];
  29.     for (int i=0; i<global_data_size; ++i) {
  30.         global_data[i] = 0;
  31.     }
  32.        
  33.    
  34.     if (p.is_master()) {
  35.    
  36.         // master process will send data to others
  37.         for (int i=0; i<global_data_size; ++i) global_data[i] =  i+1;
  38.        
  39.         p << "global array = [";
  40.         for (int i=0; i<global_data_size; ++i) p << global_data[i] << " ";
  41.         p << "]" << endl;
  42.     }
  43.    
  44.     // Master sends data to all others
  45.     //p.synchronize();
  46.     p.broadcast(global_data, global_data_size, 0);
  47.    
  48.    
  49.     // Each process reports the data it has received           
  50.     p << "local copy array=[";
  51.     for (int i=0; i<global_data_size; ++i) {
  52.         p << global_data[i] << " ";
  53.     }
  54.     p << "]" << endl;
  55.  
  56.     sleep(1);
  57.    
  58.     p.logs(cout);
  59. }
  60.  
  61. // ==================================================================
  62. // C++ version
  63. // ==================================================================
  64. int main(int argc, char ** argv) {
  65.     // call a function that contains code for master and slave
  66.     // in order to avoid problems of initialization and finalization
  67.     run(argc, argv);
  68.    
  69.     exit(EXIT_SUCCESS);
  70. }
  71.  
> mpirun -n 4 ./ezmpi_broadcast.exe
====================
=== FINAL RESULT ===
====================
---------------------
CPU 0
---------------------
17:08:27 cpu 0/4: global array on master = [1 2 3 4 5 6 7 8 9 10 ]
17:08:27 cpu 0/4: broadcast
17:08:27 cpu 0/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 1
---------------------
17:08:27 cpu 1/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 1/4: broadcast
17:08:27 cpu 1/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 2
---------------------
17:08:27 cpu 2/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 2/4: broadcast
17:08:27 cpu 2/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]
---------------------
CPU 3
---------------------
17:08:27 cpu 3/4: global array on slave = [0 0 0 0 0 0 0 0 0 0 ]
17:08:27 cpu 3/4: broadcast
17:08:27 cpu 3/4: global array (after broadcast) =[1 2 3 4 5 6 7 8 9 10 ]

5.10. Execution sur plusieurs machines

Exécuter un programme MPI sur plusieurs machine relève de la gageure, j'ai passé 4 heures pour pouvoir y parvenir après moult problèmes :

Pour que cela fonctionne il faut, sur toutes les machines :

  • disposer du même système d'exploitation (ou de versions proches)
  • avoir la même version d'OpenMPI (ex : 4.1.8) installée dans les mêmes répertoires (ex : /usr/local/openmpi-4.1.8)
  • disposer des mêmes compilateurs C++
  • avoir la même hierarchie de répertoires, là où se trouvent vos fichiers MPI (ex : ~/dev/cpp/parallelism)
  • compiler le même programme (ou une version proche)
  • créer une clé ssh et la partager avec toutes les machines (secondaires) qui exécuteront le code pour pouvoir se connecter en SSH automatiquement sans avoir à saisir de mot de passe

Dans l'exemple qui suit, on dispose de deux machines :

Eventuellement, modifiez /etc/hosts en ajoutant les lignes suivantes pour pouvoir faire référence aux machines par leur nom et non leur IP :

192.168.1.194 solaris
192.168.1.109 jupiter

5.10.1. Génération de la clé ssh

Sur solaris :

solaris> ssh-keygen -t ed25519 -N "" -f ~/.ssh/id_ed25519
solaris> ssh-copy-id richer@192.168.1.109

Se connecter une première fois afin d'activer la connexion ssh et la clé :

solaris> ssh richer@192.168.1.109
Welcome to Ubuntu 24.04 LTS (GNU/Linux 6.8.0-31-generic x86_64)
...
jupiter> exit

5.10.2. Compilation

Sur chaque machine compiler le programme à exécuter :

> mpic++ -o equation_mpi.exe equation_mpi.cpp -O3

On peut probablement le compiler sur la machine principale et le copier sur les machines secondaires par scp si on dispose des mêmes OS et compilateurs.

5.10.3. Fichier hosts.txt

Ce fichier décrit le nombre de slots (threads / processus) sur chaque machine, il faut le créer sur la machine principale :

solaris@~dev/cpp/parallelism> cat hosts.txt
solaris slots=12
jupiter slots=12

5.10.4. Exécution du programme

Pour exécuter le programme avec 16 processus, il faut procéder ainsi sur la machine principale :

solaris@~/dev/cpp/parallelism> time mpirun -np 16 --hostfile hosts.txt \
   --host solaris:8,jupiter:8  --prefix /usr/local/openmpi-4.1.8 --map-by ppr:8:node  \
   --mca btl tcp,self --mca btl_tcp_if_include 192.168.1.0/24 \
   --mca oob_tcp_if_include 192.168.1.0/24 \
   ~/dev/cpp/parallelism/equation_mpi.exe -n 26
2 0 0 0 0 4 0   # 1*2 + 6*4 = 26
4 0 0 0 2 2 0   # 1*4 + 5*2 + 6*2 = 26
6 0 0 0 0 1 2   # ...
8 0 0 0 0 3 0 
10 0 0 0 2 1 0 
12 0 0 0 0 0 2 
14 0 0 0 0 2 0 
16 0 0 0 2 0 0 
18 0 0 2 0 0 0 
20 0 0 0 0 1 0 
22 0 0 1 0 0 0 
23 0 1 0 0 0 0 
0 0 0 0 0 2 2 
26 0 0 0 0 0 0 
24 1 0 0 0 0 0 

real    0m6,095s
user    0m18,259s
sys     0m2,002s

5.11. Liens

5.12. Exercices

Exercice 5.1

Une suite de Syracuse est une suite d'entiers naturels définie de la manière suivante : on part d'un nombre entier strictement positif

  • s'il est pair, on le divise par 2
  • s'il est impair, on le multiplie par 3 et on ajoute 1

Cette suite possède la propriété de converger vers 1 après un certain nombre d'étapes.

Mettre en place une solution MPI avec trois instances :

  • le maître génére un nombre entier aléatoire $x$
  • si le nombre $x$ est pair, il l'envoie à l'esclave n°1 qui retourne $x/2$
  • si le nombre $x$ est impair, il l'envoie à l'esclave n°2 qui retourne $3x+1$
  • à chaque étape, le maître affiche la nouvelle valeur de $x$

Le maître comptera le nombre d'appels à l'esclave n°1 et l'esclave n°2.

Lorsque le maître recoît la valeur 1, il s'arrête et envoie un code d'arrêt aux esclaves (un nombre négatif par exemple). Puis il affiche le nombre d'appels à chaque esclave.

Exercice 5.2

Objectif : Écrire un programme MPI pour rechercher un élément dans un tableau.

Description :

  • créer 5 processus
  • générer un tableau d'un million d'entiers aléatoires sur le processus maître (processus de rang 0)
  • répartir le tableau entre les différents processus esclaves en utilisant la fonction MPI_Scatter
  • le maître génère un nombre aléatoire $x$ et demande à chaque esclave de rechercher le nombre d'occurrences de $x$ dans son tableau et de retourner le résultat
  • afficher sur le maître, le nombre d'occurrences par esclave