async() provides an interface that lets a piece of functionality (a callable object) run in the background as a separate thread, if possible.
Class future<> lets you wait for that thread to finish and provides access to its outcome: a return value or an exception, if any.
A First Example Using async() and Futures
    ...
    // start func1() asynchronously (now or later or never)
    std::future<int> result1(std::async(func1));

    int result2 = func2();    // call func2() synchronously (here and now)

    // print result (wait for func1() to finish and add its result to result2)
    int result = result1.get() + result2;
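If the environment supports it, the task passed to async() starts immediately. Otherwise, by default, the task is deferred until its result is actually needed, that is, until get() is called.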
Try to maximize the distance between calling async() and calling get(): call early and return late.
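A minimal sketch of the call-early, return-late pattern. The names slowSquare(), quickDouble(), and combine() are hypothetical and chosen only for illustration; the sleep merely stands in for real work.

```cpp
#include <chrono>
#include <future>
#include <thread>

// hypothetical long-running task (the sleep stands in for real work)
int slowSquare(int x)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return x * x;
}

// hypothetical task that runs synchronously in the calling thread
int quickDouble(int x)
{
    return 2 * x;
}

int combine(int x)
{
    // call early: hand slowSquare() to async() before doing anything else
    std::future<int> squared = std::async(slowSquare, x);

    // do the synchronous work while slowSquare() may run in the background
    int doubled = quickDouble(x);

    // return late: ask for the asynchronous result only when it is needed
    return squared.get() + doubled;
}
```

Calling combine(3) adds the asynchronous result 9 to the synchronous result 6. The result is the same whether or not the background thread could actually be started; only the elapsed time differs.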
get() can be called only once per future<>. Afterward the future is in an invalid state, which you can check with valid().
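The following sketch shows how valid() changes around the single permitted call to get(). The struct ValidityReport and the function inspectFuture() are hypothetical helpers introduced only to make the states observable.

```cpp
#include <future>

// hypothetical helper type: records valid() before and after get()
struct ValidityReport {
    bool beforeGet;
    bool afterGet;
    int  value;
};

ValidityReport inspectFuture()
{
    std::future<int> f = std::async([] { return 42; });

    ValidityReport r;
    r.beforeGet = f.valid();   // true: the future refers to a shared state
    r.value     = f.get();     // first and only permitted call to get()
    r.afterGet  = f.valid();   // false: get() released the shared state
    return r;
}
```

Calling get() a second time on the same future would be undefined behavior, so checking valid() first is the safe way to find out whether a result can still be retrieved.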
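async() also accepts a launch policy as its first argument: std::launch::async forces the task to start asynchronously (or throws std::system_error if that is not possible), whereas std::launch::deferred postpones the task until get() or wait() is called.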
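One way to observe the effect of a launch policy is to compare thread IDs. In this sketch, runsOnCallingThread() is a hypothetical helper: it starts a task with the given policy and reports whether the task ended up running on the thread that called get().

```cpp
#include <future>
#include <thread>

// hypothetical helper: returns true if a task started with the given
// policy was executed by the calling thread
bool runsOnCallingThread(std::launch policy)
{
    const std::thread::id caller = std::this_thread::get_id();

    // the task simply reports the ID of the thread that executes it
    std::future<std::thread::id> f =
        std::async(policy, [] { return std::this_thread::get_id(); });

    // with launch::deferred, the task runs right here, inside get()
    return f.get() == caller;
}
```

With std::launch::deferred the helper returns true, because the task runs inside get(). With std::launch::async it returns false, because the task runs as if in a new thread.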
    #include <chrono>
    #include <exception>
    #include <future>
    #include <iostream>
    #include <random>
    #include <thread>
    using namespace std;

    void doSomething(char c)
    {
        // random-number generator (use c as seed to get different sequences)
        default_random_engine dre(c);
        uniform_int_distribution<int> id(10, 1000);

        // loop to print character after a random period of time
        for (int i = 0; i < 10; ++i) {
            this_thread::sleep_for(chrono::milliseconds(id(dre)));
            cout.put(c).flush();
        }
    }

    int main()
    {
        // start two loops in the background printing characters . or +
        auto f1 = async([] { doSomething('.'); });
        auto f2 = async([] { doSomething('+'); });

        // if at least one of the background tasks is running
        if (f1.wait_for(chrono::seconds(0)) != future_status::deferred ||
            f2.wait_for(chrono::seconds(0)) != future_status::deferred) {
            // poll until at least one of the loops finished
            while (f1.wait_for(chrono::seconds(0)) != future_status::ready &&
                   f2.wait_for(chrono::seconds(0)) != future_status::ready) {
                // ...
                this_thread::yield();    // hint to reschedule to the next thread
            }
        }
        cout.put('\n').flush();

        // wait for all loops to be finished and process any exception
        // (get() also runs the loops now if none of them was started)
        try {
            f1.get();
            f2.get();
        }
        catch (const exception& e) {
            cout << "\nEXCEPTION: " << e.what() << endl;
        }
        cout << "\ndone" << endl;
    }
    #include <chrono>
    #include <exception>
    #include <future>
    #include <iostream>
    #include <stdexcept>
    #include <thread>
    using namespace std;

    int queryNumber()
    {
        // read number
        cout << "read number: ";
        int num;
        cin >> num;

        // throw exception if none
        if (!cin) {
            throw runtime_error("no number read");
        }
        return num;
    }

    void doSomething(char c, shared_future<int> f)
    {
        try {
            // wait for number of characters to print
            int num = f.get();    // get result of queryNumber()

            for (int i = 0; i < num; ++i) {
                this_thread::sleep_for(chrono::milliseconds(100));
                cout.put(c).flush();
            }
        }
        catch (const exception& e) {
            cerr << "EXCEPTION in thread " << this_thread::get_id()
                 << ": " << e.what() << endl;
        }
    }

    int main()
    {
        try {
            // start one thread to query a number
            // (pass the function itself; do not call it here)
            shared_future<int> f = async(queryNumber);

            // start three threads each processing this number in a loop
            auto f1 = async(launch::async, doSomething, '.', f);
            auto f2 = async(launch::async, doSomething, '+', f);
            auto f3 = async(launch::async, doSomething, '*', f);

            // wait for all loops to be finished
            f1.get();
            f2.get();
            f3.get();
        }
        catch (const exception& e) {
            cout << "\nEXCEPTION: " << e.what() << endl;
        }
        cout << "\ndone" << endl;
    }
The Low-Level Interface: Threads and Promises
Class std::thread
    void doSomething();

    std::thread t(doSomething);    // start doSomething() in the background
    t.join();                      // wait for t to finish (block until doSomething() ends)
    #include <thread>
    #include <future>
    #include <iostream>
    #include <string>
    #include <exception>
    #include <stdexcept>
    #include <functional>
    #include <utility>

    void doSomething(std::promise<std::string>& p)
    {
        try {
            // read character and throw exception if 'x'
            std::cout << "read char ('x' for exception): ";
            char c = std::cin.get();
            if (c == 'x') {
                throw std::runtime_error(std::string("char ") + c + " read");
            }
            // ...
            std::string s = std::string("char ") + c + " processed";
            p.set_value(std::move(s));    // store result
        }
        catch (...) {
            p.set_exception(std::current_exception());    // store exception
        }
    }

    int main()
    {
        try {
            // start thread using a promise to store the outcome
            std::promise<std::string> p;
            std::thread t(doSomething, std::ref(p));
            t.detach();
            ...

            // create a future to process the outcome
            std::future<std::string> f(p.get_future());

            // process the outcome
            std::cout << "result: " << f.get() << std::endl;
        }
        catch (const std::exception& e) {
            std::cerr << "EXCEPTION: " << e.what() << std::endl;
        }
        catch (...) {
            std::cerr << "EXCEPTION " << std::endl;
        }
    }
Class packaged_task<>
    double compute(int x, int y);

    std::packaged_task<double(int, int)> task(compute);    // create a task
    std::future<double> f = task.get_future();             // get its future
    // ...
    task(7, 5);               // start the task (typically in a separate thread)
    // ...
    double res = f.get();     // wait for its end and process result/exception
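The snippet above starts the task in the calling thread. As a sketch of the "separate thread" variant, the task can be moved into a std::thread. The body of compute() and the helper runInSeparateThread() are assumptions made for illustration only.

```cpp
#include <future>
#include <thread>
#include <utility>

// hypothetical computation used only for illustration
double compute(int x, int y)
{
    return static_cast<double>(x) / y;
}

double runInSeparateThread(int x, int y)
{
    std::packaged_task<double(int, int)> task(compute);   // create a task
    std::future<double> f = task.get_future();            // get its future

    // packaged_task is move-only, so the thread takes ownership of it
    std::thread t(std::move(task), x, y);                 // start the task

    double res = f.get();    // wait for its end and fetch the result
    t.join();
    return res;
}
```

The future has to be obtained before the task is moved, because afterward the local task object no longer owns a shared state.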
Starting a Thread in Detail
async() in Detail
Synchronizing Threads, or the Problem of Concurrency
Using multiple threads almost inevitably involves concurrent data access, and this is where problems tend to arise.
data race: two conflicting actions in different threads, at least one of which is not atomic, and neither happens before the other.
Unless explicitly stated otherwise, the STL does not guarantee thread safety.
A rule of thumb: The only safe way to concurrently access the same data by multiple threads without synchronization is when ALL threads only READ the data.
Possible problems:
Unsynchronized data access: When two threads running in parallel read and write the same data, it is open which statement comes first.
Half-written data: When one thread reads data, which another thread modifies, the reading thread might even read the data in the middle of the write of the other thread, thus reading neither the old nor the new value.
Reordered statements: Statements and operations might be reordered so that the behavior of each single thread is correct, but in combination of all threads, expected behavior is broken. (compiler and/or the hardware might reorder the statements.)
Concepts needed to solve these problems:
Atomicity: This means that read or write access to a variable or to a sequence of statements happens exclusively and without any interruption, so that one thread can’t read intermediate states caused by another thread.
Order: We need some ways to guarantee the order of specific statements or of a group of specific statements.
Features the STL provides for this:
Futures and promises guarantee both atomicity and order.
Mutexes and locks handle critical sections or protected zones. A lock guarantees atomicity. The release of a lock object acquired by one thread is guaranteed to happen before the acquisition of the same lock object by another thread is successful.
Condition variables allow one thread to wait for some predicate controlled by another thread to become true. This helps with ordering problems.
Atomic data types ensure that each access to a variable is atomic.
The low-level interface of atomic data types allows experts to relax the order of atomic statements or to use manual barriers for memory access (so-called fences).
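As an illustration of how a mutex and a condition variable work together, here is a minimal sketch in which one thread waits until another has prepared some data. The function waitForMessage() and the message text are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// hypothetical helper: the calling thread waits until a producer thread
// has prepared a message, then returns that message
std::string waitForMessage()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;          // the predicate controlled by the producer
    std::string message;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lg(m);   // atomicity: exclusive access
            message = "data prepared";
            ready = true;
        }                                        // release the lock before notifying
        cv.notify_one();
    });

    std::string received;
    {
        std::unique_lock<std::mutex> ul(m);
        cv.wait(ul, [&] { return ready; });      // order: blocks until ready is true
        received = message;
    }
    producer.join();
    return received;
}
```

Passing the predicate to wait() handles both spurious wakeups and the case in which the producer finishes before the consumer starts waiting.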
Mutexes and Locks
A mutex, or mutual exclusion, is an object that helps to control the concurrent access of a resource by providing exclusive access to it. To get exclusive access to the resource, the corresponding thread locks the mutex, which prevents other threads from locking that mutex until the first thread unlocks the mutex.
Using Mutexes and Locks
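To prevent a mutex from staying locked forever when an exception occurs, apply the RAII principle (Resource Acquisition Is Initialization), whereby the constructor acquires a resource so that the destructor, which is always called even when an exception causes the end of the lifetime, releases the resource automatically. For this purpose, the STL provides std::lock_guard.
Enclose the critical section in its own pair of braces so that the lock is released as early as possible.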
    int val;
    std::mutex valMutex;    // control exclusive access to val
    // ...
    {
        std::lock_guard<std::mutex> lg(valMutex);    // lock and automatically unlock
        if (val >= 0) {
            f(val);     // val is positive
        }
        else {
            f(-val);    // pass negated negative val
        }
    }    // ensure that lock gets released here
    #include <future>
    #include <mutex>
    #include <iostream>
    #include <string>

    std::mutex printMutex;    // enable synchronized output with print()

    void print(const std::string& s)
    {
        std::lock_guard<std::mutex> l(printMutex);
        for (char c : s) {
            std::cout.put(c);
        }
        std::cout << std::endl;
    }

    int main()
    {
        auto f1 = std::async(std::launch::async,
                             print, "Hello from a first thread");
        auto f2 = std::async(std::launch::async,
                             print, "Hello from a second thread");
        print("Hello from the main thread");
    }
    class DatabaseAccess
    {
      private:
        std::recursive_mutex dbMutex;
        ...    // state of database access
      public:
        void createTable(...)
        {
            std::lock_guard<std::recursive_mutex> lg(dbMutex);
            ...
        }
        void insertData(...)
        {
            std::lock_guard<std::recursive_mutex> lg(dbMutex);
            ...
        }
        void createTableAndinsertData(...)
        {
            std::lock_guard<std::recursive_mutex> lg(dbMutex);
            ...
            createTable(...);    // OK: no deadlock
        }
        ...
    };
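A self-contained sketch of the same idea that can actually be compiled. The class Journal is a hypothetical stand-in for the database access class above; it only records which operations were called.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// hypothetical stand-in for a database: it only records what was called
class Journal
{
  private:
    std::recursive_mutex mtx;
    std::vector<std::string> entries;

  public:
    void createTable(const std::string& name)
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);
        entries.push_back("create " + name);
    }

    void insertData(const std::string& name)
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);
        entries.push_back("insert into " + name);
    }

    void createTableAndInsertData(const std::string& name)
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);   // first lock
        createTable(name);   // locks again in the same thread: no deadlock
        insertData(name);    // same here
    }

    std::size_t size()
    {
        std::lock_guard<std::recursive_mutex> lg(mtx);
        return entries.size();
    }
};
```

With a plain std::mutex, the nested calls in createTableAndInsertData() would try to lock a mutex the thread already owns, which is undefined behavior and typically ends in a deadlock.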
Tried and Timed Locks
Sometimes a thread wants to try to acquire a lock and simply move on if that fails. Use try_lock() for this.
    std::mutex m;

    // try to acquire a lock and do other stuff while this isn’t possible
    while (m.try_lock() == false) {
        doSomeOtherStuff();
    }
    std::lock_guard<std::mutex> lg(m, std::adopt_lock);
A lock attempt can also be limited by a timeout.
    std::timed_mutex m;

    // try for one second to acquire a lock
    if (m.try_lock_for(std::chrono::seconds(1))) {
        std::lock_guard<std::timed_mutex> lg(m, std::adopt_lock);
        ...
    }
    else {
        couldNotGetTheLock();
    }
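To see the timeout in action, the following sketch lets a helper thread hold the mutex while the calling thread tries to lock it. The function tryWhileHeld() and the two promises used for signaling are assumptions made for this illustration.

```cpp
#include <chrono>
#include <future>
#include <mutex>
#include <thread>

// hypothetical helper: returns whether try_lock_for() succeeded while
// another thread was holding the mutex
bool tryWhileHeld(std::chrono::milliseconds timeout)
{
    std::timed_mutex m;
    std::promise<void> locked;     // signals that the helper owns the mutex
    std::promise<void> release;    // tells the helper to give it up again
    std::future<void> lockedSignal = locked.get_future();
    std::future<void> releaseSignal = release.get_future();

    std::thread holder([&] {
        std::lock_guard<std::timed_mutex> lg(m);
        locked.set_value();
        releaseSignal.wait();      // keep the mutex until told otherwise
    });

    lockedSignal.wait();                       // make sure the mutex is taken
    bool acquired = m.try_lock_for(timeout);   // gives up after the timeout
    if (acquired) {
        m.unlock();                            // not expected in this sketch
    }

    release.set_value();
    holder.join();
    return acquired;
}
```

Because the helper thread owns the mutex for the whole duration of the attempt, try_lock_for() returns false once the timeout has expired.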
Dealing with Multiple Locks
When a thread needs to lock several mutexes at once, use the global function std::lock().
    std::mutex m1;
    std::mutex m2;
    ...
    {
        std::lock(m1, m2);    // lock both mutexes (or none if not possible)
        std::lock_guard<std::mutex> lockM1(m1, std::adopt_lock);
        std::lock_guard<std::mutex> lockM2(m2, std::adopt_lock);
        ...
    }    // automatically unlock all mutexes
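A typical situation in which std::lock() helps is two threads that need the same pair of mutexes but name them in opposite order. The following is only a sketch; the Account type and both functions are hypothetical.

```cpp
#include <mutex>
#include <thread>

// hypothetical account type used only for illustration
struct Account {
    std::mutex m;
    int balance;
    explicit Account(int b) : balance(b) {}
};

void transfer(Account& from, Account& to, int amount)
{
    std::lock(from.m, to.m);    // lock both mutexes without risking deadlock
    std::lock_guard<std::mutex> lockFrom(from.m, std::adopt_lock);
    std::lock_guard<std::mutex> lockTo(to.m, std::adopt_lock);
    from.balance -= amount;
    to.balance += amount;
}    // both mutexes are unlocked automatically here

// two threads transfer in opposite directions, so they pass the two
// mutexes to std::lock() in opposite order
int totalAfterCrossTransfers()
{
    Account a(1000);
    Account b(1000);
    std::thread t1([&] { for (int i = 0; i < 1000; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < 1000; ++i) transfer(b, a, 2); });
    t1.join();
    t2.join();
    return a.balance + b.balance;    // safe to read: both threads have ended
}
```

If transfer() locked from.m and then to.m with two separate lock() calls, the two threads could each hold one mutex and wait forever for the other.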
Alternatively, just try to acquire all of the locks with std::try_lock().
    std::mutex m1;
    std::mutex m2;

    int idx = std::try_lock(m1, m2);    // try to lock both mutexes
    if (idx < 0) {    // both locks succeeded
        std::lock_guard<std::mutex> lockM1(m1, std::adopt_lock);
        std::lock_guard<std::mutex> lockM2(m2, std::adopt_lock);
        ...
    }    // automatically unlock all mutexes
    else {
        // idx has zero-based index of first failed lock
        std::cerr << "could not lock mutex m" << idx + 1 << std::endl;
    }
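The failure branch can be provoked by letting another thread hold one of the mutexes. In this sketch, tryLockBothWhileSecondIsHeld() is a hypothetical helper, and the promises serve only to coordinate the two threads.

```cpp
#include <future>
#include <mutex>
#include <thread>

// hypothetical helper: returns what std::try_lock(m1, m2) reports while
// another thread owns m2
int tryLockBothWhileSecondIsHeld()
{
    std::mutex m1;
    std::mutex m2;
    std::promise<void> locked;     // signals that the helper owns m2
    std::promise<void> release;    // tells the helper to give it up again
    std::future<void> lockedSignal = locked.get_future();
    std::future<void> releaseSignal = release.get_future();

    std::thread holder([&] {
        std::lock_guard<std::mutex> lg(m2);
        locked.set_value();
        releaseSignal.wait();      // keep m2 until told otherwise
    });

    lockedSignal.wait();                  // make sure m2 is taken
    int idx = std::try_lock(m1, m2);      // cannot lock both
    if (idx == -1) {                      // not expected in this sketch
        m1.unlock();
        m2.unlock();
    }
    // on failure, std::try_lock() has already released whatever it locked

    release.set_value();
    holder.join();
    return idx;
}
```

The returned index is normally 1, the position of m2. Because try_lock() on a mutex may fail spuriously, an index of 0 is also possible, but -1 is not as long as the helper thread holds m2.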