Die Boost C++ Bibliotheken

Threads synchronisieren

Während der Einsatz mehrerer Threads die Performance eines Programms erhöhen kann, erhöht sich üblicherweise auch die Komplexität. Wenn mehrere Funktionen dank Threads gleichzeitig ausgeführt werden, müssen Zugriffe dieser Threads auf globale Ressourcen, die also mehreren Threads zur Verfügung stehen, synchronisiert werden. Die Synchronisation von Threads kann in größeren Programmen sehr schwierig sein und viel Detailarbeit erfordern. Im Folgenden lernen Sie die Klassen kennen, die Boost.Thread zur Synchronisation von Threads zur Verfügung stellt.

Beispiel 44.7. Exklusiver Zugriff mit boost::mutex
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::mutex mutex;

void thread()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    mutex.lock();
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
    mutex.unlock();
  }
}

int main()
{
  boost::thread t1{thread};
  boost::thread t2{thread};
  t1.join();
  t2.join();
}

Multithreaded-Programme verwenden zur Synchronisation Objekte, die Mutex genannt werden. Boost.Thread stellt entsprechend verschiedene Mutex-Klassen zur Verfügung, von denen die einfachste boost::mutex ist. Das Grundprinzip eines Mutex ist: Wenn ein Thread einen Mutex in Besitz nimmt, können andere Threads den gleichen Mutex erst dann ihrerseits in Besitz nehmen, wenn der Mutex wieder freigegeben wurde. Auf diese Weise können Threads gezwungen werden zu warten, bis ein anderer Thread bestimmte Operationen ausgeführt und anschließend einen Mutex wieder freigegeben hat.

Im Beispiel 44.7 wird ein globales Objekt mutex vom Typ boost::mutex verwendet. Dieser Mutex wird innerhalb der for-Schleife in der Funktion thread() kurz vor dem Zugriff auf die Standardausgabe in Besitz genommen. Dies geschieht über den Aufruf der Methode lock(). Nachdem eine Meldung auf die Standardausgabe ausgegeben wurde, wird der Mutex mit unlock() wieder freigegeben.

In der Funktion main() wird die Funktion thread() in zwei Threads gestartet. Jeder dieser beiden Threads zählt innerhalb einer for-Schleife bis fünf und gibt in jedem Schleifendurchgang eine Meldung auf die Standardausgabe aus. Da die Standardausgabe ein globales Objekt ist, das von beiden Threads geteilt wird, muss der Zugriff auf std::cout synchronisiert werden. Andernfalls könnten sich Schreibvorgänge überschneiden. Die Synchronisierung stellt sicher, dass zu jedem beliebigen Zeitpunkt nur ein einziger Thread auf std::cout zugreift.

Indem beide Threads vor dem Zugriff auf die Standardausgabe versuchen, den gleichen Mutex in Besitz zu nehmen, ist garantiert, dass genau ein Thread auf die Standardausgabe zugreift. Egal, welcher Thread erfolgreich lock() aufruft – der andere Thread muss warten, bis unlock() aufgerufen wurde.

Diese für Mutexe typische Vorgehensweise – das In-Besitz-nehmen und wieder Freigeben – wird durch verschiedene Typen in Boost.Thread unterstützt. Anstatt lock() und unlock() selbst aufzurufen, kann zum Beispiel die Klasse boost::lock_guard verwendet werden.

Beispiel 44.8. boost::lock_guard mit garantierter Mutex-Freigabe
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::mutex mutex;

void thread()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::lock_guard<boost::mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
  }
}

int main()
{
  boost::thread t1{thread};
  boost::thread t2{thread};
  t1.join();
  t2.join();
}

Im Beispiel 44.8 wird für den Mutex automatisch lock() im Konstruktor und unlock() im Destruktor von boost::lock_guard aufgerufen. Der Zugriff ist wie zuvor beim expliziten Aufruf von lock() und unlock() synchronisiert. Die Klasse boost::lock_guard ist ein Beispiel für das RAII-Idiom, mit dessen Hilfe sichergestellt werden kann, dass Ressourcen garantiert freigegeben werden.

