Parallélisme : travaux dirigés et pratiques

3. P-Threads et C++11 Threads

3.1. POSIX Threads

Cette partie du cours s'inspire grandement du tutoriel sur les pThread (où POSIX threads) du LLNL)

Historiquement plusieurs versions spécifiques de l'implantation des threads ont été créé ce qui posa de nombreux problèmes de portabilité du code. Pour UNIX une interface standard IEEE POSIX 1003.1c a été créé en 1995.

Ce qui différencie un thread d'un processus est que le thread est beaucoup plus léger c'est d'ailleurs pour cela que l'on qualifie souvent les threads de processus légers càd qu'ils demandent moins de ressources qu'un processus lors de sa création et sa gestion. Les threads sont créé à l'intérieur d'un processus et n'utilisent que le strict nécessaire pour leur exécution. On rappelle qu'un processus prend en compte :

Pour simplifier, le thread, quant à lui, nécessite une pile d'exécution et partage les ressources du processus.

Pour utiliser les threads il faut :

3.1.1. Création de POSIX threads

La gestion des threads est laissée à l'utilisateur : il faut créer les threads ainsi qu'une structure de données qui contient les données liées au thread.

Voici un premier exemple. On passe à chaque thread une structure de données qui contient l'identifiant local du processus donné par l'utilisateur ainsi qu'un message à afficher.

La fonction pthread_self() donne l'identifiant du thread donné par le système.

On définit une fonction CHECK qui permet de vérifier la bon déroulement de l'exécution des fonctions liée aux threads.

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <pthread.h>
  6. #include <unistd.h>
  7.  
  8. #define CHECK(x) \
  9.     { \
  10.         int res = x; \
  11.         if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
  12.     }
  13.  
  14. /**
  15.  * data structure used to pass argument to thread
  16.  * which contains the thread id
  17.  */
  18. typedef struct {
  19.   int id;
  20.   char msg[20];
  21. } thread_data_t;
  22.  
  23.  
  24. /**
  25.  * thread to execute
  26.  */
  27. void *thread_code(void *argument) {
  28.     thread_data_t *data = (thread_data_t *) argument;
  29.     int n_repeat = 5;
  30.     while (n_repeat) {
  31.         cout << "thread(" << pthread_self() <<") ";
  32.         cout << ", id=" << data->id;
  33.         cout << ", msg=[" << data->msg << "]";
  34.         cout << ", repeat=" << n_repeat << endl;
  35.         --n_repeat;
  36.         sleep(1);
  37.     }
  38. }
  39.  
  40. /**
  41.  * Main function
  42.  */
  43. int main() {
  44.  
  45.     // array of threads
  46.     pthread_t threads[10];
  47.    
  48.     // array of data for threads
  49.     thread_data_t data[10];
  50.  
  51.     // create threads and data
  52.     for (int i=0; i<10; ++i) {
  53.    
  54.         // initialize data for thread i
  55.         data[i].id = i;
  56.         sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
  57.        
  58.         // create thread i
  59.         CHECK( pthread_create( &threads[i], NULL, thread_code, (void *) &data[i]) );
  60.     }
  61.  
  62.     cout << "!!! END OF PROGRAM !!!!" << endl;
  63.    
  64.     pthread_exit(NULL);
  65.  
  66.     return 0;
  67. }
  68.  

L'exécution du programme donne le résultat suivant qui est illisible car l'ensemble des threads partage le même flux de sortie (cout) et ils écrivent en même temps. De plus le programme se termine avant que les threads n'aient terminé leur exécution :


thread(thread(thread(140487017768704140487009376000) , id=140487034554112) ) , id=, id=23, msg=[0, msg=[, msg=[coucou 177]coucou 183, repeat=coucou 115]]5, repeat=5, repeat=5
thread(140486992590592) , id=5, msg=[coucou 135], repeat=

thread(140487000983296) , id=4, msg=[coucou 193], repeat=5
5
!!! END OF PROGRAM !!!!
thread(140487026161408) , id=1, msg=[coucou 186], repeat=5
thread(140486984197888) , id=6, msg=[coucou 186], repeat=5
...

3.1.2. Synchronisation des POSIX threads (join)

