Practical multithreading
In computer science, a thread of execution is a sequence of code instructions that can be managed independently by a scheduler of the operating system. On a Linux system, a thread is always part of a process. C++ threads can execute concurrently with one another via the multithreading capabilities provided by the standard. During execution, threads share a common memory space, unlike processes, each of which has its own. Specifically, the threads of a process share its executable code and the dynamically and globally allocated objects that are not defined as thread_local.
Hello C++ jthread
Every C++ program contains at least one thread, and this is the thread that runs the int main()
function. Multithreaded programs have additional threads started at some point in the execution of the main thread. Let’s have a look at a simple C++ program that uses multiple threads to print to the standard output:
#include <iostream>
#include <thread>
#include <syncstream>
#include <array>

int main() {
    std::array<std::jthread, 5> my_threads; // Just an array of 5 jthread objects which do nothing.

    const auto worker{[]{
        const auto thread_id = std::this_thread::get_id();
        std::osyncstream sync_cout{std::cout};
        sync_cout << "Hello from new jthread with id:" << thread_id << '\n';
    }};

    for (auto& thread : my_threads) {
        thread = std::jthread{worker}; // This moves the new jthread into the place of the placeholder
    }

    std::osyncstream{std::cout} << "Hello Main program thread with id:"
                                << std::this_thread::get_id() << '\n';

    return 0; // jthread dtors join them here.
}
When the program starts, the int main()
function is entered. Nothing surprising so far. At the beginning of the execution, we create a variable on the function’s stack, called my_threads
. It is of type std::array
, which contains five elements. The std::array
type represents a container from the Standard Library, encapsulating C-style, fixed-size arrays. It has the advantages of a standard container, such as being aware of its own size, supporting assignment, random access iterators, and so on. As with any other array type in C++, we need to specify what kind of elements it contains. In our example, my_threads
contains five std::jthread
objects. The std::jthread
class was introduced in the C++ Standard Library with the C++20 standard release. It represents a single thread of execution, just like std::thread
, which was introduced with the release of C++11. Some advantages of std::jthread
compared to std::thread
are that it automatically rejoins on destruction and it can be canceled or stopped in some specific cases. It is defined in the <thread>
header; therefore, we must include it in order to compile successfully.
Yes, you are asking the right question! If we already defined an array of jthread
objects, what job do they really perform? The expectation is that every thread is associated with some job that needs to be done. But here, the simple answer is nothing. Our array contains five jthread
objects, which don’t actually represent an execution thread. They are used more like placeholders because, when std::array
is instantiated, it also creates the objects it contains using their default constructors if no other arguments are passed.
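The following is a minimal sketch (assuming a C++20 compiler) illustrating that such a default-constructed jthread is only a placeholder and does not own a thread of execution:

#include <thread>

int main() {
    std::jthread placeholder;              // default-constructed: represents no thread
    return placeholder.joinable() ? 1 : 0; // joinable() returns false for a placeholder
}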
Let’s now define some workers that our threads can be associated with. The std::jthread
class accepts, as a worker, any callable type. Such types provide a single operation that can be invoked. Widely known examples of such types are function objects and lambda expressions, which we already covered in detail in Chapter 4. In our example, we will use lambda expressions because they provide a way of creating anonymous function objects (functors) that can be utilized in-line or passed as an argument. The introduction of lambda expressions in C++11 simplifies the process of creating anonymous functors, making it more efficient and straightforward. The following code shows our worker method defined as a lambda expression:
const auto worker{[]{
    const auto thread_id = std::this_thread::get_id();
    std::osyncstream sync_cout{std::cout};
    sync_cout << "Hello from new jthread with id:" << thread_id << '\n';
}};
The defined lambda expression, const auto worker{…};
, is pretty simple. It is instantiated on the function stack. It has no input parameters, and it doesn’t capture any state from outside. The only work it does is to print to the standard output the jthread
object’s ID. Every thread in C++ provided by the standard concurrency support library has a unique identifier associated with it. The std::this_thread::get_id()
function returns the ID of the thread from which it is invoked. This means that if this lambda expression is passed to several different threads, it should print a different thread ID each time.
Printing to std::cout
by many concurrent threads can produce surprising results. The std::cout
object is defined as a global, thread-safe object, which guarantees that each individual character written to it is written atomically. However, no such guarantee is made for sequences of characters, such as strings, so when multiple threads concurrently write strings to std::cout
, the output is likely to be an interleaved mixture of these strings. This is not what we want here. We expect each thread to print its messages in full. Therefore, we need a synchronization mechanism that ensures that writing a string to std::cout
is fully atomic. Luckily, C++20 introduces a whole new family of class templates defined in the <syncstream>
standard library header, which provides mechanisms to synchronize threads writing to one and the same stream. One of them is std::osyncstream
. You can use it as a regular stream. Just create an instance of it by passing std::cout
as a parameter. Then, with the help of its std::basic_ostream& operator<<(...)
member operator, you can insert data, just like with a regular stream. It is guaranteed that all of the inserted data will be flushed atomically to the output once the std::osyncstream
object goes out of scope and is destroyed. In our example, the sync_cout
object will be destroyed when the lambda is about to finish its execution and leave its scope. This is exactly the behavior we want.
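To see why this matters, here is a minimal sketch (the messages and the thread count are arbitrary) that contrasts unsynchronized and synchronized output; without std::osyncstream, the strings printed by different threads may interleave, while each std::osyncstream statement is flushed as a whole:

#include <iostream>
#include <syncstream>
#include <thread>
#include <array>

int main() {
    std::array<std::jthread, 4> threads;
    for (auto& t : threads) {
        t = std::jthread{[]{
            // Unsynchronized: characters from different threads may interleave.
            std::cout << "unsynchronized hello from " << std::this_thread::get_id() << '\n';
            // Synchronized: the whole message is flushed atomically when the
            // temporary std::osyncstream is destroyed at the end of the statement.
            std::osyncstream{std::cout} << "synchronized hello from "
                                        << std::this_thread::get_id() << '\n';
        }};
    }
    return 0; // jthread dtors join here.
}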
Finally, we are ready to give some work to our threads to do. This means that we need to associate worker lambdas with the five threads we have in the my_threads
array. But the std::jthread
type supports attaching a worker only as part of its construction. That’s why we need to create new jthread
objects and use them to replace the placeholders in the my_threads
array:
for (auto& thread : my_threads) {
    thread = std::jthread{worker}; // This moves the new jthread into the place of the placeholder
}
Being a standard container, std::array
natively supports range-based for loops. Therefore, we can easily iterate through all elements in my_threads
and replace them with new jthread
objects that already have workers associated with them. First, we create new jthread
objects with automatic storage duration and assign a worker object to them. In our case, we assign one and the same worker object to every newly created thread. This is possible because the jthread
class stores a copy of the worker instance inside each jthread
object, so every jthread
object gets its own copy of the worker lambda. The construction of these objects is carried out in the context of the caller. This means that any exceptions that occur during the evaluation, copying, or moving of the arguments are thrown in the current main
thread.
An important detail is that the newly created jthread
objects are not copied into the existing elements of the array; they are moved. The std::jthread
class has its copy constructor and copy assignment operator deleted because it doesn’t make much sense to copy a thread onto an already existing one. In our case, each newly created jthread
object is moved into the storage of an existing array element, which takes over ownership of the underlying thread.
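A minimal sketch (assuming C++20) of what this move-only behavior means in practice; the commented-out line would fail to compile because copying is not allowed, while moving transfers ownership of the thread:

#include <thread>
#include <utility>

int main() {
    std::jthread worker{[]{ /* do some work */ }};

    // std::jthread copy = worker;          // error: the copy constructor is deleted
    std::jthread moved = std::move(worker); // OK: ownership of the thread is transferred

    // After the move, 'worker' no longer represents a thread of execution.
    return worker.joinable() ? 1 : 0;       // joinable() is now false for 'worker'
}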
When a jthread
object is constructed, the associated thread starts immediately, although there may be some delays due to Linux scheduling specifics. The thread begins executing at the function specified as an argument to the constructor. In our example, this is the worker lambda associated with each thread. If the worker returns a result, it will be ignored, and if it ends by throwing an exception, the std::terminate
function is executed. Therefore, we need to make sure that either our worker code doesn’t throw or we catch everything throwable.
When a thread is started, it begins executing its dedicated worker. Each thread has its own function stack space, which guarantees that any local variable defined in the worker will have a separate instance per thread. Therefore, const auto thread_id
in the worker is initialized with a different ID depending on the thread it is run by. We do not need to take any precautions to ensure that the data stored in thread_id
is consistent. It is guaranteed by the Standard that data with automatic storage duration is not shared between the threads.
Once all the jthread
objects have been created, the main
thread concurrently prints its ID along with the rest of the threads. There is no guaranteed order of execution for each thread, and it is possible for one thread to be interrupted by another. As a result, it is important to ensure that the code is written in a manner that can handle potential preemption and remains robust in all scenarios:
std::osyncstream{std::cout} << "Hello Main program thread with id:" << std::this_thread::get_id() << '\n';
All threads are now running concurrently with the main
thread. We need to make sure that the main
thread is also printing to the standard output in a thread-safe manner. We again use an instance of std::osyncstream
, but this time, we don’t create a named variable – instead, we create a temporary one. This approach is favored due to its ease of use, similar to using the std::cout
object. The standard guarantees that temporaries persist until the end of the full statement, so the destructor of the temporary std::osyncstream is invoked right after the last insertion, which flushes the output atomically.
Here is a sample output from the program:
Hello from new jthread with id:1567180544
Hello from new jthread with id:1476392704
Hello from new jthread with id:1468000000
Hello Main program thread with id:1567184704
Hello from new jthread with id:1558787840
Hello from new jthread with id:1459607296
The std::jthread
name refers to a joining thread. Unlike std::thread
, std::jthread
also has the ability to automatically join the thread it manages when it is destroyed. The behavior of std::thread
can be confusing at times. If std::thread
has not been joined or detached, and it is still considered joinable, the std::terminate
function will be called upon its destruction. A thread is considered joinable if neither the join()
nor the detach()
method has been called. In our example, all the jthread
objects automatically join during their destruction and do not result in the termination of the program.
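The contrast can be shown with a minimal sketch (assuming C++20); with std::thread, omitting the explicit join() (or detach()) would lead to std::terminate being called from the destructor, whereas std::jthread joins on its own:

#include <thread>

int main() {
    {
        std::thread t{[]{ /* do some work */ }};
        // Without this call (or t.detach()), t would still be joinable when its
        // destructor runs, and std::terminate would be called.
        t.join();
    }
    {
        std::jthread jt{[]{ /* do some work */ }};
        // No explicit join needed: the jthread destructor joins automatically.
    }
    return 0;
}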
Canceling threads – is this really possible?
Before C++20 was released, this wasn’t really possible: there was no standard utility to halt a thread’s execution, so it was not guaranteed that a std::thread
was stoppable. Different mechanisms were used instead. Stopping a std::thread
required cooperation between the main
required cooperation between the main
and worker threads, typically using a flag or atomic variable or some kind of messaging system.
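A minimal sketch of such a hand-rolled, pre-C++20 mechanism (the flag name and the sleep intervals are arbitrary), using std::atomic<bool> as the cooperative stop flag:

#include <atomic>
#include <chrono>
#include <thread>

using namespace std::literals::chrono_literals;

int main() {
    std::atomic<bool> stop_requested{false};

    std::thread worker{[&stop_requested] {
        while (!stop_requested.load()) {
            // ... do a chunk of work, then check the flag again ...
            std::this_thread::sleep_for(200ms);
        }
    }};

    std::this_thread::sleep_for(1s);
    stop_requested.store(true); // cooperative cancellation: the worker must check the flag
    worker.join();              // with std::thread, join must be called explicitly
    return 0;
}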
With the release of C++20, there is now a standardized utility for requesting std::jthread
objects to stop their execution: stop tokens come to the rescue. Looking at the C++ reference page for std::jthread
(https://en.cppreference.com/w/cpp/thread/jthread), we find that it has the same general behavior as std::thread, except that it automatically rejoins on destruction and can be cancelled/stopped in certain situations.
We already saw that jthread
objects automatically join on destruction, but what about canceling/stopping and what does “certain situations” mean? Let’s dig deeper into this.
First of all, don’t expect that std::jthread
exposes some magical mechanism, some red button that stops the running thread when it is pressed. It always comes down to how exactly your worker function is implemented. If you want your thread to be cancelable, you have to write it in a way that allows cancellation:
#include <iostream>
#include <syncstream>
#include <thread>
#include <stop_token>
#include <chrono>
#include <array>

using namespace std::literals::chrono_literals;

int main() {
    const auto worker{[](std::stop_token token, int num){ // {1}
        while (!token.stop_requested()) { // {2}
            std::osyncstream{std::cout} << "Thread with id " << num
                                        << " is currently working.\n";
            std::this_thread::sleep_for(200ms);
        }
        std::osyncstream{std::cout} << "Thread with id " << num
                                    << " is now stopped!\n";
    }};

    std::array<std::jthread, 3> my_threads{
        std::jthread{worker, 0},
        std::jthread{worker, 1},
        std::jthread{worker, 2}
    };

    // Give some time to the other threads to start executing ...
    std::this_thread::sleep_for(1s);

    // Let's stop them
    for (auto& thread : my_threads) {
        thread.request_stop(); // {3} - this is not a blocking call, it is just a request.
    }

    std::osyncstream{std::cout} << "Main thread just requested stop!\n";

    return 0; // jthread dtors join them here.
}
Looking at the definition of our worker lambda function, we observe that it is now slightly reworked (marker {1}
). It accepts two new parameters – std::stop_token token
and int num
. The stop token reflects the shared stop state that a jthread
object has. If the worker accepts multiple parameters, the stop token must always be the first one.
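A minimal sketch of this rule (the extra arguments are arbitrary); when the callable’s first parameter is a std::stop_token, std::jthread supplies the token automatically, and the remaining constructor arguments follow in order:

#include <thread>
#include <stop_token>

int main() {
    // The stop token comes first; the remaining arguments are forwarded after it.
    std::jthread t{[](std::stop_token token, int a, double b) {
        while (!token.stop_requested()) {
            // ... work with a and b ...
        }
    }, 42, 3.14};

    t.request_stop(); // ask the worker to finish; the dtor joins the thread
    return 0;
}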
It is imperative to ensure that the worker method is designed to be able to handle cancellation. This is what the stop token is used for. Our logic should be implemented in such a way that it regularly checks whether a stop request has been received. This is done with a call to the stop_requested()
method of the std::stop_token
object. Every specific implementation decides where and when these checks are to be done. If the code doesn’t respect the stop token state, then the thread can’t be canceled gracefully. So, it’s up to you to correctly design your code.
Luckily, our worker lambda respects the state of the thread’s stop token. It continuously checks whether a stop is requested (marker {2}
). If not, it prints its worker number and goes to sleep for 200ms
. This loop continues until the parent thread decides to send stop requests to its worker threads (marker {3}
). This is done by invoking the request_stop()
method of the std::jthread
object.
Here is the output of the program:
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 0 is currently working.
Thread with id 2 is currently working.
Thread with id 1 is currently working.
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 0 is currently working.
Thread with id 2 is currently working.
Main thread just requested stop!
Thread with id 1 is now stopped!
Thread with id 0 is now stopped!
Thread with id 2 is now stopped!
Now that we know how we can stop the execution of a specific std::jthread
using std::stop_token
, let’s see how we can stop the execution of multiple std::jthread
objects using a single stop source.
std::stop_source
The std::stop_source
class enables you to signal a cancellation request for std::jthread
. When a stop request is issued through a stop_source
object, it becomes visible to all other stop_source
and std::stop_token
objects associated with the same stop state. You just need to signal it, and any thread worker that consumes it will be notified.
By utilizing std::stop_token
and std::stop_source
, threads can signal or check for a request to stop their execution asynchronously. The request to stop is made through std::stop_source
, which affects all related std::stop_token
objects. These tokens can be passed to the worker functions and used to monitor stop requests. Both std::stop_source
and std::stop_token
share ownership of the stop state. The method of the std::stop_source
class – request_stop()
– and the methods in std::stop_token
– stop_requested()
and stop_possible()
– are all atomic operations to ensure that no data race will occur.
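A minimal sketch of this relationship (assuming C++20), showing a stop_token obtained from a stop_source observing the request made through the source:

#include <stop_token>

int main() {
    std::stop_source source;
    std::stop_token token = source.get_token(); // shares the same stop state as 'source'

    const bool possible = token.stop_possible();   // true: an associated stop source exists
    source.request_stop();                         // atomically sets the shared stop state
    const bool requested = token.stop_requested(); // true: every token sharing the state sees it

    return (possible && requested) ? 0 : 1;
}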
Let’s have a look at how our previous example could be reworked with the help of the stop tokens:
#include <iostream>
#include <syncstream>
#include <thread>
#include <stop_token>
#include <chrono>
#include <array>

using namespace std::literals::chrono_literals;

int main() {
    std::stop_source source;

    const auto worker{[](std::stop_source sr, int num){
        std::stop_token token = sr.get_token();
        while (!token.stop_requested()) {
            std::osyncstream{std::cout} << "Thread with id " << num
                                        << " is currently working.\n";
            std::this_thread::sleep_for(200ms);
        }
        std::osyncstream{std::cout} << "Thread with id " << num
                                    << " is now stopped!\n";
    }};

    std::array<std::jthread, 3> my_threads{
        std::jthread{worker, source, 0},
        std::jthread{worker, source, 1},
        std::jthread{worker, source, 2}
    };

    std::this_thread::sleep_for(1s);

    source.request_stop(); // this is not a blocking call, it is just a request. {1}

    std::osyncstream{std::cout} << "Main thread just requested stop!\n";

    return 0; // jthread dtors join them here.
}
The main
function starts with the declaration of the std::stop_source
source, which will be used by the main
thread to signal all child worker threads and request them to stop. The worker lambda is slightly reworked in order to accept std::stop_source sr
as an input. This is, in fact, the communication channel through which the worker is notified of a stop request. The std::stop_source
object is copied into every worker associated with the started threads.
Rather than iterating through all the threads and requesting a stop on each of them, we only need to call request_stop()
on the source instance in the main
thread (marker {1}
). This will broadcast stop requests to all workers that consume it.
As the name suggests, the call to the request_stop()
method on the stop source object is just a request rather than a blocking call. So, don’t expect your threads to stop immediately once the call is finished.
Here is the sample output from the program:
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 2 is currently working.
Thread with id 0 is currently working.
Thread with id 1 is currently working.
Thread with id 0 is currently working.
Thread with id 2 is currently working.
Thread with id 1 is currently working.
Thread with id 0 is currently working.
Thread with id 2 is currently working.
Main thread just requested stop!
Thread with id 1 is now stopped!
Thread with id 0 is now stopped!
Thread with id 2 is now stopped!
We are now familiar with two mechanisms for halting thread execution in C++. Now, it’s time to see how we can share data between multiple threads.