Neben boost::mutex und boost::lock_guard bietet Boost.Thread weitere Klassen an, die verschiedene Spielarten der Synchronisierung unterstützen. Eine wichtige Klasse ist dabei boost::unique_lock, die im Vergleich zu boost::lock_guard eine Reihe nützlicher Methoden bietet und nicht nur aus einem Konstruktor und Destruktor besteht.

Beispiel 44.9. Der Allrounder-Lock boost::unique_lock
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::timed_mutex mutex;

void thread1()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::unique_lock<boost::timed_mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
    boost::timed_mutex *m = lock.release();
    m->unlock();
  }
}

void thread2()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::unique_lock<boost::timed_mutex> lock{mutex,
      boost::try_to_lock};
    if (lock.owns_lock() || lock.try_lock_for(boost::chrono::seconds{1}))
    {
      std::cout << "Thread " << get_id() << ": " << i << std::endl;
    }
  }
}

int main()
{
  boost::thread t1{thread1};
  boost::thread t2{thread2};
  t1.join();
  t2.join();
}

Im Beispiel 44.9 kommen zwei Varianten der Funktion thread() vor, wie sie in den vorherigen Beispielen eingesetzt wurde. Beide Varianten geben noch immer in einer Schleife fünf Zahlen auf die Standardausgabe aus. Beide Varianten verwenden jedoch nun die Klasse boost::unique_lock, um einen Mutex in Besitz zu nehmen.

In thread1() wird mutex an den Konstruktor von boost::unique_lock übergeben. boost::unique_lock versucht daraufhin, den Mutex in Besitz zu nehmen. In diesem Fall verhält sich boost::unique_lock nicht anders als boost::lock_guard. boost::unique_lock ruft im Konstruktor lock() für den Mutex auf.

Der Mutex wird jedoch nicht im Destruktor von boost::unique_lock freigegeben. In thread1() wird für den Lock release() aufgerufen. Dadurch wird der Mutex vom Lock entkoppelt. So wie boost::lock_guard gibt boost::unique_lock üblicherweise im Destruktor einen Mutex frei. Wird der Mutex entkoppelt, geschieht dies jedoch nicht. Der Mutex wird daher in thread1() explizit über einen Aufruf von unlock() freigegeben.

In thread2() wird an den Konstruktor von boost::unique_lock neben mutex boost::try_to_lock übergeben. In diesem Fall ruft der Konstruktor von boost::unique_lock für den Mutex nicht lock() auf, sondern try_lock(). Der Lock versucht also lediglich, den Mutex in Besitz zu nehmen. Ist der Mutex von einem anderen Thread in Besitz genommen worden, schlägt der Versuch fehl.

Über owns_lock() kann überprüft werden, ob boost::unique_lock einen Mutex in Besitz genommen hat. Gibt owns_lock() true zurück, kann im thread2() sofort auf std::cout zugegriffen werden.

Gibt owns_lock() false zurück, wird die Methode try_lock_for() aufgerufen. Diese Methode versucht ebenfalls, den Mutex in Besitz zu nehmen. Der Versuch wird jedoch erst nach Ablauf einer bestimmten Zeitspanne abgebrochen. Im Beispiel 44.9 ist angegeben, dass lock eine Sekunde lang versuchen soll, den Mutex in Besitz zu nehmen. Gibt try_lock_for() true zurück, kann auf std::cout zugegriffen werden. Andernfalls gibt thread2() auf und überspringt die Ausgabe. Wenn Sie das Beispiel ausführen, ist es möglich, dass der zweite Thread weniger als fünf Zahlen ausgibt.

Beachten Sie, dass der Typ von mutex im Beispiel 44.9 nicht boost::mutex, sondern boost::timed_mutex ist. Im Beispiel muss diese Klasse verwendet werden, da nur sie die Methode try_lock_for() anbietet. Auf diese Methode greift der Lock zu, wenn für diesen try_lock_for() aufgerufen wird. boost::mutex bietet lediglich die Methoden lock() und try_lock() an.