Un première amélioration peut être apportée afin que le programme ne termine pas avant que l'ensemble des threads n'aient terminé leur exécution. Il s'agit de faire un join.

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <pthread.h>
  6. #include <unistd.h>
  7.  
  8. #define CHECK(x) \
  9.     { \
  10.         int res = x; \
  11.         if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
  12.     }
  13.    
  14. /**
  15.  * data structure used to pass argument to thread
  16.  * which contains the thread id
  17.  */
  18. typedef struct {
  19.   int id;
  20.   char msg[20];
  21. } thread_data_t;
  22.  
  23.  
  24. /**
  25.  * thread to execute
  26.  */
  27. void *thread_code(void *argument) {
  28.     thread_data_t *data = (thread_data_t *) argument;
  29.     int n_repeat = 5;
  30.     while (n_repeat) {
  31.         cout << "thread(" << pthread_self() <<") ";
  32.         cout << ", id=" << data->id;
  33.         cout << ", msg=[" << data->msg << "]";
  34.         cout << ", repeat=" << n_repeat << endl;
  35.         --n_repeat;
  36.         sleep(1);
  37.     }
  38. }
  39.  
  40. /**
  41.  * Main function
  42.  */
  43. int main() {
  44.  
  45.     // array of threads
  46.     pthread_t threads[10];
  47.     // array of data for threads
  48.     thread_data_t data[10];
  49.  
  50.     // set attribute to indicate that thread is joinable
  51.     pthread_attr_t attr;
  52.     CHECK( pthread_attr_init(&attr) );
  53.     CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
  54.      
  55.     // create threads and data
  56.     for (int i=0; i<10; ++i) {
  57.        
  58.         // create data for thread i
  59.         data[i].id = i;
  60.         sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
  61.        
  62.         // create thread i
  63.         CHECK( pthread_create(&threads[i], &attr, thread_code, (void *) &data[i]) );
  64.     }
  65.    
  66.     // destroy attribute:  not necessary any more
  67.     pthread_attr_destroy(&attr);
  68.  
  69.  
  70.     // perform the join
  71.     void *status;
  72.     for (int i=0; i<10; ++i) {
  73.         CHECK( pthread_join(threads[i], &status) );
  74.     }
  75.    
  76.     // this line won't be executed until all threads have terminated
  77.     // their execution
  78.     cout << "!!! END OF PROGRAM !!!!" << endl;
  79.    
  80.     pthread_exit(NULL);
  81.  
  82.     return 0;
  83. }
  84.  

thread(thread(thread(139750116677376139750108284672) ) 139750125070080, id=thread() , id=139750099891968) , id=230, msg=[, msg=[, msg=[coucou 183coucou 177], repeat=coucou 1155]], repeat=, repeat=55
...
!!! END OF PROGRAM !!!!

3.1.3. Verou (mutex) pour l'exclusion mutuelle

