Parallélisme : travaux dirigés et pratiques
3. P-Threads et C++11 Threads
3.1. POSIX Threads
Cette partie du cours s'inspire grandement du tutoriel sur les pThread (où POSIX threads) du LLNL)
Historiquement plusieurs versions spécifiques de l'implantation des threads ont été créé ce qui posa de nombreux problèmes de portabilité du code. Pour UNIX une interface standard IEEE POSIX 1003.1c a été créé en 1995.
Ce qui différencie un thread d'un processus est que le thread est beaucoup plus léger c'est d'ailleurs pour cela que l'on qualifie souvent les threads de processus légers càd qu'ils demandent moins de ressources qu'un processus lors de sa création et sa gestion. Les threads sont créé à l'intérieur d'un processus et n'utilisent que le strict nécessaire pour leur exécution. On rappelle qu'un processus prend en compte :
- des droits : un numéro de processus, l'identifiant du groupe de l'utilisateur, ...
- un répertoire de travail
- une pile d'exécution
- la gestion du tas (heap)
- des descripteurs de fichiers
- la gestion des signaux
- l'utilisation des librairies partagées
- gestion des pipes, sémaphores, de la mémoire partagée
Pour simplifier, le thread, quant à lui, nécessite une pile d'exécution et partage les ressources du processus.
Pour utiliser les threads il faut :
- compiler avec le fichier entête : #include <pthread.h>
- réaliser l'édition de liens avec -lpthread (GCC/UNIX) ou -pthread selon les systèmes
3.1.1. Création de POSIX threads
La gestion des threads est laissée à l'utilisateur : il faut créer les threads ainsi qu'une structure de données qui contient les données liées au thread.
Voici un premier exemple. On passe à chaque thread une structure de données qui contient l'identifiant local du processus donné par l'utilisateur ainsi qu'un message à afficher.
La fonction pthread_self() donne l'identifiant du thread donné par le système.
On définit une fonction CHECK qui permet de vérifier la bon déroulement de l'exécution des fonctions liée aux threads.
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <pthread.h>
#include <unistd.h>
#define CHECK(x) \
{ \
int res = x; \
if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
}
/**
* data structure used to pass argument to thread
* which contains the thread id
*/
typedef struct {
int id;
char msg[20];
} thread_data_t;
/**
* thread to execute
*/
void *thread_code(void *argument) {
thread_data_t *data = (thread_data_t *) argument;
int n_repeat = 5;
while (n_repeat) {
cout << "thread(" << pthread_self() <<") ";
cout << ", id=" << data->id;
cout << ", msg=[" << data->msg << "]";
cout << ", repeat=" << n_repeat << endl;
--n_repeat;
sleep(1);
}
}
/**
* Main function
*/
int main() {
// array of threads
pthread_t threads[10];
// array of data for threads
thread_data_t data[10];
// create threads and data
for (int i=0; i<10; ++i) {
// initialize data for thread i
data[i].id = i;
sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
// create thread i
CHECK( pthread_create( &threads[i], NULL, thread_code, (void *) &data[i]) );
}
cout << "!!! END OF PROGRAM !!!!" << endl;
pthread_exit(NULL);
return 0;
}
L'exécution du programme donne le résultat suivant qui est illisible car l'ensemble des threads partage le même flux de sortie (cout) et ils écrivent en même temps. De plus le programme se termine avant que les threads n'aient terminé leur exécution :
thread(thread(thread(140487017768704140487009376000) , id=140487034554112) ) , id=, id=23, msg=[0, msg=[, msg=[coucou 177]coucou 183, repeat=coucou 115]]5, repeat=5, repeat=5
thread(140486992590592) , id=5, msg=[coucou 135], repeat=
thread(140487000983296) , id=4, msg=[coucou 193], repeat=5
5
!!! END OF PROGRAM !!!!
thread(140487026161408) , id=1, msg=[coucou 186], repeat=5
thread(140486984197888) , id=6, msg=[coucou 186], repeat=5
...
3.1.2. Synchronisation des POSIX threads (join)
Un première amélioration peut être apportée afin que le programme ne termine pas avant que l'ensemble des threads n'aient terminé leur exécution. Il s'agit de faire un join.
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <pthread.h>
#include <unistd.h>
#define CHECK(x) \
{ \
int res = x; \
if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
}
/**
* data structure used to pass argument to thread
* which contains the thread id
*/
typedef struct {
int id;
char msg[20];
} thread_data_t;
/**
* thread to execute
*/
void *thread_code(void *argument) {
thread_data_t *data = (thread_data_t *) argument;
int n_repeat = 5;
while (n_repeat) {
cout << "thread(" << pthread_self() <<") ";
cout << ", id=" << data->id;
cout << ", msg=[" << data->msg << "]";
cout << ", repeat=" << n_repeat << endl;
--n_repeat;
sleep(1);
}
}
/**
* Main function
*/
int main() {
// array of threads
pthread_t threads[10];
// array of data for threads
thread_data_t data[10];
// set attribute to indicate that thread is joinable
pthread_attr_t attr;
CHECK( pthread_attr_init(&attr) );
CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
// create threads and data
for (int i=0; i<10; ++i) {
// create data for thread i
data[i].id = i;
sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
// create thread i
CHECK( pthread_create(&threads[i], &attr, thread_code, (void *) &data[i]) );
}
// destroy attribute: not necessary any more
pthread_attr_destroy(&attr);
// perform the join
void *status;
for (int i=0; i<10; ++i) {
CHECK( pthread_join(threads[i], &status) );
}
// this line won't be executed until all threads have terminated
// their execution
cout << "!!! END OF PROGRAM !!!!" << endl;
pthread_exit(NULL);
return 0;
}
thread(thread(thread(139750116677376139750108284672) ) 139750125070080, id=thread() , id=139750099891968) , id=230, msg=[, msg=[, msg=[coucou 183coucou 177], repeat=coucou 1155]], repeat=, repeat=55
...
!!! END OF PROGRAM !!!!
3.1.3. verrou (mutex) pour l'exclusion mutuelle
Enfin, afin d'améliorer la lisibilité on utilise un mutex, abréviation pour mutual exclusion. Un mutex est un verrou qui ne pourra être fermé (lock) que par un seul thread à la fois, puis ouvert (unlock). Le verrou permet de créer l'exclusion mutuelle d'une ressource pour les sections critiques.
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <pthread.h>
#include <unistd.h>
#define CHECK(x) \
{ \
int res = x; \
if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
}
/** !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* mutex accessible by all threads
*/
pthread_mutex_t mutex;
/**
* data structure used to pass argument to thread
* which contains the thread id
*/
typedef struct {
int id;
char msg[20];
} thread_data_t;
/**
* thread to execute
*/
void *thread_code(void *argument) {
// convert argument
thread_data_t *data = (thread_data_t *) argument;
int n_repeat = 5;
while (n_repeat) {
// critical section
// !!!!!!!!!! LOCK !!!!!!!!!!
CHECK( pthread_mutex_lock(&mutex) );
cout << "thread(" << pthread_self() <<") ";
cout << ", id=" << data->id;
cout << ", msg=[" << data->msg << "]";
cout << ", repeat=" << n_repeat << endl;
// !!!!!!!!!! UNLOCK !!!!!!!!!!
CHECK( pthread_mutex_unlock(&mutex) );
--n_repeat;
sleep(1);
}
}
/**
* Main function
*/
int main() {
// array of threads
pthread_t threads[10];
// array of data for threads
thread_data_t data[10];
// create mutex
CHECK( pthread_mutex_init(&mutex, NULL) );
// set attribute to indicate that thread is joinable
pthread_attr_t attr;
CHECK( pthread_attr_init(&attr) );
CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
// create threads
for (int i=0; i<10; ++i) {
data[i].id = i;
sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
CHECK( pthread_create(&threads[i], &attr, thread_code, (void *) &data[i]) );
}
// destroy attribute: no necessary any more
pthread_attr_destroy(&attr);
// perform the join
void *status;
for (int i=0; i<10; ++i) {
CHECK( pthread_join(threads[i], &status) );
}
// this line won't be executed until all threads have terminated
// their execution
cout << "!!! END OF PROGRAM !!!!" << endl;
// !!!!!!!!!! destroy mutex !!!!!!!!!!
CHECK( pthread_mutex_destroy(&mutex) );
pthread_exit(NULL);
return 0;
}
thread(140293526013696) , id=1, msg=[coucou 186], repeat=5
thread(140293534406400) , id=0, msg=[coucou 183], repeat=5
thread(140293475657472) , id=7, msg=[coucou 192], repeat=5
thread(140293467264768) , id=8, msg=[coucou 149], repeat=5
thread(140293500835584) , id=4, msg=[coucou 193], repeat=5
thread(140293492442880) , id=5, msg=[coucou 135], repeat=5
thread(140293484050176) , id=6, msg=[coucou 186], repeat=5
thread(140293517620992) , id=2, msg=[coucou 177], repeat=5
thread(140293509228288) , id=3, msg=[coucou 115], repeat=5
thread(140293458872064) , id=9, msg=[coucou 121], repeat=5
thread(140293526013696) , id=1, msg=[coucou 186], repeat=4
...
!!! END OF PROGRAM !!!!
3.1.4. Exemple producteur / consommateur
Voici un exemple avec trois threads dont 2 producteurs et 1 consommateur:
- producteur 1 : génère des pneus à intervalle régulier
- producteur 2 : génère des moteurs à intervalle régulier
- consommateur 1 : produit un voiture dès lors que 4 pneus et un moteur sont disponibles
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <pthread.h>
#include <unistd.h>
#define CHECK(x) \
{ \
int res = x; \
if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
}
/**
* mutex accessible by all threads
*/
pthread_mutex_t mutex;
// Number of tires produced
int counter_tires = 0;
// Number of motors produced
int counter_motors = 0;
// stop execution of all threads
bool terminate_threads = false;
/**
* Thread that produces tires every minute (here we use
* 1 second instead of one minute).
* We simply increment the number of tires produced
*/
void *thread_inc_tires(void *argument) {
while (!terminate_threads) {
CHECK( pthread_mutex_lock(&mutex) );
++counter_tires;
cout << "create tire, #tires=" << counter_tires << endl;
CHECK( pthread_mutex_unlock(&mutex) );
sleep(1);
}
CHECK( pthread_mutex_lock(&mutex) );
cout << "terminate thread_inc_tires" << endl;
CHECK( pthread_mutex_unlock(&mutex) );
}
/**
* Thread that produces motors every 5 minutes (here we use
* 5 seconds instead of 5 minutes).
* We simply increment the number of motors produced
*/
void *thread_inc_motors(void *argument) {
while (!terminate_threads) {
CHECK( pthread_mutex_lock(&mutex) );
++counter_motors;
cout << "create motor, #motors=" << counter_motors << endl;
CHECK( pthread_mutex_unlock(&mutex) );
sleep(5);
}
CHECK( pthread_mutex_lock(&mutex) );
cout << "terminate thread_inc_motors" << endl;
CHECK( pthread_mutex_unlock(&mutex) );
}
/**
* Thread that produces cars
* We simply wait for at least 4 tires and 1 motor, then
* we decrease the number of tires, the number of motors
* and increase the number of cars.
* We stop all threads as soon as we produce 3 cars.
*/
void *thread_create_car(void *argument) {
int counter_cars = 0;
while (counter_cars < 3) {
// critical section
CHECK( pthread_mutex_lock(&mutex) );
if ((counter_tires >= 4) && (counter_motors >= 1)) {
counter_tires -= 4;
counter_motors -= 1;
++counter_cars;
cout << "create car, #cars=" << counter_cars << endl;
}
CHECK( pthread_mutex_unlock(&mutex) );
sleep(1);
}
CHECK( pthread_mutex_lock(&mutex) );
terminate_threads = true;
CHECK( pthread_mutex_unlock(&mutex) );
}
/**
* Main function
*/
int main() {
// threads for tires, motors and cars
pthread_t t_inc_tires, t_inc_motors, t_create_cars;
// create mutex
CHECK( pthread_mutex_init(&mutex, NULL) );
// set attribute to indicate that thread is joinable
pthread_attr_t attr;
CHECK( pthread_attr_init(&attr) );
CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
// create threads
CHECK( pthread_create( &t_inc_tires, &attr, thread_inc_tires, (void *) NULL) );
CHECK( pthread_create( &t_inc_motors, &attr, thread_inc_motors, (void *) NULL) );
CHECK( pthread_create( &t_create_cars, &attr, thread_create_car, (void *) NULL) );
// destroy attribute no more necessary
pthread_attr_destroy(&attr);
// make the join
void *status;
CHECK( pthread_join(t_create_cars, &status) );
CHECK( pthread_join(t_inc_tires, &status) );
CHECK( pthread_join(t_inc_motors, &status) );
// this line won't be executed until all threads have terminated
cout << "!!! END OF PROGRAM !!!!" << endl;
// destroy mutex
CHECK( pthread_mutex_destroy(&mutex) );
pthread_exit(NULL);
return 0;
}
3.1.5. variables de condition
Les condition variables sont des primitives de synchronisation qui permettent aux threads de se mettre en attente jusqu'à ce
qu'une condition particulière soit vérifiée. Elles fonctionnent de concert avec les verrous.
Typiquement le code utilisé suit le schéma suivant où cond est la variable de condition :
// code en attente de la réalisation de la condition
pthread_mutex_lock(&lock);
while (SOME-CONDITION is false) {
pthread_cond_wait(&cond, &lock);
}
do_something();
pthread_mutex_unlock(&lock);
// code qui réalise la condition
pthread_mutex_lock(&lock);
ALTER-CONDITION
// réveille les threads en attente de la réalisation de la condition
pthread_cond_signal(&cond);
// levée du verrou
pthread_mutex_unlock (&lock)
Un certain nombre de fonctions sont dédiée à la gestion de ces variables :
- pthread_cond_init(pthread_cond_t *cv,
const pthread_condattr_t *cattr); qui permet de créer une variable de condition
- pthread_cond_destroy(pthread_cond_t *cv); qui permet de détruire une variable de condition
- pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex);
qui permet de se mettre dans un état d'attente d'une variable de condition
- pthread_cond_signal(pthread_cond_t *cv);
qui permet de signaler qu'une variable de condition vient de changer d'état
- pthread_cond_timedwait(pthread_cond_t *cv, pthread_mutex_t *mutex, const struct timespec *abstime);
qui permet de se mettre dans un état d'attente d'une variable de condition pendant un temps maximum
- pthread_cond_broadcast(pthread_cond_t *cv);
qui permet de débloquer tous les threads en attente du changement d'une variable de condition
Voici à titre d'exemple le programme précédent avec utilisation des variables de condition. On utilise ici une condition qui indique si le nombre de pneus et de moteurs est suffisant pour produire une voiture.
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <pthread.h>
#include <unistd.h>
#define CHECK(x) \
{ \
int res = x; \
if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
}
/**
* mutex accessible by all threads
*/
pthread_mutex_t mutex;
/**
* !!!!!!!!!! condition variable !!!!!!!!!!
*/
pthread_cond_t condition;
int counter_tires = 0;
int counter_motors = 0;
bool terminate_threads = false;
/**
* Thread to create tires
*/
void *thread_inc_tires(void *argument) {
while (!terminate_threads) {
CHECK( pthread_mutex_lock(&mutex) );
++counter_tires;
cout << "create tire, #tires=" << counter_tires << endl;
if ((counter_tires >= 4) && (counter_motors >= 1)) {
cout << "signal to 'create_car' to inform we can create a car" << endl;
// !!!!!!!!!!
pthread_cond_signal(&condition);
}
CHECK( pthread_mutex_unlock(&mutex) );
sleep(1);
}
CHECK( pthread_mutex_lock(&mutex) );
cout << "terminate thread_inc_tires" << endl;
CHECK( pthread_mutex_unlock(&mutex) );
}
/**
* Thread to produce motors
*/
void *thread_inc_motors(void *argument) {
while (!terminate_threads) {
CHECK( pthread_mutex_lock(&mutex) );
++counter_motors;
cout << "create motor, #motors=" << counter_motors << endl;
if ((counter_tires >= 4) && (counter_motors >= 1)) {
cout << "signal to 'create_car' to inform we can create a car" << endl;
// !!!!!!!!!!
pthread_cond_signal(&condition);
}
CHECK( pthread_mutex_unlock(&mutex) );
sleep(5);
}
CHECK( pthread_mutex_lock(&mutex) );
cout << "terminate thread_inc_motors" << endl;
CHECK( pthread_mutex_unlock(&mutex) );
}
/**
* Thread to produce
*/
void *thread_create_car(void *argument) {
int counter_cars = 0;
while (counter_cars < 5) {
// critical section
CHECK( pthread_mutex_lock(&mutex) );
// !!!!!!!!!!
pthread_cond_wait(&condition, &mutex);
counter_tires -= 4;
counter_motors -= 1;
++counter_cars;
cout << "create car, #cars=" << counter_cars << endl;
CHECK( pthread_mutex_unlock(&mutex) );
}
CHECK( pthread_mutex_lock(&mutex) );
terminate_threads = true;
CHECK( pthread_mutex_unlock(&mutex) );
}
int main() {
// All threads
pthread_t t_inc_tires, t_inc_motors, t_create_cars;
// create mutex
CHECK( pthread_mutex_init(&mutex, NULL) );
pthread_cond_init(&condition, NULL);
// set attribute to indicate that thread is joinable
pthread_attr_t attr;
CHECK( pthread_attr_init(&attr) );
CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
// create threads
CHECK( pthread_create(&t_inc_tires, &attr, thread_inc_tires, (void *) NULL) );
CHECK( pthread_create(&t_inc_motors, &attr, thread_inc_motors, (void *) NULL) );
CHECK( pthread_create(&t_create_cars, &attr, thread_create_car, (void *) NULL) );
// destroy attribute no more necessary
pthread_attr_destroy(&attr);
// make the join
void *status;
CHECK( pthread_join(t_create_cars, &status) );
CHECK( pthread_join(t_inc_tires, &status) );
CHECK( pthread_join(t_inc_motors, &status) );
// this line won't be executed until all threads have terminated
cout << "!!! END OF PROGRAM !!!!" << endl;
// destroy mutex
CHECK( pthread_mutex_destroy(&mutex) );
pthread_exit(NULL);
return 0;
}
3.2. Threads C++11
Les threads introduits en C++11 sont gérés par une classe.
On pourra consulter la Thread support library
3.2.1. Création de threads C++11
La création d'un thread est plus simple qu'avec les POSIX threads : il suffit de créer une fonction avec les paramètres désirés
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <unistd.h>
#include <thread>
#include <chrono>
/**
* Thread to execute
*
* PARAMETERS
* - id, thread identifier
* - msg, message to print
*/
void thread_code(int id, string msg) {
int n_repeat = 5;
while (n_repeat) {
// id given by the class thread
cout << "thread(" << this_thread::get_id() <<") ";
// id given by the user
cout << ", id=" << id;
cout << ", msg=[" << msg << "]";
cout << ", repeat=" << n_repeat << endl;
--n_repeat;
// wait one second
this_thread::sleep_for(std::chrono::seconds(1));
}
}
/**
* Main function
*/
int main() {
// array of threads
thread *threads[10];
// create threads
for (int i=0; i<10; ++i) {
char msg[20];
sprintf(msg, "coucou %d", 100 + rand() % 100);
threads[i] = new thread(thread_code, i, msg );
}
cout << "!!! END OF PROGRAM !!!!" << endl;
pthread_exit(NULL);
return 0;
}
On note le même problème qu'avec les P-threads à savoir : le programme se termine avant que les threads n'aient terminé leur exécution.
thread(thread(thread(thread(140340511086336thread() , id=140340502693632140340494300928) ) 140340519479040, id=, id=) , id=23, msg=[1, msg=[coucou 135], repeat=5coucou 135], repeat=, msg=[5coucou 193]4, repeat=, msg=[coucou 186]5
, repeat=5
140340527871744) , id=0, msg=[coucou 135], repeat=5
...
!!! END OF PROGRAM !!!!
thread(thread(140340388419328) , id=1403403800266248) , id=, msg=[9coucou 121, msg=[]coucou 121, repeat=], repeat=55
...
3.2.2. Synchronisation des threads C++11 (join)
Il suffit d'appeler la méthode join() pour chacun des threads
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <unistd.h>
#include <thread>
#include <chrono>
/**
* Thread to execute
*
* PARAMETERS
* - id, thread identifier
* - msg, message to print
*/
void thread_code(int id, string msg) {
int n_repeat = 5;
while (n_repeat) {
cout << "thread(" << id <<") ";
cout << ", msg=[" << msg << "]";
cout << ", repeat=" << n_repeat << endl;
--n_repeat;
// wait one second
this_thread::sleep_for(std::chrono::seconds(1));
}
}
/**
* Main function
*/
int main() {
// array of threads
thread *threads[10];
// create threads
for (int i=0; i<10; ++i) {
char msg[20];
sprintf(msg, "coucou %d", 100 + rand() % 100);
threads[i] = new thread(thread_code, i, msg );
}
// !!!!!!!!!! perform join !!!!!!!!!!
for (int i=0; i<10; ++i) {
threads[i]->join();
}
cout << "!!! END OF PROGRAM !!!!" << endl;
pthread_exit(NULL);
return 0;
}
3.2.3. verrou (mutex) pour l'exclusion mutuelle C++11
Il suffit d'utiliser la classe mutex dotée de deux méthodes :
- lock pour verrouiller l'accès à la ressource
- unlock pour déverrouiller l'accès à la ressource
#include <iostream>
#include <cstdio>
#include <cstdlib>
using namespace std;
#include <unistd.h>
#include <thread>
#include <mutex>
#include <chrono>
// !!!!!!!!!! Mutex declaration !!!!!!!!!!
mutex my_lock;
/**
* Thread to execute
*
* PARAMETERS
* - id, thread identifier
* - msg, message to print
*/
void thread_code(int id, string msg) {
int n_repeat = 5;
while (n_repeat) {
// !!!!!!!!!! LOCK !!!!!!!!!!
my_lock.lock();
cout << "thread(" << id <<") ";
cout << ", msg=[" << msg << "]";
cout << ", repeat=" << n_repeat << endl;
// !!!!!!!!!! UNLOCK !!!!!!!!!!
my_lock.unlock();
--n_repeat;
this_thread::sleep_for(std::chrono::seconds(1));
}
}
/**
* Main function
*/
int main() {
// array of threads
thread *threads[10];
// create threads
for (int i=0; i<10; ++i) {
char msg[20];
sprintf(msg, "coucou %d", 100 + rand() % 100);
threads[i] = new thread(thread_code, i, msg );
}
// perform join
for (int i=0; i<10; ++i) {
threads[i]->join();
}
cout << "!!! END OF PROGRAM !!!!" << endl;
pthread_exit(NULL);
return 0;
}
thread(140401278883584) , id=0, msg=[coucou 193], repeat=5
thread(140401262098176) , id=2, msg=[coucou 193], repeat=5
thread(140401270490880) , id=1, msg=[coucou 135], repeat=5
thread(140401253705472) , id=3, msg=[coucou 135], repeat=5
thread(140401245312768) , id=4, msg=[coucou 135], repeat=5
thread(140401236920064) , id=5, msg=[coucou 186], repeat=5
thread(140401228527360) , id=6, msg=[coucou 192], repeat=5
thread(140401220134656) , id=7, msg=[coucou 149], repeat=5
thread(140401138726656) , id=8, msg=[coucou 121], repeat=5
thread(140401130333952) , id=9, msg=[coucou 121], repeat=5
thread(140401278883584) , id=0, msg=[coucou 193], repeat=4
thread(140401262098176) , id=2, msg=[coucou 193], repeat=4
...
!!! END OF PROGRAM !!!!
3.2.4. Valeurs atomiques
On consultera la partie atomic de la STL
// ==================================================================
// Author: Jean-Michel Richer
// Date: August 2016
// Purpose: use of atomic operation fetch_add to perform reduction
// ==================================================================
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cassert>
using namespace std;
int main() {
const int SIZE = 1024;
int *tab = new int [SIZE];
for (int i= 0; i < SIZE; ++i) tab[i] = i+1;
// reduction with atomic
atomic<int> sum(0);
for (int i = 0; i < SIZE; ++i) {
sum.fetch_add(tab[i], std::memory_order_relaxed);
}
cout << "reduction=" << sum << endl;
assert(sum == (SIZE)*(SIZE+1)/2);
return EXIT_SUCCESS;
}
Pour de plus amples informations voir ce site
3.3. Affectation des threads aux coeurs physiques
Lorsque l'on utilise $K$ threads, il se peut que dans certains cas l'affectation des threads à des coeurs du processeur pose problème notamment sur les systèmes SMP.
Considérons le cas de l'Intel Xeon E5 2670 qui dispose de 10 coeurs + Hyperthreading. Dans le cas d'un système SMP composé de deux processeurs de ce type la répartition des coeurs est la suivante :
0 |
0 |
0 |
1 |
1 |
0 |
2 |
0 |
1 |
3 |
1 |
1 |
... |
|
|
20 |
0 (HT) |
0 |
21 |
1 (HT) |
0 |
... |
|
|
Identification des coeurs sur Xeon E5 2670
En d'autres termes, les threads se répartissent ainsi :
- CPU 0: 0, 2, 4, 6, 8, 10, 12, 14, 16, 18
- CPU 0 (HT) : 20, 22, 24, 26, 28, 30, 32, 34, 36, 38
- CPU 1: 1, 3, 5, 7, 9, 11, 13, 15, 17, 19
- CPU 1 (HT) : 21, 23, 25, 27, 29, 31, 33, 35, 37, 39
Si on doit utiliser 4 threads, on obtiendra des temps de calcul différents si l'affectation des threads aux coeurs physiques est réalisée de l'une des manières suivantes :
- coeurs du même processeur : 0, 2, 4, 6
- coueurs HT du même processeur : 20, 22, 24, 26
- coeurs de deux processeurs : 0, 1, 2, 3
- coeurs HT de deux processeurs : 20, 21, 22, 23
Pour réaliser l'affectation des threads on peut utiliser la commande :
taskset -c identifiants-threads monprogramme arguments
Exemple :
taskset 0,2,4,6 ./reduction.exe 131072
# ou
taskset 0-6:2 ./reduction.exe 131072
A titre d'application on considère le problème de Maximum de Parcimonie, l'exécution avec un nombre croissant de threads donne les temps suivants sur un noeud Bull Novascale R422 du cluster taurus du LERIA
temps (s) |
207 |
120 |
67 |
41 |
36 |
36 |
39 |
43 |
47 |
49 |
60 |
facteur |
- |
1.72 |
3.08 |
5.04 |
5.75 |
- |
- |
- |
- |
- |
- |
pourcentage |
- |
- 42% |
- 68% |
- 80% |
- 83% |
- 83% |
- 81% |
- 79% |
- 77% |
- 76% |
- 71% |
Temps d'exécution (s) sur Xeon E5 2670
Une étude aprofondie montre que
- jusqu'à 10 (voire 12) threads on note une diminution du temps de calcul, au delà on dégrade les performances
- sans l'HyperThreading : que l'on soit sur un seul CPU ou sur deux CPU on obtient les mêmes résultats
- 8 threads : 41 secondes avec taskset -c 0-7 ou taskset -c 0,2,4,6,8,10,12,14
- 10 threads : 36 secondes avec taskset -c 0-9 ou taskset -c 0,2,4,6,8,10,12,14,16,18
- avec l'HyperThreading : on dégrade les performances
- 8 threads : 1m08s avec taskset -c 0-3,20-23 (4C + 4HT sur le même CPU)
- 10 threads : 1m06s avec taskset -c 0-4,20-24 (5C + 5HT sur le même CPU)
3.3.1. Coeurs Performance et Efficiency
Chez Intel avec l'arrivée de processeurs dotés de deux type de coeurs, les tests sur machine deviennent problématiques.
Il est primordial de choisir les coeurs adéquats.
Pour rappel, certains processeurs sont dotés de deux types de coeurs :
- les coeurs P (Performance) qui sont les plus efficaces et qui sont parfois doté de l'Hyperthreading
- les coeurs E (Efficiency) moins efficaces que les coeurs P et qui ne comportent pas d'Hyperthreading
Pour déterminer quels identifiants sont liés aux coeurs P ou E, il faut utiliser la commande suivante :
paste <(cat /proc/cpuinfo | grep "processor") <(cat /proc/cpuinfo | grep "^core id" )
3.3.1.a Cas de l'Intel I7 12700
L'Intel i7-12700
dispose de 8 coeurs P (+ HT) et 4 coeurs E, soit un total de 20 threads.
En appliquant la commande précédente, on obtient :
processor : 0 core id : 0
processor : 1 core id : 0 coeur P
processor : 2 core id : 4
processor : 3 core id : 4 coeur P
processor : 4 core id : 8
processor : 5 core id : 8 coeur P
processor : 6 core id : 12
processor : 7 core id : 12 coeur P
processor : 8 core id : 16
processor : 9 core id : 16 coeur P
processor : 10 core id : 20
processor : 11 core id : 20 coeur P
processor : 12 core id : 24
processor : 13 core id : 24 coeur P
processor : 14 core id : 28
processor : 15 core id : 28 coeur P
processor : 16 core id : 36 coeur E
processor : 17 core id : 37 coeur E
processor : 18 core id : 38 coeur E
processor : 19 core id : 39 coeur E
- les coeurs P sont numérotés : 0, 2, 4, 6, 8, 10, 12, 14
- les coeurs P HT sont numérotés : 1, 3, 5, 7, 9, 11, 13, 15
- les coeurs E sont numérotés : 16, 17, 18, 19
3.3.1.b Cas de l'AMD Ryen 5 5600G
L'AMD Ryzen 5 5600g est doté de 6 coeurs dotés
du Simultaneous Multithreading (SMT) soit un total de 12 threads.
processor : 0 core id : 0
processor : 1 core id : 1
processor : 2 core id : 2
processor : 3 core id : 3
processor : 4 core id : 4
processor : 5 core id : 5
processor : 6 core id : 0
processor : 7 core id : 1
processor : 8 core id : 2
processor : 9 core id : 3
processor : 10 core id : 4
processor : 11 core id : 5
Ici, on trouve donc :
- les coeurs numérotés 0 à 5
- les coeurs SMT numérotés : 6 à 11