Die Klasse boost::unique_lock ist ein exklusiver Lock. Das bedeutet, dass jeweils nur ein Thread mit dieser Klasse einen Mutex in Besitz nehmen kann und andere Threads warten müssen, bis der Mutex wieder freigegeben wurde. Neben exklusiven Locks gibt es auch nicht-exklusive Locks. Boost.Thread bietet hierfür die Klasse boost::shared_lock an, die zusammen mit einem Mutex vom Typ shared_mutex verwendet werden muss.

Beispiel 44.10. Nicht-exklusive Locks mit boost::shared_lock
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::shared_mutex mutex;
std::vector<int> random_numbers;

void fill()
{
  std::srand(static_cast<unsigned int>(std::time(0)));
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::shared_mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    lock.unlock();
    wait(1);
  }
}

void print()
{
  for (int i = 0; i < 3; ++i)
  {
    wait(1);
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    std::cout << random_numbers.back() << '\n';
  }
}

int sum = 0;

void count()
{
  for (int i = 0; i < 3; ++i)
  {
    wait(1);
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    sum += random_numbers.back();
  }
}

int main()
{
  boost::thread t1{fill}, t2{print}, t3{count};
  t1.join();
  t2.join();
  t3.join();
  std::cout << "Sum: " << sum << '\n';
}

Nicht-exklusive Locks vom Typ boost::shared_lock können dann verwendet werden, wenn Threads lediglich lesend auf eine Ressource zugreifen. Ein Thread, der eine Ressource verändert und daher schreibend auf sie zugreift, benötigt einen exklusiven Lock. Das sollte einleuchten: Threads, die lediglich lesend auf eine Ressource zugreifen, merken nicht, dass eine Ressource zeitgleich in einem anderen Thread gelesen wird. Nicht-exklusive Locks können daher einen Mutex mit anderen nicht-exklusiven Locks teilen.

Im Beispiel 44.10 greifen die beiden Funktionen print() und count() lesend auf random_numbers zu. Während print() die letzte Zahl in random_numbers auf die Standardausgabe ausgibt, addiert count() sie zur Variablen sum hinzu. Weil beide Funktionen random_numbers nicht ändern, können sie gleichzeitig auf diese Variable zugreifen. Deswegen wird der Zugriff auf random_numbers mit einem nicht-exklusiven Lock vom Typ boost::shared_lock synchronisiert.

In der Funktion fill() jedoch wird ein exklusiver Lock vom Typ boost::unique_lock benötigt, da in dieser Funktion neue Zufallszahlen in den Container random_numbers eingefügt werden. Damit der Mutex freigegeben wird, bevor in der for-Schleife der Funktion fill() eine Sekunde gewartet wird, wird explizit unlock() aufgerufen. Die Funktion wait() wird im Gegensatz zu den vorherigen Beispielen nicht zu Beginn, sondern am Ende der for-Schleife aufgerufen, damit der Container random_numbers auf alle Fälle eine Zufallszahl enthält, bevor print() und count() zum ersten Mal auf den Container zugreifen. In diesen Funktionen wird wait() zu Beginn der for-Schleifen aufgerufen.

Wenn Ihnen die Aufrufe von wait() an den unterschiedlichen Stellen in den for-Schleifen nicht geheuer sind, haben Sie Recht: Je nachdem, welche Threads wann und wie schnell vom Prozessor ausgeführt werden, kann die Reihenfolge durcheinander geraten. Mit Hilfe von Bedingungsvariablen können die Threads so synchronisiert werden, dass Zahlen sofort dann, wenn sie dem Container random_numbers hinzugefügt wurden, in einem anderen Thread verarbeitet werden.

Beispiel 44.11. Bedingungsvariablen mit boost::condition_variable_any
#include <boost/thread.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>

boost::mutex mutex;
boost::condition_variable_any cond;
std::vector<int> random_numbers;