Enfin, afin d'améliorer la lisibilité on utilise un mutex, abréviation pour mutual exclusion. Un mutex est un verou qui ne pourra être fermé (lock) que par un seul thread à la fois, puis ouvert (unlock). Le verou permet de créer l'exclusion mutuelle d'une ressource pour les sections critiques.

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <pthread.h>
  6. #include <unistd.h>
  7.  
  8. #define CHECK(x) \
  9.     { \
  10.         int res = x; \
  11.         if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
  12.     }
  13.    
  14. /** !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  15.  * mutex accessible by all threads
  16.  */
  17. pthread_mutex_t mutex;
  18.  
  19. /**
  20.  * data structure used to pass argument to thread
  21.  * which contains the thread id
  22.  */
  23. typedef struct {
  24.   int id;
  25.   char msg[20];
  26. } thread_data_t;
  27.  
  28.  
  29. /**
  30.  * thread to execute
  31.  */
  32. void *thread_code(void *argument) {
  33.  
  34.     // convert argument
  35.     thread_data_t *data = (thread_data_t *) argument;
  36.  
  37.     int n_repeat = 5;
  38.     while (n_repeat) {
  39.    
  40.         // critical section
  41.         // !!!!!!!!!! LOCK !!!!!!!!!!
  42.         CHECK( pthread_mutex_lock(&mutex) );
  43.        
  44.         cout << "thread(" << pthread_self() <<") ";
  45.         cout << ", id=" << data->id;
  46.         cout << ", msg=[" << data->msg << "]";
  47.         cout << ", repeat=" << n_repeat << endl;
  48.        
  49.         // !!!!!!!!!! UNLOCK !!!!!!!!!!
  50.         CHECK( pthread_mutex_unlock(&mutex) );
  51.        
  52.         --n_repeat;
  53.         sleep(1);
  54.     }
  55. }
  56.  
  57. /**
  58.  * Main function
  59.  */  
  60. int main() {
  61.  
  62.     // array of threads
  63.     pthread_t threads[10];
  64.     // array of data for threads
  65.     thread_data_t data[10];
  66.  
  67.     // create mutex
  68.     CHECK( pthread_mutex_init(&mutex, NULL) );
  69.  
  70.     // set attribute to indicate that thread is joinable
  71.     pthread_attr_t attr;
  72.     CHECK( pthread_attr_init(&attr) );
  73.     CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
  74.    
  75.     // create threads
  76.     for (int i=0; i<10; ++i) {
  77.    
  78.         data[i].id = i;
  79.         sprintf(data[i].msg, "coucou %d", 100 + rand() % 100);
  80.        
  81.         CHECK( pthread_create(&threads[i], &attr, thread_code, (void *) &data[i]) );
  82.     }
  83.    
  84.     // destroy attribute: no necessary any more
  85.     pthread_attr_destroy(&attr);
  86.  
  87.     // perform the join
  88.     void *status;
  89.     for (int i=0; i<10; ++i) {
  90.    
  91.         CHECK( pthread_join(threads[i], &status) );
  92.        
  93.     }
  94.    
  95.     // this line won't be executed until all threads have terminated
  96.     // their execution
  97.     cout << "!!! END OF PROGRAM !!!!" << endl;
  98.    
  99.     // !!!!!!!!!! destroy mutex !!!!!!!!!!
  100.     CHECK( pthread_mutex_destroy(&mutex) );
  101.    
  102.    
  103.     pthread_exit(NULL);
  104.  
  105.     return 0;
  106. }
  107.  

thread(140293526013696) , id=1, msg=[coucou 186], repeat=5
thread(140293534406400) , id=0, msg=[coucou 183], repeat=5
thread(140293475657472) , id=7, msg=[coucou 192], repeat=5
thread(140293467264768) , id=8, msg=[coucou 149], repeat=5
thread(140293500835584) , id=4, msg=[coucou 193], repeat=5
thread(140293492442880) , id=5, msg=[coucou 135], repeat=5
thread(140293484050176) , id=6, msg=[coucou 186], repeat=5
thread(140293517620992) , id=2, msg=[coucou 177], repeat=5
thread(140293509228288) , id=3, msg=[coucou 115], repeat=5
thread(140293458872064) , id=9, msg=[coucou 121], repeat=5
thread(140293526013696) , id=1, msg=[coucou 186], repeat=4
...
!!! END OF PROGRAM !!!!

3.1.4. Exemple producteur / consommateur

