Die Boost C++ Bibliotheken

Kapitel 46. Boost.Lockfree

Boost.Lockfree bietet thread-safe und lock-freie Container an. Sie können auf die von dieser Bibliothek bereitgestellten Container von mehreren Threads aus zugreifen, ohne Zugriffe synchronisieren zu müssen.

In der Version 1.57.0 bietet Boost.Lockfree lediglich zwei Container an: Eine Queue vom Typ boost::lockfree::queue und einen Stack vom Typ boost::lockfree::stack. Für die Queue wird eine zweite Implementation in Form der Klasse boost::lockfree::spsc_queue angeboten, die für Anwendungsfälle optimiert ist, in denen genau ein Thread in die Queue schreibt und genau ein Thread aus der Queue liest. Die Abkürzung spsc im Klassennamen steht für single producer/single consumer.

Beispiel 46.1. boost::lockfree::spsc_queue in Aktion
#include <boost/lockfree/spsc_queue.hpp>
#include <thread>
#include <iostream>

boost::lockfree::spsc_queue<int> q{100};
int sum = 0;

void produce()
{
  for (int i = 1; i <= 100; ++i)
    q.push(i);
}

void consume()
{
  int i;
  while (q.pop(i))
    sum += i;
}

int main()
{
  std::thread t1{produce};
  std::thread t2{consume};
  t1.join();
  t2.join();
  consume();
  std::cout << sum << '\n';
}

Beispiel 46.1 verwendet den Container boost::lockfree::spsc_queue. Im ersten Thread, der die Funktion produce() ausführt, werden dem Container die Zahlen 1 bis 100 hinzugefügt. Der zweite Thread, der consume() ausführt, liest die Zahlen aus dem Container aus und addiert sie zu sum. Da der Container boost::lockfree::spsc_queue explizit einen derartigen gleichzeitigen Zugriff von zwei Threads aus unterstützt, ist es nicht notwendig, die Threads zu synchronisieren.

Beachten Sie, dass die Funktion consume() ein zweites Mal aufgerufen wird, nachdem die Threads beendet wurden. Dies ist notwendig, damit zwingend alle 100 Zahlen addiert werden und die Summe 5050 errechnet wird. Da consume() in einer Schleife auf die Queue zugreift, könnte es sein, dass Zahlen schneller ausgelesen werden als sie mit produce() der Queue hinzugefügt werden. Ist die Queue leer, gibt pop() false zurück. Der Thread, der consume() ausführt, könnte also beendet werden, weil produce() im anderen Thread die Queue nicht schnell genug füllen konnte. Wenn der Thread, der produce() ausführt, beendet wird, ist jedoch klar, dass alle Zahlen der Queue hinzugefügt wurden. Der zweite Aufruf von consume() stellt sicher, dass möglicherweise noch nicht ausgelesene Zahlen zu sum addiert werden.

Beachten Sie, dass die Größe der Queue dem Konstruktor als Parameter übergeben wird. Da boost::lockfree::spsc_queue als Ringspeicher implementiert ist, hat die Queue im Beispiel 46.1 eine Kapazität von 100 Elementen. Kann ein Element der Queue nicht hinzugefügt werden, weil sie voll ist, gibt push() false zurück. Im Beispielprogramm wird der Rückgabewert von push() nicht überprüft, weil der Queue genau 100 Zahlen hinzugefügt werden. Eine Kapazität von 100 ist demnach ausreichend.

Beispiel 46.2. boost::lockfree::spsc_queue mit boost::lockfree::capacity
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/policies.hpp>
#include <thread>
#include <iostream>

using namespace boost::lockfree;

spsc_queue<int, capacity<100>> q;
int sum = 0;

void produce()
{
  for (int i = 1; i <= 100; ++i)
    q.push(i);
}

void consume()
{
  while (q.consume_one([](int i){ sum += i; }))
    ;
}

int main()
{
  std::thread t1{produce};
  std::thread t2{consume};
  t1.join();
  t2.join();
  q.consume_all([](int i){ sum += i; });
  std::cout << sum << '\n';
}

Beispiel 46.2 funktioniert wie das vorherige. Diesmal wird die Größe des Ringbuffers jedoch zur Kompilierung angegeben. Dazu wird auf die Template-Klasse boost::lockfree::capacity zugegriffen und die Kapazität als Template-Parameter übergeben. q wird daraufhin über den Standardkonstruktor instanziiert – die Kapazität kann nicht mehr zur Laufzeit angegeben werden.

Die Funktion consume() wurde dahingehend geändert, dass Zahlen nicht mehr über pop() gelesen werden. Stattdessen wird consume_one() aufgerufen und als Parameter eine Lambda-Funktion übergeben. consume_one() liest ebenso wie pop() eine Zahl aus. Die Zahl wird jedoch nicht über eine Referenz an den Aufrufer übergeben. Sie wird als einziger Parameter an die Lambda-Funktion weitergereicht.