void fill()
{
  std::srand(static_cast<unsigned int>(std::time(0)));
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    cond.notify_all();
    cond.wait(mutex);
  }
}

void print()
{
  std::size_t next_size = 1;
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    while (random_numbers.size() != next_size)
      cond.wait(mutex);
    std::cout << random_numbers.back() << '\n';
    ++next_size;
    cond.notify_all();
  }
}

int main()
{
  boost::thread t1{fill};
  boost::thread t2{print};
  t1.join();
  t2.join();
}

Im Beispiel 44.11 wurden die Funktionen wait() und count() aus dem vorherigen Beispiel entfernt. Threads warten also nicht mehr pro Schleifendurchgang eine Sekunde, sondern arbeiten so schnell wie möglich. Außerdem wird keine Summe mehr gebildet – Zahlen werden lediglich auf die Standardausgabe ausgegeben.

Damit die Verarbeitung der Zufallszahlen nicht durcheinander kommt, müssen die Threads mit Hilfe von Bedingungsvariablen synchronisiert werden. Diese ermöglichen es, Bedingungen thread-übergreifend zu überprüfen.

Die Funktion fill() generiert wie zuvor pro Schleifendurchgang eine Zufallszahl und speichert sie im Container random_numbers. Dazu muss wie zuvor ein exklusiver Lock verwendet werden, damit ein anderer Thread nicht gleichzeitig auf random_numbers zugreift, während in diesen Container eine neue Zufallszahl eingefügt wird. Am Ende der for-Schleife in der Funktion fill() wird nicht mehr einfach nur eine Sekunde gewartet, sondern auf eine Bedingungsvariable zugegriffen und die Methode notify_all() aufgerufen. Diese Methode weckt alle Threads auf, in denen für die gleiche Bedingungsvariable wait() aufgerufen wurde und die daher auf eine Benachrichtigung, wie sie durch notify_all() ausgelöst wird, warten.

Wenn Sie sich die for-Schleife in der Funktion print() ansehen, stellen Sie fest, dass dort für die gleiche Bedingungsvariable, die in der Funktion fill() verwendet wird, wait() aufgerufen wird. Wenn der Thread durch einen Aufruf von notify_all() geweckt wird, versucht er, den Mutex in Besitz zu nehmen. Dies gelingt erst, nachdem der Mutex in der Funktion fill() freigegeben wurde.

Der Trick ist, dass der Aufruf von wait() gleichzeitig den entsprechenden Mutex freigibt, der als Parameter übergeben wird. Da am Ende der for-Schleife in der Funktion fill() für die gleiche Bedingungsvariable, für die gerade notify_all() aufgerufen wurde, wait() aufgerufen wird, wird der Mutex freigegeben. Das heißt, die Funktion fill() wartet, bis jemand anderes für die Bedingungsvariable notify_all() aufruft. Dies geschieht in print(), nachdem in dieser Funktion die neue Zufallszahl auf die Standardausgabe ausgegeben wurde.

Beachten Sie, dass der Aufruf von wait() in der Funktion print() außerdem in einer while-Schleife erfolgt: Es könnte sein, dass der Thread, der die Funktion fill() ausführt, zuerst eine Zufallszahl im Container random_numbers speichert, bevor in print() wait() für die Bedingungsvariable aufgerufen wird. Damit die Funktion print() trotzdem die erste bereits zu random_numbers hinzugefügte Zufallszahl verarbeitet, wird die Anzahl der im Container gespeicherten Zufallszahlen mit dem nächsten erwarteten Wert verglichen.

Beispiel 44.11 funktioniert auch problemlos, wenn Sie die Locks nicht in, sondern vor die for-Schleifen setzen. Das ergibt genaugenommen mehr Sinn, da in diesem Fall die Locks nicht in jedem Schleifendurchgang initialisiert und zerstört werden. Da der Mutex jeweils beim Aufruf von wait() freigegeben wird, ist es nicht notwendig, die Locks am Ende jedes Schleifendurchgangs zu zerstören, um auf diese Weise den Mutex freizugeben.