《C++标准库(第二版)》第十八章:并发(18.1)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了《C++标准库(第二版)》第十八章:并发(18.1),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含11805字,纯文字阅读大概需要17分钟。
内容图文
此章的更多细节,作者推荐《C++ Concurrency in Action》——Anthony Williams一书。
一、高级接口:async()和Future
(1)async()提供一个接口,让一段技能(a piece of functionality)或说一个callable object若是可能的话在后台运行,成为一个独立线程。
(2)Class future<>允许你等待线程结束并获取其结果(一个返回值,或者也许是一个异常)
1.async()和Future第一个用例
假设要计算两个函数返回值的总和,寻常做法如下:
func1()+func2();
这意味着对操作数的处理是循序发生的,先调用func1()再调用func2(),或是颠倒过来(根据语言规则,次序无法预期),总之整体处理时间是两个的总和。但如果尝试并行运行func1()和func2(),整体运行时间只需是“func1()和func2()运行时间中的较大者”加上计算总和的时间。
int doSomething(char c) {
// random-number generator (use c as seed to get different sequences)
std::default_random_engine dre(c);
std::uniform_int_distribution<int> id(10, 1000);
// loop to print character after a random period of time
for (int i = 0; i < 10; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush();
}
return c;
}
int func1() {
return doSomething('.');
}
int func2() {
return doSomething('+');
}
int main() {
std::cout << "starting func1() in background"
<< " and func2() in foreground:" << std::endl;
// start func1() asynchronously (now or later or never):
std::future<int> result1(std::async(func1));
int result2 = func2(); // call func2() synchronously (here and now)
// print result (wait for func1() to finish and add its result to result2
int result = result1.get() + result2;
std::cout << "\nresult of func1()+func2(): " << result
<< std::endl;
}
上例说明:
(1)首先使用std::async()"尝试启动"func1()于后台,并将结果赋值给某个std::future object。在这里,async()尝试将其所获得的函数立刻异步启动于一个分离线程内。因此概念上func1()在这里被启动了,不会造成main()停滞,基于两个原因,返回future object是必要的:
<1>允许你取得“传给async()的那个函数”的未来结果——也许是个返回值,也许是个异常。这个future object已受到“被启动函数”返回类型的特化,如果被启动的是个返回“无物”的后台任务(background task),就会是std::future<void>
<2>它必须存在,确保“目标函数”或快或慢终会被调用。此处只是尝试启动,若没发生,后面可以强迫启动(当需要函数的运行结果或确保该函数被执行时)。
(2)此处可用auto
auto result1(std::async(func1));
(3)接下来我们启动func2()于前台(foreground),这是个正常的同步化调用,于是程序在此停滞。如果先前func1()成功的被async()启动且尚未结束,现在func1()和func2()就是并行运作。
(4)然后调用get()获取func1()的返回值。会发生三种情况:
<1>如果func1()被async()启动于一个分离线程中并且已结束,你会立刻获得其结果
<2>如果func1()被启动但尚未结束,get()会引发停滞(block)待func1()结束后获得结果
<3>如果func1()尚未启动,会被强迫启动如同一个同步调用;get()会引发停滞直至产生结果。
(5)上例写法保证了,程序可受益于并行处理(如果底层平台对此有所支持),但仍能够在单线程环境中正确操作。为达此目标,必须:
<1>包含头文件<future>
<2>传递某些可并行执行的函数给std::async(),作为一个可调用对象(callable object)
<3>将结果赋值给一个future<ReturnType> object。
<4>当需要被启动函数的执行结果,或当你想确保该函数结束,就对future<> object调用get()
然而这只适用于不发生data race(数据竞争)的情况下(见18.4.1,982页)。
(6)如果不调用get(),就不保证func1()一定会被调用,即使main()终止造成程序结束,也不会唤醒后台线程。
std::future<int> result1(std::async(func1));
int result=func2()+result1.get() //might call func2() after func1() ends
(7)上面的优化是不可取的。核算顺序不明确。
(8)为了获得最佳效果,一般而言你的目标应该将调用async()和调用get()之间的举例最大化。
(9)如果传给async()的函数不返回任何东西,async()会产出一个future<void>,那是future<>的一个偏特化版,这种情况下get()返回“无物”:
std::future<void> f(std::async(func));
...
f.get();
(10)可传lambda进去
(11)get()只能用一次,后面再用就会出错
2.Launch(发射)策略
(1)也可以强迫async()绝不拖延目标函数的执行,只要明确传入一个launch策略(policy),告诉它当它被调用时应明确地以异步方式启动目标函数。
//force func1() to start asynchronously now or throw std::system_error
std::future<long> result1=std::async(std::launch::async,func1);
如果异步调用在此无法实现,会抛出一个std::system_error异常并带差错码resource_unavailable_try_again,相当于POSIX的errno EAGAIN
有了这个async发射策略,就不必非得调用get()了,因为如果返回的future生命即将结束,这个程序必会等待func1()结束。因此,如果你不调用get(),当离开future object作用域时(此处是指main()结束),程序会等待后台任务(background task)结束。尽管如此,程序结束前调用get()会让行为更加清晰。
如果不将std::async(std::launch::async,..)的结果赋值出去,调用者会在此停滞(block)到目标函数结束,那就相当于一个完完全全的同步调用(synchronous call)。
(2)也可以强制延缓执行(deferred execution),下面的做法允许你延缓func1()直到你对f调用get():
std::future<...> f(std::async(std::launch::deferred,func1)); //defer func1 until get()
这保证func1()绝不会在没有get()(或wait(),见953页)的情况下启动。这个策略的特别在于允许你写出lazy evaluation(缓式求值)。
auto f1=std::async(std::launch::deferred,task1);
auto f2=std::async(std::launch::deferred,task2);
...
auto val=thisOrThatIsTheCase()?f1.get():f2.get();
此外,明确申请deferred发射策略也许有助于在一个单线程环境中模拟async()的行为,或是简化调试——除非需要考虑race condition(竞争形势)
3.处理异常
当get()被调用,且后台操作已经(或随后由于异常)而终止,该异常不会在此线程内被处理,而是会再次被传播出去。因此,欲处理后台操作所产生的异常,只需偕同get()做出“以同步方式调用该操作”所做的相同动作即可。
auto f1(async(func));
try{
f1.get();
}catch(const exception& s){
cerr<<s.what()<<endl;
}
4.等待和轮询(Waiting 和 Polling)
(1)一个future<>只能被调用get()一次。在那之后future就处于无效状态,而这种状态只能借由“对future调用valid()”来检测。此情况下对它的任何调用(析构除外)会导致不可预期的行为(详见18.3.2,975页)
(2)future提供了wait()接口,允许我们等待后台操作完成而不需要处理其结果。这个接口可以被调用一次以上,也可结合一个duration(时间段)或timepoint(时间点)以限制等待时间。
std::future<...> f(std::async(func));
f.wait(); //wait for func to be done(might start background task)
f.wait_for(std::chrono::seconds(10)); //wait at most 10 seconds for func
//try to call func asynchronously
f.wait_until(std::chrono::system_clock::now()+std::chrono::minutes(1));
(3)不论wait_for()或wait_until()都返回以下三种东西之一
<1>std::future_status::deferred——如果async()延缓了操作而程序中又完全没有调用wait()或get()(那会强制启动)。这种情况下上述两个函数都会立刻返回。
<2>std::future_status::timeout——如果某个操作被异步启动但尚未结束,而waiting又已逾期(对于给定的时间段而言)。
<3>std::future_status::ready——如果操作已完成。
(4)wait_for()和wait_until()特别让我们得以写出所谓的speculative execution(投机性运行)。举个例子,我们必须在某个时间段内获得某一运算之尚堪可用的结果(usable result),而如果有精确结果(accurate answer) 更好。
int quickComputation(); //process result "quick and dirty"
int accurateComputation(); //process result "accurate but slow"
//outside declared because lifetime of accurateComputation() might exceed lifetime of bestResultInTime()
std::future<int> f;
int bestResultInTime() {
//define time slot to get the answer
auto tp = std::chrono::system_clock::now() + std::chrono::minutes(1);
//start both a quick and an accurate computation
f = std::async(std::launch::async, accurateComputation);
int guess = quickComputation();
//give accurate computation the rest of the time slot:
std::future_status s = f.wait_until(tp);
//return the best computation result we have:
if (s == std::future_status::ready) {
return f.get();
} else {
return guess; //accurateComputation() continues
}
}
注意,future f 不能是声明于bestResultInTime()内的local对象,那样的话若时间太短以至于无法完成accurateComputation(),future析构函数会停滞(block)直到异步操作结束。
(5)如果传入一个zero时间段,或一个过去时间点,就可以仅轮询(poll)是否有个后台任务已被启动,和/或是否它正在运行中。
auto f(async(task));
//do something while task has not finished(might never happen!)
while(f.wait_for(chrono::second(0))!=future_status::ready)){...}
然而此循环有可能不会结束,因为(例如)在单线程环境中,这一调用将被推迟直至get()被调用。因此,你若非调用async()并以其第一实参指定发射策略为std::launch::async,就该明确检查是否wait_for()返回std::future_status::deferred:
auto f(async(task));
if(f.wait_for(chrono::seconds(0))!=future_status::deferred){
while(f.wait_for(chrono::second(0))!=future_status::ready)){...}
}
auto r=f.get();
(6)引发无限循环的另一个可能原因是,运行此循环的线程完全占用处理器(processor),其他线程无法获得丝毫时间来备妥future。这会巨幅降低程序速度。最简单的修正就是在循环内调用yield()(见18.3.7,981页),或是睡眠一小段时间
std::this_thread::yield() //hint to reschedule to the next thread
二、实例:等待两个Task
void doSomething (char c)
{
// random-number generator (use c as seed to get different sequences)
default_random_engine dre(c);
uniform_int_distribution<int> id(10,1000);
// loop to print character after a random period of time
for (int i=0; i<10; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush();
}
}
int main()
{
cout << "starting 2 operations asynchronously" << endl;
// start two loops in the background printing characters . or +
auto f1 = async([]{ doSomething('.'); });
auto f2 = async([]{ doSomething('+'); });
// if at least one of the background tasks is running
if (f1.wait_for(chrono::seconds(0)) != future_status::deferred ||
f2.wait_for(chrono::seconds(0)) != future_status::deferred) {
// poll until at least one of the loops finished
while (f1.wait_for(chrono::seconds(0)) != future_status::ready &&
f2.wait_for(chrono::seconds(0)) != future_status::ready) {
//...;
this_thread::yield(); // hint to reschedule to the next thread
}
}
cout.put('\n').flush();
// wait for all loops to be finished and process any exception
try {
f1.get();
f2.get();
}
catch (const exception& e) {
cout << "\nEXCEPTION: " << e.what() << endl;
}
cout << "\ndone" << endl;
}
唯一确知的是,newline绝不会再两个循环中的某个完成前打印。我们甚至无法保证newline紧邻于“序列之最末字符”之后,因为“循环之一结束后记录相应的future object”以及“该future被核值(evaluated)”可能需要花费一些时间(注意,这并非实时处理)
1.传递实参(Passing Argument)
(1).也可以传递“在async()语句之前就已存在”的实参,一如以往,可采用by value方式或by reference方式传递他们
char c='@';
auto f=async([=]{doSomething(c);}); //pass copy of c to doSomething()
auto f=async(doSomething,c); //call doSomething(c) asynchronusly
auto f=async([&]doSomething(c);}); //risky!
auto f=async(doSomething,std::ref(c)); //risky!
如果是采用by reference方式传递实参,被传递值甚至可能在后台任务启动前就变得无效。但如果控制实参寿命,使它超越后台任务的生命,可以这么做:
void doSomething(const char&c); //pass character by reference
...
char c='@';
auto f=std::async([&]{doSomething(c);}); //pass c by reference
...
f.get(); //needs lifetime of c until here
但是,如果你“以by reference方式传递实参”只是为了可在另一个线程中改动它们,可能轻易落入不明确行为(undefined behavior)之中。例如,在试图启动一个输出循环后,你改变该字符。
auto f=async([&]{doSomething(c);});
...
c='_'; //switch output of doSomething() to underscores,if it still runs
f.get();
首先,“这里”以及“在doSomething()”内对c的处理,其次序无法预期。更糟的是,我们在某一线程中改动c,在另一个线程中读取c,这是对同一对象的异步并发处理(data race,见18.4.1节982页),将导致不可预期的行为,除非使用mutex(互斥体,见18.5,989页)或atomic(见18.7节,1012页)保护并发处理动作。
所以,如果使用async(),就应该以by value方式传递所有“用来处理目标函数”的必要object,使async()只需使用局部拷贝(local copy)。如果复制成本太高,请让那些object以const reference的形式传递,且不使用mutable。其他所有情况请阅读18.4节第982页。
也可以传给async()一个“指向成员函数”的pointer,这种情况下,位于该成员函数名称之后的第一个实参必须是个reference或pointer,指向某个object,后者将调用该成员函数:
class X{
public:
void mem(int num);
};
X x;
auto a=async(&X::mem,x,42); //try to call x.mem(42) asynchronously
内容总结
以上是互联网集市为您收集整理的《C++标准库(第二版)》第十八章:并发(18.1)全部内容,希望文章能够帮你解决《C++标准库(第二版)》第十八章:并发(18.1)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。