Voici un exemple avec trois threads dont 2 producteurs et 1 consomateur:

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <pthread.h>
  6. #include <unistd.h>
  7.  
  8. #define CHECK(x) \
  9.     { \
  10.         int res = x; \
  11.         if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
  12.     }
  13.  
  14. /**
  15.  * mutex accessible by all threads
  16.  */
  17. pthread_mutex_t mutex;
  18.  
  19. // Number of tires produced
  20. int counter_tires = 0;
  21.  
  22. // Number of motors produced
  23. int counter_motors = 0;
  24.  
  25. // stop execution of all threads
  26. bool terminate_threads = false;
  27.  
  28.  
  29. /**
  30.  * Thread that produces tires every minute (here we use
  31.  * 1 second instead of one minute).
  32.  * We simply increment the number of tires produced
  33.  */
  34. void *thread_inc_tires(void *argument) {
  35.  
  36.     while (!terminate_threads) {
  37.         CHECK( pthread_mutex_lock(&mutex) );
  38.         ++counter_tires;
  39.         cout << "create tire, #tires=" << counter_tires << endl;
  40.         CHECK( pthread_mutex_unlock(&mutex) );
  41.         sleep(1);
  42.     }
  43.    
  44.     CHECK( pthread_mutex_lock(&mutex) );
  45.     cout << "terminate thread_inc_tires" << endl;
  46.     CHECK( pthread_mutex_unlock(&mutex) );
  47. }
  48.  
  49. /**
  50.  * Thread that produces motors every 5 minutes (here we use
  51.  * 5 seconds instead of 5 minutes).
  52.  * We simply increment the number of motors produced
  53.  */
  54. void *thread_inc_motors(void *argument) {
  55.    
  56.     while (!terminate_threads) {
  57.         CHECK( pthread_mutex_lock(&mutex) );
  58.         ++counter_motors;
  59.         cout << "create motor, #motors=" << counter_motors << endl;
  60.         CHECK( pthread_mutex_unlock(&mutex) );
  61.         sleep(5);
  62.     }
  63.    
  64.     CHECK( pthread_mutex_lock(&mutex) );
  65.     cout << "terminate thread_inc_motors" << endl;
  66.     CHECK( pthread_mutex_unlock(&mutex) );
  67. }  
  68.  
  69. /**
  70.  * Thread that produces cars
  71.  * We simply wait for at least 4 tires and 1 motor, then
  72.  * we decrease the number of tires, the number of motors
  73.  * and increase the number of cars.
  74.  * We stop all threads as soon as we produce 3 cars.
  75.  */
  76. void *thread_create_car(void *argument) {
  77.     int counter_cars = 0;
  78.    
  79.     while (counter_cars < 3) {
  80.    
  81.         // critical section
  82.         CHECK( pthread_mutex_lock(&mutex) );
  83.         if ((counter_tires >= 4) && (counter_motors >= 1)) {
  84.             counter_tires -= 4;
  85.             counter_motors -= 1;
  86.             ++counter_cars;
  87.             cout << "create car, #cars=" << counter_cars << endl;
  88.         }
  89.         CHECK( pthread_mutex_unlock(&mutex) );
  90.  
  91.         sleep(1);
  92.     }
  93.    
  94.     CHECK( pthread_mutex_lock(&mutex) );
  95.     terminate_threads = true;
  96.     CHECK( pthread_mutex_unlock(&mutex) );
  97.    
  98. }
  99.  
  100. /**
  101.  * Main function
  102.  */
  103. int main() {
  104.     // threads for tires, motors and cars
  105.     pthread_t t_inc_tires, t_inc_motors, t_create_cars;
  106.    
  107.     // create mutex
  108.     CHECK( pthread_mutex_init(&mutex, NULL) );
  109.  
  110.  
  111.     // set attribute to indicate that thread is joinable
  112.     pthread_attr_t attr;
  113.     CHECK( pthread_attr_init(&attr) );
  114.     CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
  115.    
  116.     // create threads
  117.     CHECK( pthread_create( &t_inc_tires,   &attr, thread_inc_tires,  (void *) NULL) );
  118.     CHECK( pthread_create( &t_inc_motors,  &attr, thread_inc_motors, (void *) NULL) );
  119.     CHECK( pthread_create( &t_create_cars, &attr, thread_create_car, (void *) NULL) );
  120.    
  121.     // destroy attribute no more necessary
  122.     pthread_attr_destroy(&attr);
  123.  
  124.     // make the join
  125.     void *status;
  126.     CHECK( pthread_join(t_create_cars, &status) );
  127.     CHECK( pthread_join(t_inc_tires, &status) );
  128.     CHECK( pthread_join(t_inc_motors, &status) );
  129.    
  130.     // this line won't be executed until all threads have terminated
  131.     cout << "!!! END OF PROGRAM !!!!" << endl;
  132.    
  133.     // destroy mutex
  134.     CHECK( pthread_mutex_destroy(&mutex) );
  135.     pthread_exit(NULL);
  136.  
  137.     return 0;
  138. }
  139.  

3.1.5. variables de condition

Les condition variables sont des primitives de synchronisation qui permettent aux threads de se mettre en atente jusqu'à ce qu'une condition particulière soit vérifiée. Elles fonctione de concert avec les verous.

Typiquement le code utilisé suit le schéma suivant où cond est la variable de condition :

// code en atente de la réalisation de la condition
pthread_mutex_lock(&lock);
while (SOME-CONDITION is false) {
	pthread_cond_wait(&cond, &lock);
}