In main() wird, wenn die Threads beendet wurden, für die Queue statt der Funktion consume() die Methode consume_all() aufgerufen. consume_all() funktioniert wie consume_one(), stellt aber sicher, dass die Queue nach dem Methodenaufruf leer ist. Mit consume_all() wird die Lambda-Funktion also genauso oft aufgerufen wie Elemente in der Queue sind.

Wenn Sie Beispiel 46.2 ausführen, wird wieder 5050 ausgegeben.

Beispiel 46.3. boost::lockfree::queue mit variabler Container-Größe
#include <boost/lockfree/queue.hpp>
#include <thread>
#include <atomic>
#include <iostream>

boost::lockfree::queue<int> q{100};
std::atomic<int> sum{0};

void produce()
{
  for (int i = 1; i <= 10000; ++i)
    q.push(i);
}

void consume()
{
  int i;
  while (q.pop(i))
    sum += i;
}

int main()
{
  std::thread t1{produce};
  std::thread t2{consume};
  std::thread t3{consume};
  t1.join();
  t2.join();
  t3.join();
  consume();
  std::cout << sum << '\n';
}

Beispiel 46.3 führt consume() in zwei Threads aus. Da mehr als ein Thread lesend auf die Queue zugreift, darf nicht mehr der Typ boost::lockfree::spsc_queue verwendet werden. Im Beispielprogramm kommt boost::lockfree::queue zum Einsatz.

Zugriffe auf die Variable sum sind nun außerdem dank std::atomic thread-safe.

Die Größe der Queue ist auf 100 gesetzt – dies ist der Parameter, der an den Konstruktor übergeben wird. Dabei handelt es sich jedoch lediglich um die Anfangsgröße. boost::lockfree::queue ist standardmäßig nicht als Ringspeicher implementiert. Werden der Queue mehr Elemente hinzugefügt als die Kapazität erlaubt, wird die Kapazität automatisch erweitert. boost::lockfree::queue reserviert also dynamisch zusätzlichen Speicherplatz, wenn die Anfangsgröße nicht ausreicht.

Dies bedeutet, dass boost::lockfree::queue nicht zwingend lock-frei ist. Der Allokator, der standardmäßig von boost::lockfree::queue verwendet wird, ist boost::lockfree::allocator. boost::lockfree::allocator wiederum basiert auf std::allocator. Es hängt also von diesem Allokator ab, ob boost::lockfree::queue ohne Einschränkungen lock-frei ist.

Beispiel 46.4. boost::lockfree::queue mit konstanter Container-Größe
#include <boost/lockfree/queue.hpp>
#include <thread>
#include <atomic>
#include <iostream>

using namespace boost::lockfree;

queue<int, fixed_sized<true>> q{10000};
std::atomic<int> sum{0};

void produce()
{
  for (int i = 1; i <= 10000; ++i)
    q.push(i);
}

void consume()
{
  int i;
  while (q.pop(i))
    sum += i;
}

int main()
{
  std::thread t1{produce};
  std::thread t2{consume};
  std::thread t3{consume};
  t1.join();
  t2.join();
  t3.join();
  consume();
  std::cout << sum << '\n';
}

Im Beispiel 46.4 wird die Queue auf eine feste Größe von 10000 Elementen gesetzt. Die Queue reserviert nicht mehr zusätzlich Speicher, wenn sie voll ist. 10000 ist eine feste Obergrenze.

Die Kapazität der Queue ist konstant, weil boost::lockfree::fixed_sized als Template-Parameter übergeben wird. Die Kapazität wird als Parameter an den Konstruktor übergeben und kann jederzeit über die Methode reserve() neu gesetzt werden. Möchten Sie die Kapazität zur Kompilierung angeben, können Sie boost::lockfree::queue auch den Template-Parameter boost::lockfree::capacity übergeben. boost::lockfree::capacity schließt boost::lockfree::fixed_sized ein.

Die Queue im Beispiel 46.4 hat eine Kapazität von 10000 Elementen. Da consume() 10000 Zahlen der Queue hinzufügt, wird die Obergrenze nicht überschritten. Würde die Obergrenze überschritten werden können, würde push() false zurückgeben.

boost::lockfree::queue ähnelt boost::lockfree::spsc_queue und bietet ebenfalls Methoden wie consume_one() und consume_all() an.

Die dritte Klasse boost::lockfree::stack, die von Boost.Lockfree angeboten wird, ist den anderen beiden sehr ähnlich. Wie bei boost::lockfree::queue können boost::lockfree::fixed_size und boost::lockfree::capacity als Template-Parameter angegeben werden. Die Methoden unterscheiden sich ebenfalls kaum.

Aufgabe

Entfernen Sie die Klasse boost::lockfree::spsc_queue aus Beispiel 46.1 und implementieren Sie das Programm mit std::queue.