STL Concurrency

For further reading, see "C++ Concurrency in Action".

The High-Level Interface: async() and Futures

  • async() provides an interface that lets a piece of functionality, a callable object, run in the background as a separate thread, if possible.
  • class future<> allows you to wait for the thread to finish and provides access to its outcome: a return value or an exception, if any.

A First Example Using async() and Futures

...
// start func1() asynchronously (now or later or never)
std::future<int> result1(std::async(func1));
int result2 = func2(); // call func2() synchronously (here and now)
// print result (wait for func1() to finish and add its result to result2)
int result = result1.get() + result2;
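A self-contained version of the fragment above may help. It assumes trivial, hypothetical definitions of func1() and func2(), which the fragment leaves undefined, and wraps the steps in a function instead of main():

```cpp
#include <future>

// Hypothetical stand-ins for func1() and func2(); the fragment does not define them.
int func1() { return 1; }
int func2() { return 2; }

int firstExample()
{
    // start func1() asynchronously (now, later, or never)
    std::future<int> result1(std::async(func1));
    int result2 = func2();           // call func2() synchronously
    return result1.get() + result2;  // wait for func1() and add the two results
}
```

Calling firstExample() from main() returns 3.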

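With the default launch policy, if the environment supports it, the task passed to async() starts running immediately in a separate thread. Otherwise the task is deferred by default and runs synchronously only when its result is explicitly needed, that is, when get() (or wait()) is called.
Keep the distance between calling async() and calling get() as large as possible: call early and return late.
get() may be called only once on a future<>; afterwards the future is in an invalid state, which you can check with valid().
Besides the default, you can pass a launch policy as the first argument: std::launch::async forces asynchronous execution in a separate thread (throwing std::system_error if that isn't possible), and std::launch::deferred postpones the call until get() or wait() is invoked.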
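The following sketch contrasts the two standard launch policies and shows that a future becomes invalid after get(). The task answer() and the function launchPolicyDemo() are hypothetical names chosen for illustration:

```cpp
#include <chrono>
#include <future>

// Hypothetical task: returns a fixed value so the result is easy to check.
int answer() { return 42; }

// Starts the same task with both standard launch policies.
bool launchPolicyDemo()
{
    // launch::async: run in a separate thread
    // (throws std::system_error if no thread can be started)
    std::future<int> eager = std::async(std::launch::async, answer);
    // launch::deferred: run in the calling thread once get() or wait() is called
    std::future<int> lazy = std::async(std::launch::deferred, answer);

    // a deferred task is reported as such until its result is requested
    bool wasDeferred =
        lazy.wait_for(std::chrono::seconds(0)) == std::future_status::deferred;

    int sum = eager.get() + lazy.get();  // get() consumes each future's result
    // after get(), both futures are invalid
    return wasDeferred && sum == 84 && !eager.valid() && !lazy.valid();
}
```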

An Example of Waiting for Two Tasks

#include <future>
#include <thread>
#include <chrono>
#include <random>
#include <iostream>
#include <exception>
using namespace std;

void doSomething(char c)
{
    // random-number generator (use c as seed to get different sequences)
    default_random_engine dre(c);
    uniform_int_distribution<int> id(10, 1000);

    // loop to print character after a random period of time
    for (int i = 0; i < 10; ++i)
    {
        this_thread::sleep_for(chrono::milliseconds(id(dre)));
        cout.put(c).flush();
    }
}

int main()
{
    cout << "starting 2 operations asynchronously" << endl;

    // start two loops in the background printing characters . or +
    auto f1 = async([] {doSomething('.'); });
    auto f2 = async([] {doSomething('+'); });

    // if at least one of the background tasks is running
    if (f1.wait_for(chrono::seconds(0)) != future_status::deferred ||
        f2.wait_for(chrono::seconds(0)) != future_status::deferred)
    {
        // poll until at least one of the loops finished
        while (f1.wait_for(chrono::seconds(0)) != future_status::ready &&
               f2.wait_for(chrono::seconds(0)) != future_status::ready)
        {
            // ...
            this_thread::yield(); // hint to reschedule to the next thread
        }
    }
    cout.put('\n').flush();

    // wait for all loops to be finished and process any exception
    // or none of them was started
    try {
        f1.get();
        f2.get();
    }
    catch (const exception& e) {
        cout << "\nEXCEPTION: " << e.what() << endl;
    }
    cout << "\ndone" << endl;
}

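Do not pass references to outside variables into async(): the background task might still be running when the referenced variable's lifetime has already ended. As a rule, pass arguments by value, not by reference.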
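A small sketch of the by-value rule, assuming a hypothetical function startLengthTask() whose task needs a string that is local to the function. The lambda copies the string, so the task never refers to the destroyed local:

```cpp
#include <cstddef>
#include <future>
#include <string>

// Hypothetical example: the task needs a string that is local to this function.
std::future<std::size_t> startLengthTask()
{
    std::string s = "hello";  // destroyed when this function returns
    // [s] copies the string into the lambda, so the task owns its data.
    // Capturing [&s] instead would be a bug: the task may still be running
    // after s has been destroyed.
    return std::async(std::launch::async, [s] { return s.size(); });
}
```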

Shared Futures

std::shared_future allows get() to be called multiple times, yielding the same result (or throwing the same exception) each time.
Example:

#include <future>
#include <thread>
#include <chrono>
#include <iostream>
#include <exception>
#include <stdexcept>
using namespace std;

int queryNumber()
{
    // read number
    cout << "read number: ";
    int num;
    cin >> num;

    // throw exception if none
    if (!cin)
    {
        throw runtime_error("no number read");
    }
    return num;
}

void doSomething(char c, shared_future<int> f)
{
    try {
        // wait for number of characters to print
        int num = f.get(); // get result of queryNumber()

        for (int i = 0; i < num; ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(100));
            cout.put(c).flush();
        }
    }
    catch (const exception& e) {
        cerr << "EXCEPTION in thread " << this_thread::get_id() << ": " << e.what() << endl;
    }
}

int main()
{
    try {
        // start one thread to query a number
        // (pass the function itself, not the result of calling it)
        shared_future<int> f = async(queryNumber);

        // start three threads each processing this number in a loop
        auto f1 = async(launch::async, doSomething, '.', f);
        auto f2 = async(launch::async, doSomething, '+', f);
        auto f3 = async(launch::async, doSomething, '*', f);

        // wait for all loops to be finished
        f1.get();
        f2.get();
        f3.get();
    }
    catch (const exception& e) {
        cout << "\nEXCEPTION: " << e.what() << endl;
    }
    cout << "\ndone" << endl;
}
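In the example, the future returned by async() is converted to a shared_future implicitly, and each thread receives its own copy of it. The conversion can also be done explicitly with share(), as in this sketch (sharedFutureDemo() is a hypothetical name):

```cpp
#include <future>

// Converts a future into a shared_future explicitly with share().
bool sharedFutureDemo()
{
    std::future<int> f = std::async(std::launch::async, [] { return 7; });
    std::shared_future<int> sf = f.share();  // f gives up its shared state
    bool transferred = !f.valid();

    std::shared_future<int> copy = sf;       // copies refer to the same state
    // get() may be called repeatedly, on any copy, and leaves sf valid
    return transferred && sf.get() == 7 && sf.get() == 7
           && copy.get() == 7 && sf.valid();
}
```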

The Low-Level Interface: Threads and Promises

Class std::thread

void doSomething();
std::thread t(doSomething); // start doSomething() in the background
t.join(); // wait for t to finish (block until doSomething() ends)

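Differences from async():

  • thread has no launch policy. It always tries to start the thread immediately; if that isn't possible, it throws std::system_error with the error code resource_unavailable_try_again.
  • There is no interface to get the result of the thread; all you can obtain is its thread ID.
  • If an exception occurs inside the thread and is not caught there, the program immediately aborts by calling std::terminate(). To pass an exception to the outside, use exception_ptrs.
  • You have to decide whether to wait for the thread to finish (join) or to let it run in the background without further control (detach). If you don't decide before the lifetime of the thread object ends, the program aborts.
  • If you let the thread run in the background, all threads are terminated abruptly when main() ends.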
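The exception_ptr technique mentioned above can be sketched as follows. The worker catches everything, stores it with std::current_exception(), and the caller rethrows it after join(). The function name runAndReport() and the error message are hypothetical:

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <thread>

// Runs a task in a std::thread and rethrows any exception it raised in the
// calling thread, using std::exception_ptr.
std::string runAndReport()
{
    std::exception_ptr eptr;  // written by the worker, read after join()
    std::thread t([&eptr] {
        try {
            throw std::runtime_error("failure in worker");
        }
        catch (...) {
            eptr = std::current_exception();  // capture instead of terminate()
        }
    });
    t.join();  // join() synchronizes, so eptr is safely visible afterwards
    try {
        if (eptr) {
            std::rethrow_exception(eptr);
        }
    }
    catch (const std::exception& e) {
        return e.what();
    }
    return "no exception";
}
```

Capturing eptr by reference is safe here only because the thread is joined before the function returns.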

Example:

#include <thread>
#include <chrono>
#include <random>
#include <iostream>
#include <exception>
using namespace std;

void doSomething(int num, char c)
{
    try {
        // random-number generator (use c as seed to get different sequences)
        default_random_engine dre(42 * c);
        uniform_int_distribution<int> id(10, 1000);
        for (int i = 0; i < num; ++i) {
            this_thread::sleep_for(chrono::milliseconds(id(dre)));
            cout.put(c).flush();
            // ...
        }
    }
    // make sure no exception leaves the thread and terminates the program
    catch (const exception& e) {
        cerr << "THREAD-EXCEPTION (thread "
             << this_thread::get_id() << "): " << e.what() << endl;
    }
    catch (...) {
        cerr << "THREAD-EXCEPTION (thread "
             << this_thread::get_id() << ")" << endl;
    }
}

int main()
{
    try {
        thread t1(doSomething, 5, '.'); // print five dots in separate thread
        cout << "- started fg thread " << t1.get_id() << endl;
        // print other characters in other background threads
        for (int i = 0; i < 5; ++i) {
            thread t(doSomething, 10, 'a' + i); // print 10 chars in separate thread
            cout << "- detach started bg thread " << t.get_id() << endl;
            t.detach(); // detach thread into the background
        }
        cin.get(); // wait for any input (return)
        cout << "- join fg thread " << t1.get_id() << endl;
        t1.join(); // wait for t1 to finish
    }
    catch (const exception& e) {
        cerr << "EXCEPTION: " << e.what() << endl;
    }
}

It is better to avoid detached threads.
The only things you can do with thread IDs are compare them and write them to an output stream.
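A short sketch of those two operations on std::thread::id (threadIdDemo() is a hypothetical name). It also shows that a default-constructed ID means "no thread", which is what get_id() returns after join():

```cpp
#include <sstream>
#include <thread>

bool threadIdDemo()
{
    std::thread::id mainId = std::this_thread::get_id();
    std::thread::id workerId;  // default-constructed: "no thread"
    std::thread t([&workerId] { workerId = std::this_thread::get_id(); });
    std::thread::id handleId = t.get_id();
    t.join();  // after join(), reading workerId is safe

    std::ostringstream os;
    os << mainId;  // the textual format is implementation-specific

    return workerId == handleId                // compare: same thread
           && workerId != mainId               // compare: different threads
           && t.get_id() == std::thread::id()  // joined: no associated thread
           && !os.str().empty();               // output
}
```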

Promises

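std::promise is the counterpart of a future object: both can share a shared state, which the promise uses to temporarily store a value or an exception that the future later retrieves.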

#include <thread>
#include <future>
#include <iostream>
#include <string>
#include <exception>
#include <stdexcept>
#include <functional>
#include <utility>

void doSomething(std::promise<std::string>& p)
{
    try {
        // read character and throw exception if 'x'
        std::cout << "read char ('x' for exception): ";
        char c = std::cin.get();
        if (c == 'x') {
            throw std::runtime_error(std::string("char ") + c + " read");
        }
        // ...
        std::string s = std::string("char ") + c + " processed";
        p.set_value(std::move(s)); // store result
    }
    catch (...) {
        p.set_exception(std::current_exception()); // store exception
    }
}

int main()
{
    try {
        // start thread using a promise to store the outcome
        std::promise<std::string> p;
        std::thread t(doSomething, std::ref(p));
        t.detach();
        // ...
        // create a future to process the outcome
        std::future<std::string> f(p.get_future());
        // process the outcome
        std::cout << "result: " << f.get() << std::endl;
    }
    catch (const std::exception& e) {
        std::cerr << "EXCEPTION: " << e.what() << std::endl;
    }
    catch (...) {
        std::cerr << "EXCEPTION " << std::endl;
    }
}
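A variation that avoids both std::ref and detach() is to move the promise into the thread and join it afterwards. This is a sketch under that assumption; computeViaPromise() is a hypothetical name, and squaring the input stands in for real work:

```cpp
#include <future>
#include <thread>
#include <utility>

// Computes a value in a separate thread and hands it over through a promise.
// The promise is moved into the thread, so no reference can dangle.
int computeViaPromise(int x)
{
    std::promise<int> p;
    std::future<int> f = p.get_future();  // retrieve the future before moving p
    std::thread t([](std::promise<int> pr, int v) {
        pr.set_value(v * v);              // makes the shared state ready
    }, std::move(p), x);
    int result = f.get();                 // blocks until set_value() was called
    t.join();
    return result;
}
```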

Class packaged_task<>

double compute(int x, int y);
std::packaged_task<double(int, int)> task(compute); // create a task
std::future<double> f = task.get_future(); // get its future
// ...
task(7, 5); // start the task (typically in a separate thread)
// ...
double res = f.get(); // wait for its end and process result/exception
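To run the task "in a separate thread" as the comment suggests, the packaged_task (which is move-only) can be moved into a std::thread. This sketch assumes a hypothetical compute() that simply divides its arguments:

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical stand-in for compute() from the snippet above.
double compute(int x, int y) { return static_cast<double>(x) / y; }

double runPackagedTask()
{
    std::packaged_task<double(int, int)> task(compute);  // create a task
    std::future<double> f = task.get_future();           // get its future
    std::thread t(std::move(task), 7, 5);                // run it in another thread
    double res = f.get();  // wait for its end and process result/exception
    t.join();
    return res;
}
```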

Starting a Thread in Detail

Layers of Thread Interfaces

async() in Detail

Synchronizing Threads, or the Problem of Concurrency

Using multiple threads almost always involves concurrent data access, and that is where things easily go wrong.
A data race is defined as two conflicting actions in different threads, at least one of which is not atomic, and neither of which happens before the other.
Unless explicitly stated otherwise, the STL does not guarantee thread safety.

One principle:
The only safe way to concurrently access the same data by multiple threads without synchronization is when ALL threads only READ the data.

Possible problems:

  • Unsynchronized data access: When two threads running in parallel read and write the same data, it is unspecified which statement comes first.
  • Half-written data: When one thread reads data that another thread modifies, the reading thread might even read the data in the middle of the other thread's write, thus reading neither the old nor the new value.
  • Reordered statements: The compiler and/or the hardware might reorder statements and operations so that each single thread still behaves correctly, while the expected behavior of all threads in combination is broken.
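As an illustration of the read-only case, two tasks can sum halves of the same vector concurrently without any lock, because nobody writes to it while they run. The function name is hypothetical; the lambdas capture v by reference, which is safe here only because both results are retrieved with get() before the function returns:

```cpp
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Two tasks only READ the same vector concurrently: no synchronization needed.
long sumHalvesConcurrently(const std::vector<int>& v)
{
    auto mid = v.begin() + static_cast<std::ptrdiff_t>(v.size() / 2);
    auto lower = std::async(std::launch::async, [&v, mid] {
        return std::accumulate(v.begin(), mid, 0L);
    });
    auto upper = std::async(std::launch::async, [&v, mid] {
        return std::accumulate(mid, v.end(), 0L);
    });
    return lower.get() + upper.get();
}
```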

Ways to solve these problems:

  • Atomicity: This means that read or write access to a variable or to a sequence of statements happens exclusively and without any interruption, so that one thread can’t read intermediate states caused by another thread.
  • Order: We need some ways to guarantee the order of specific statements or of a group of specific statements.
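A minimal sketch of guaranteeing order: a plain int is written, then an atomic flag is set; the reader waits for the flag before reading the int. With the default (sequentially consistent) memory order, the write to data happens before the read, so there is no data race. publishAndConsume() is a hypothetical name:

```cpp
#include <atomic>
#include <thread>

int publishAndConsume()
{
    int data = 0;                    // plain, non-atomic payload
    std::atomic<bool> ready(false);  // publication flag
    std::thread producer([&] {
        data = 42;          // 1. write the payload
        ready.store(true);  // 2. publish: ordered after step 1
    });
    while (!ready.load()) {          // wait until published
        std::this_thread::yield();
    }
    int seen = data;                 // guaranteed to see 42
    producer.join();
    return seen;
}
```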

Features the STL provides for this:

  • Futures and promises guarantee both atomicity and order: setting the outcome (return value or exception) of the shared state happens before the outcome is processed.
  • Mutexes and locks can be used to deal with critical sections or protected zones. Locks provide atomicity. The release of a lock object acquired by one thread is guaranteed to happen before the acquisition of the same lock object by another thread is successful.
  • Condition variables allow one thread to wait for some predicate controlled by another thread to become true. This helps to solve the order problem.
  • Atomic data types ensure that each access to a variable is atomic.
  • The low-level interface of atomic data types allows experts to relax the order of atomic statements or to use manual barriers for memory access (so-called fences).
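For the condition-variable item above, a minimal sketch: one thread waits until another makes the predicate ready == true hold. waitForReady() and the value 7 are hypothetical; the predicate overload of wait() also handles spurious wakeups:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

int waitForReady()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // the predicate, protected by m
    int value = 0;       // also protected by m

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lg(m);
            value = 7;
            ready = true;
        }
        cv.notify_one();  // wake the waiting thread
    });

    std::unique_lock<std::mutex> ul(m);
    cv.wait(ul, [&] { return ready; });  // re-checks the predicate on each wakeup
    int result = value;
    ul.unlock();
    producer.join();
    return result;
}
```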

Mutexes and Locks

A mutex, or mutual exclusion, is an object that helps to control the concurrent access of a resource by providing exclusive access to it. To get exclusive access to the resource, the corresponding thread locks the mutex, which prevents other threads from locking that mutex until the first thread unlocks the mutex.
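The bare lock()/unlock() protocol looks like this in a minimal sketch (incrementTwice() is a hypothetical name). It is correct only as long as nothing between the two calls can throw, which is the motivation for the RAII approach below:

```cpp
#include <mutex>
#include <thread>

int incrementTwice()
{
    std::mutex m;
    int shared = 0;
    auto work = [&] {
        m.lock();    // blocks while another thread holds m
        ++shared;    // critical section
        m.unlock();  // lets the next thread in
    };
    std::thread t1(work), t2(work);
    t1.join();
    t2.join();
    return shared;
}
```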

Using Mutexes and Locks

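To prevent a lock from staying locked forever when an exception occurs, apply the RAII principle (Resource Acquisition Is Initialization), whereby the constructor acquires a resource so that the destructor, which is always called even when an exception causes the end of the lifetime, releases the resource automatically. For this purpose, the STL provides std::lock_guard.
Put the lock in its own pair of braces (a separate scope) so that it is released as early as possible.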

int val;
std::mutex valMutex; // control exclusive access to val
// ...
{
    std::lock_guard<std::mutex> lg(valMutex); // lock and automatically unlock
    if (val >= 0) {
        f(val); // val is positive
    }
    else {
        f(-val); // pass negated negative val
    }
} // ensure that lock gets released here
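To check the RAII claim, this sketch throws while a lock_guard holds a mutex and then verifies that the mutex is free again. The names guardedMutex, failWhileLocked(), and lockReleasedAfterException() are hypothetical:

```cpp
#include <mutex>
#include <stdexcept>

std::mutex guardedMutex;

// Throws while holding the lock; lock_guard's destructor still unlocks.
void failWhileLocked()
{
    std::lock_guard<std::mutex> lg(guardedMutex);
    throw std::runtime_error("oops");
}

bool lockReleasedAfterException()
{
    try {
        failWhileLocked();
    }
    catch (const std::runtime_error&) {
        // the exception ended lg's lifetime, so the mutex was unlocked
    }
    bool acquired = guardedMutex.try_lock();  // succeeds only if lg released it
    if (acquired) {
        guardedMutex.unlock();
    }
    return acquired;
}
```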

A Simple Example

#include <future>
#include <mutex>
#include <iostream>
#include <string>

std::mutex printMutex; // enable synchronized output with print()

void print(const std::string& s)
{
    std::lock_guard<std::mutex> l(printMutex);
    for (char c : s) {
        std::cout.put(c);
    }
    std::cout << std::endl;
}

int main()
{
    auto f1 = std::async(std::launch::async,
                         print, "Hello from a first thread");
    auto f2 = std::async(std::launch::async,
                         print, "Hello from a second thread");
    print("Hello from the main thread");
}

Recursive Locks

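An ordinary mutex can deadlock in the following situation: a function locks the mutex and then calls a second function that locks the same mutex again. The second lock can never succeed, because it waits for the first lock to be released (formally, this is undefined behavior). A recursive_mutex solves this: it allows the same thread to lock it multiple times, and it is released when the last corresponding unlock() is called.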

#include <mutex>

class DatabaseAccess
{
private:
    std::recursive_mutex dbMutex;
    // ... state of database access
public:
    void createTable(/* ... */)
    {
        std::lock_guard<std::recursive_mutex> lg(dbMutex);
        // ...
    }
    void insertData(/* ... */)
    {
        std::lock_guard<std::recursive_mutex> lg(dbMutex);
        // ...
    }
    void createTableAndInsertData(/* ... */)
    {
        std::lock_guard<std::recursive_mutex> lg(dbMutex);
        // ...
        createTable(/* ... */); // OK: no deadlock
    }
    // ...
};
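A runnable miniature of the same idea, using a hypothetical Store class in place of the database: addAll() holds the lock and calls add(), which locks the same recursive_mutex again in the same thread:

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

class Store
{
private:
    std::recursive_mutex mtx;
    std::vector<int> items;
public:
    void add(int v)
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);
        items.push_back(v);
    }
    void addAll(const std::vector<int>& vs)
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);  // first lock
        for (int v : vs) {
            add(v);  // locks again in the same thread: no deadlock
        }
    }
    std::size_t size()
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);
        return items.size();
    }
};
```

With std::mutex instead of std::recursive_mutex, the call to add() inside addAll() would be undefined behavior and would typically deadlock.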

Tried and Timed Locks

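Sometimes you want to try to acquire a lock and give up if that isn't possible. For this, use try_lock(). Once it succeeds, pass std::adopt_lock to lock_guard so that it adopts the already locked mutex (and releases it at the end of its scope) instead of locking it again.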

std::mutex m;
// try to acquire a lock and do other stuff while this isn’t possible
while (m.try_lock() == false) {
    doSomeOtherStuff();
}
std::lock_guard<std::mutex> lg(m, std::adopt_lock);
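A sketch showing that try_lock() never blocks: while the calling thread holds the mutex, another thread's try_lock() simply returns false. tryLockWhileHeld() is a hypothetical name:

```cpp
#include <mutex>
#include <thread>

bool tryLockWhileHeld()
{
    std::mutex m;
    bool gotIt = true;
    std::lock_guard<std::mutex> lg(m);              // this thread owns m
    std::thread t([&] { gotIt = m.try_lock(); });   // another thread tries
    t.join();
    return gotIt;  // false: m was owned by the calling thread
}
```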

You can also wait a limited time for a lock, using a timed mutex (std::timed_mutex) and try_lock_for().

std::timed_mutex m;
// try for one second to acquire a lock
if (m.try_lock_for(std::chrono::seconds(1))) {
    std::lock_guard<std::timed_mutex> lg(m, std::adopt_lock);
    // ...
}
else {
    couldNotGetTheLock();
}
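A runnable sketch of the timeout case: the calling thread holds a timed_mutex, so another thread's try_lock_for() gives up after the (arbitrarily chosen) 50 ms. timedLockTimesOut() is a hypothetical name:

```cpp
#include <chrono>
#include <mutex>
#include <thread>

bool timedLockTimesOut()
{
    std::timed_mutex m;
    std::lock_guard<std::timed_mutex> holder(m);  // this thread holds m
    bool acquired = true;
    std::thread t([&] {
        acquired = m.try_lock_for(std::chrono::milliseconds(50));
        if (acquired) {
            m.unlock();
        }
    });
    t.join();
    return acquired;  // false: m stayed locked for the whole timeout
}
```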

Dealing with Multiple Locks

When a thread needs to lock multiple mutexes at the same time, use the global function std::lock().

std::mutex m1;
std::mutex m2;
// ...
{
    std::lock(m1, m2); // lock both mutexes (or none if not possible)
    std::lock_guard<std::mutex> lockM1(m1, std::adopt_lock);
    std::lock_guard<std::mutex> lockM2(m2, std::adopt_lock);
    // ...
} // automatically unlock all mutexes
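The point of std::lock() is that it avoids deadlock even when two threads request the same mutexes in opposite order. This sketch assumes a hypothetical Account type and transfer() function; two threads transfer money in opposite directions between the same pair of accounts:

```cpp
#include <mutex>
#include <thread>

// Hypothetical account type used only for this illustration.
struct Account {
    std::mutex m;
    int balance = 0;
};

void transfer(Account& from, Account& to, int amount)
{
    std::lock(from.m, to.m);  // locks both without deadlock, in any order
    std::lock_guard<std::mutex> lg1(from.m, std::adopt_lock);
    std::lock_guard<std::mutex> lg2(to.m, std::adopt_lock);
    from.balance -= amount;
    to.balance += amount;
}

int transferBothWays()
{
    Account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] { for (int i = 0; i < 1000; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < 1000; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;  // total is preserved
}
```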

Alternatively, first try to lock all of them with the global function std::try_lock().

std::mutex m1;
std::mutex m2;
int idx = std::try_lock(m1, m2); // try to lock both mutexes
if (idx < 0) { // both locks succeeded
    std::lock_guard<std::mutex> lockM1(m1, std::adopt_lock);
    std::lock_guard<std::mutex> lockM2(m2, std::adopt_lock);
    // ...
} // automatically unlock all mutexes
else {
    // idx has zero-based index of first failed lock
    std::cerr << "could not lock mutex m" << idx + 1 << std::endl;
}
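A sketch that makes the second lock fail on purpose: the calling thread holds m2, so std::try_lock(m1, m2) in another thread locks m1, fails on m2, releases m1 again, and returns 1. tryLockIndex() is a hypothetical name; an uncontended try_lock() may in principle fail spuriously, but in practice the result is 1:

```cpp
#include <mutex>
#include <thread>

int tryLockIndex()
{
    std::mutex m1, m2;
    int idx = -1;
    std::lock_guard<std::mutex> holdM2(m2);  // this thread holds m2
    std::thread t([&] {
        idx = std::try_lock(m1, m2);  // m1 succeeds, m2 fails
        if (idx == -1) {              // both locked (not expected here)
            m1.unlock();
            m2.unlock();
        }
    });
    t.join();
    return idx;  // zero-based index of the mutex that could not be locked
}
```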

Class unique_lock