do_something();
pthread_mutex_unlock(&lock);
// code qui réalise la condition
pthread_mutex_lock(&lock);

ALTER-CONDITION

// réveille les threads en attente de la réalisation de la condition
pthread_cond_signal(&cond);

// levée du verou
pthread_mutex_unlock (&lock)

Un certain nombre de fonctions sont dédiée à la gestion de ces variables :

Voici à titre d'exemple le programme précédent avec utilisation des variables de condition. On utilise ici une condition qui indique si le nombre de pneus et de moteurs est suffisant pour produire une voiture.

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <pthread.h>
  6. #include <unistd.h>
  7.  
  8. #define CHECK(x) \
  9.     { \
  10.         int res = x; \
  11.         if (res != 0) cerr << "PTHREAD ERROR: " << res << endl; \
  12.     }
  13.  
  14. /**
  15.  * mutex accessible by all threads
  16.  */
  17. pthread_mutex_t mutex;
  18.  
  19. /**
  20.  * !!!!!!!!!! condition variable !!!!!!!!!!
  21.  */
  22. pthread_cond_t condition;
  23.  
  24. int counter_tires = 0;
  25. int counter_motors = 0;
  26. bool terminate_threads = false;
  27.  
  28. /**
  29.  * Thread to create tires
  30.  */
  31. void *thread_inc_tires(void *argument) {
  32.  
  33.     while (!terminate_threads) {
  34.         CHECK( pthread_mutex_lock(&mutex) );
  35.         ++counter_tires;
  36.         cout << "create tire, #tires=" << counter_tires << endl;
  37.         if ((counter_tires >= 4) && (counter_motors >= 1)) {
  38.             cout << "signal to 'create_car' to inform we can create a car" << endl;
  39.             // !!!!!!!!!!
  40.             pthread_cond_signal(&condition);
  41.         }
  42.         CHECK( pthread_mutex_unlock(&mutex) );
  43.         sleep(1);
  44.     }
  45.    
  46.     CHECK( pthread_mutex_lock(&mutex) );
  47.     cout << "terminate thread_inc_tires" << endl;
  48.     CHECK( pthread_mutex_unlock(&mutex) );
  49. }
  50.  
  51. /**
  52.  * Thread to produce motors
  53.  */
  54. void *thread_inc_motors(void *argument) {
  55.  
  56.     while (!terminate_threads) {
  57.         CHECK( pthread_mutex_lock(&mutex) );
  58.         ++counter_motors;
  59.         cout << "create motor, #motors=" << counter_motors << endl;
  60.         if ((counter_tires >= 4) && (counter_motors >= 1)) {
  61.             cout << "signal to 'create_car' to inform we can create a car" << endl;
  62.             // !!!!!!!!!!
  63.             pthread_cond_signal(&condition);
  64.         }
  65.         CHECK( pthread_mutex_unlock(&mutex) );
  66.         sleep(5);
  67.     }
  68.    
  69.     CHECK( pthread_mutex_lock(&mutex) );
  70.     cout << "terminate thread_inc_motors" << endl;
  71.     CHECK( pthread_mutex_unlock(&mutex) );
  72. }  
  73.  
  74. /**
  75.  * Thread to produce
  76.  */
  77. void *thread_create_car(void *argument) {
  78.     int counter_cars = 0;
  79.    
  80.     while (counter_cars < 5) {
  81.    
  82.         // critical section
  83.         CHECK( pthread_mutex_lock(&mutex) );
  84.        
  85.         // !!!!!!!!!!
  86.         pthread_cond_wait(&condition, &mutex);
  87.        
  88.         counter_tires -= 4;
  89.         counter_motors -= 1;
  90.         ++counter_cars;
  91.         cout << "create car, #cars=" << counter_cars << endl;
  92.        
  93.         CHECK( pthread_mutex_unlock(&mutex) );
  94.  
  95.     }
  96.    
  97.     CHECK( pthread_mutex_lock(&mutex) );
  98.     terminate_threads = true;
  99.     CHECK( pthread_mutex_unlock(&mutex) );
  100.    
  101. }
  102.  
  103. int main() {
  104.  
  105.     // All threads
  106.     pthread_t t_inc_tires, t_inc_motors, t_create_cars;
  107.    
  108.     // create mutex
  109.     CHECK( pthread_mutex_init(&mutex, NULL) );
  110.  
  111.     pthread_cond_init(&condition, NULL);
  112.    
  113.     // set attribute to indicate that thread is joinable
  114.     pthread_attr_t attr;
  115.     CHECK( pthread_attr_init(&attr) );
  116.     CHECK( pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) );
  117.    
  118.     // create threads
  119.     CHECK( pthread_create(&t_inc_tires, &attr, thread_inc_tires, (void *) NULL) );
  120.     CHECK( pthread_create(&t_inc_motors, &attr, thread_inc_motors, (void *) NULL) );
  121.     CHECK( pthread_create(&t_create_cars, &attr, thread_create_car, (void *) NULL) );
  122.    
  123.     // destroy attribute no more necessary
  124.     pthread_attr_destroy(&attr);
  125.  
  126.     // make the join
  127.     void *status;
  128.     CHECK( pthread_join(t_create_cars, &status) );
  129.     CHECK( pthread_join(t_inc_tires, &status) );
  130.     CHECK( pthread_join(t_inc_motors, &status) );
  131.    
  132.     // this line won't be executed until all threads have terminated
  133.     cout << "!!! END OF PROGRAM !!!!" << endl;
  134.    
  135.     // destroy mutex
  136.     CHECK( pthread_mutex_destroy(&mutex) );
  137.     pthread_exit(NULL);
  138.  
  139.     return 0;
  140. }
  141.  

3.2. Threads C++11

Les threads introduits en C++11 sont gérés par une classe.

On pourra consulter la Thread support library

3.2.1. Création de threads C++11

La création d'un thread est plus simple qu'avec les POSIX threads : il suffit de créer une fonction avec les paramètres désirés

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <unistd.h>
  6. #include <thread>
  7. #include <chrono>
  8.  
  9.  
  10. /**
  11.  * Thread to execute
  12.  *
  13.  * PARAMETERS
  14.  *  - id, thread identifier
  15.  *  - msg, message to print
  16.  */
  17. void thread_code(int id, string msg) {
  18.  
  19.     int n_repeat = 5;
  20.     while (n_repeat) {
  21.         // id given by the class thread
  22.         cout << "thread(" << this_thread::get_id() <<") ";
  23.         // id given by the user
  24.         cout << ", id=" << id;
  25.         cout << ", msg=[" << msg << "]";
  26.         cout << ", repeat=" << n_repeat << endl;
  27.         --n_repeat;
  28.         // wait one second
  29.         this_thread::sleep_for(std::chrono::seconds(1));
  30.     }
  31.    
  32. }
  33.  
  34. /**
  35.  * Main function
  36.  */
  37. int main() {
  38.  
  39.     // array of threads
  40.     thread *threads[10];
  41.  
  42.     // create threads
  43.     for (int i=0; i<10; ++i) {
  44.  
  45.         char msg[20];
  46.         sprintf(msg, "coucou %d", 100 + rand() % 100);
  47.  
  48.         threads[i] = new thread(thread_code, i, msg );
  49.     }
  50.  
  51.     cout << "!!! END OF PROGRAM !!!!" << endl;
  52.    
  53.     pthread_exit(NULL);
  54.  
  55.     return 0;
  56. }
  57.  

On note le même problème qu'avec les P-threads à savoir : le programe se termine avant que les threads n'aient terminé leur exécution.


thread(thread(thread(thread(140340511086336thread() , id=140340502693632140340494300928) ) 140340519479040, id=, id=) , id=23, msg=[1, msg=[coucou 135], repeat=5coucou 135], repeat=, msg=[5coucou 193]4, repeat=, msg=[coucou 186]5
, repeat=5

140340527871744) , id=0, msg=[coucou 135], repeat=5
...
!!! END OF PROGRAM !!!!
thread(thread(140340388419328) , id=1403403800266248) , id=, msg=[9coucou 121, msg=[]coucou 121, repeat=], repeat=55
...

3.2.2. Synchronisation des threads C++11 (join)

Il suffit d'apeler la méthode join() pour chacun des threads

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <unistd.h>
  6. #include <thread>
  7. #include <chrono>
  8.  
  9.  
  10. /**
  11.  * Thread to execute
  12.  *
  13.  * PARAMETERS
  14.  *  - id, thread identifier
  15.  *  - msg, message to print
  16.  */
  17. void thread_code(int id, string msg) {
  18.    
  19.     int n_repeat = 5;
  20.    
  21.     while (n_repeat) {
  22.         cout << "thread(" << id <<") ";
  23.         cout << ", msg=[" << msg << "]";
  24.         cout << ", repeat=" << n_repeat << endl;
  25.         --n_repeat;
  26.         // wait one second
  27.         this_thread::sleep_for(std::chrono::seconds(1));
  28.     }
  29.    
  30. }
  31.  
  32. /**
  33.  * Main function
  34.  */
  35. int main() {
  36.  
  37.     // array of threads
  38.     thread *threads[10];
  39.  
  40.     // create threads
  41.     for (int i=0; i<10; ++i) {
  42.         char msg[20];
  43.         sprintf(msg, "coucou %d", 100 + rand() % 100);
  44.         threads[i] = new thread(thread_code, i, msg );
  45.     }
  46.  
  47.     // !!!!!!!!!! perform join !!!!!!!!!!
  48.     for (int i=0; i<10; ++i) {
  49.         threads[i]->join();
  50.     }
  51.    
  52.     cout << "!!! END OF PROGRAM !!!!" << endl;
  53.    
  54.     pthread_exit(NULL);
  55.  
  56.     return 0;
  57. }
  58.  

3.2.3. Verou (mutex) pour l'exclusion mutuelle C++11

Il suffit d'utiliser la classe mutex dotée de deux méthodes :

  1. #include <iostream>
  2. #include <cstdio>
  3. #include <cstdlib>
  4. using namespace std;
  5. #include <unistd.h>
  6. #include <thread>
  7. #include <mutex>
  8. #include <chrono>
  9.  
  10. // !!!!!!!!!! Mutex declaration !!!!!!!!!!
  11. mutex my_lock;
  12.  
  13. /**
  14.  * Thread to execute
  15.  *
  16.  * PARAMETERS
  17.  *  - id, thread identifier
  18.  *  - msg, message to print
  19.  */
  20. void thread_code(int id, string msg) {
  21.  
  22.     int n_repeat = 5;
  23.     while (n_repeat) {
  24.    
  25.         // !!!!!!!!!! LOCK !!!!!!!!!!
  26.         my_lock.lock();
  27.        
  28.         cout << "thread(" << id <<") ";
  29.         cout << ", msg=[" << msg << "]";
  30.         cout << ", repeat=" << n_repeat << endl;
  31.        
  32.         // !!!!!!!!!! UNLOCK !!!!!!!!!!
  33.         my_lock.unlock();
  34.        
  35.         --n_repeat;
  36.         this_thread::sleep_for(std::chrono::seconds(1));
  37.     }
  38.    
  39. }
  40.  
  41. /**
  42.  * Main function
  43.  */
  44. int main() {
  45.     // array of threads
  46.     thread *threads[10];
  47.  
  48.     // create threads
  49.     for (int i=0; i<10; ++i) {
  50.    
  51.         char msg[20];
  52.         sprintf(msg, "coucou %d", 100 + rand() % 100);
  53.        
  54.         threads[i] = new thread(thread_code, i, msg );
  55.     }
  56.  
  57.     // perform join
  58.     for (int i=0; i<10; ++i) {
  59.         threads[i]->join();
  60.     }
  61.     cout << "!!! END OF PROGRAM !!!!" << endl;
  62.    
  63.     pthread_exit(NULL);
  64.  
  65.     return 0;
  66. }
  67.  

thread(140401278883584) , id=0, msg=[coucou 193], repeat=5
thread(140401262098176) , id=2, msg=[coucou 193], repeat=5
thread(140401270490880) , id=1, msg=[coucou 135], repeat=5
thread(140401253705472) , id=3, msg=[coucou 135], repeat=5
thread(140401245312768) , id=4, msg=[coucou 135], repeat=5
thread(140401236920064) , id=5, msg=[coucou 186], repeat=5
thread(140401228527360) , id=6, msg=[coucou 192], repeat=5
thread(140401220134656) , id=7, msg=[coucou 149], repeat=5
thread(140401138726656) , id=8, msg=[coucou 121], repeat=5
thread(140401130333952) , id=9, msg=[coucou 121], repeat=5
thread(140401278883584) , id=0, msg=[coucou 193], repeat=4
thread(140401262098176) , id=2, msg=[coucou 193], repeat=4
...
!!! END OF PROGRAM !!!!

3.2.4. Valeurs atomiques

On consultera la partie atomic de la STL

  1. // ==================================================================
  2. // Author: Jean-Michel Richer
  3. // Date: August 2016
  4. // Purpose: use of atomic operation fetch_add to perform reduction
  5. // ==================================================================
  6. #include <iostream>
  7. #include <atomic>
  8. #include <thread>
  9. #include <cstdlib>
  10. #include <cassert>
  11. using namespace std;
  12.  
  13. int main() {
  14.     const int SIZE = 1024;
  15.     int *tab = new int [SIZE];
  16.  
  17.     for (int i= 0; i < SIZE; ++i) tab[i] = i+1;
  18.    
  19.     // reduction with atomic
  20.     atomic<int> sum(0);
  21.    
  22.     for (int i = 0; i < SIZE; ++i) {
  23.         sum.fetch_add(tab[i], std::memory_order_relaxed);
  24.     }
  25.    
  26.     cout << "reduction=" << sum << endl;   
  27.     assert(sum == (SIZE)*(SIZE+1)/2);
  28.    
  29.     return EXIT_SUCCESS;
  30. }
  31.  
  32.  

Pour de plus amples informations voir ce site

3.3. Affectation des threads aux coeurs physiques

Lorsque l'on utilise $K$ threads ils se peut que dans certains cas l'afectation des threads à des coeurs du processeur pose problème notament sur les systèmes SMP.

Considérons le cas de l'Intel Xeon E5 2670 qui dispose de 10 coeurs + Hyperthreading. Dans le cas d'un système SMP composé de deux processeurs de ce type la répartition des coeurs est la suivante :

 id   Cpu   Core 
 0   0   0 
 1   1   0 
 2   0   1 
 3   1   1 
 ...       
 20   0 (HT)   0 
 21   1 (HT)   0 
 ...       
Identification des coeurs sur Xeon E5 2670

En d'autres termes, les threads se répartissent ainsi :

Si on doit utiliser 4 threads, on obtiendra des temps de calcul diférents si l'afectation des threads aux coeurs physiques est réalisé de l'une des manières suivantes :

Pour réaliser l'afectation des threads on peut utiliser la comande :

taskset -c identifiants-threads monprogramme arguments

Exemple :

taskset 0,2,4,6 ./reduction.exe 131072
# ou
taskset 0-6:2 ./reduction.exe 131072

A titre d'aplication on considère le problème de Maximum de Parcimonie, l'exécution avec un nombre croissant de threads done les temps suivants sur un noeud Bull Novascale R422 du cluster taurus du LERIA

 #threads   1   2   4   8   10   12   14   16   18   20   40 
 temps (s)   207   120   67   41   36   36   39   43   47   49   60 
 facteur   -   1.72   3.08   5.04   5.75   -   -   -   -   -   - 
 pourcentage   -   - 42%   - 68%   - 80%   - 83%   - 83%   - 81%   - 79%   - 77%   - 76%   - 71% 
Temps d'exécution (s) sur Xeon E5 2670

Une étude aprofondie montre que

  • jusqu'à 10 (voire 12) threads on note une diminution du temps de calcul, au delà on dégrade les performances
  • sans l'HyperThreading : que l'on soit sur un seul CPU ou sur deux CPU on obtient les memes résultats
    • 8 threads : 41 secondes avec taskset -c 0-7 ou taskset -c 0,2,4,6,8,10,12,14
    • 10 threads : 36 secondes avec taskset -c 0-9 ou taskset -c 0,2,4,6,8,10,12,14,16,18
  • avec l'HyperThreading : on dégrade les performances
    • 8 threads : 1m08s avec taskset -c 0-3,20-23 (4C + 4HT sur le même CPU)
    • 10 threads : 1m06s avec taskset -c 0-4,20-24 (5C + 5HT sur le même CPU)