14 KiB
使用c++11让线程线程开发变得简单
5.1 线程创建
- jon()
#include <thread>
void func(){}
int main()
{
std::thread t(func);
t.join();//join将会阻塞当前线程,直到线程执行结束
return 0;
}
- detach
#include <thread>
void func(){}
int main()
{
std::thread t(func);
t.detach();//线程与对象分离,让线程作为后台线程去运行,但同时也无法与线程产生联系(控制)了
return 0;
}
线程不能被复制,但是可以被移动
#include <thread>
void func(){}
int main()
{
std::thread t(func);
std::thread t2(std::move(t));
t2.join();
return 0;
}
需要注意线程的生命周期
#include <thread>
void func(){}
int main()
{
std::thread t(func);
return 0;
}
以上代码会报错,因为线程对象先于线程结束了,可以通过join()或者detach()来解决,当然也可以放到一个容器中来保存,以保证线程生命周期
#include <thread>
std::vector<std::thread> g_list;
std::vector<std::shared_ptr<std::thread>> g_list2;
void func(){}
void CreateThread()
{
std::thread t(func);
g_list.push_back(std::move(t));
g_list2.push_back(std::make_shared<std::thread>(func));
}
int main()
{
void CreateThread();
for(auto& thread : g_list)
{
thread.join();
}
for(auto& thread : g_list2)
{
thread->join();
}
return 0;
}
线程的基本用法
- 获取当前信息
void func()
{
}
int main()
{
std::thread t(func);
std::cout << t.get_id() << std::endl;//当前线程id
//获得cpu核心数
std::cout << std::thread::hardware_concurrency() << std::endl;
}
- 线程休眠
void f() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "time out" << std::endl;
}
int main()
{
std::thread t(f);
t.join();
}
互斥量
独占互斥锁std::mutex
std::mutex g_lock;
void func() {
g_lock.lock();
std::cout << "entered thread" << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "leaving thread" << std::this_thread::get_id() << std::endl;
g_lock.unlock();
}
int main()
{
std::thread t1(func);
std::thread t2(func);
std::thread t3(func);
t1.join();
t2.join();
t3.join();
}
使用lock_guard可以简化lock/unlock,同时也更加安全。因为是在构造时自动锁定互斥量,析构时自动解锁。
void func() {
std::lock_guard<std::mutex> locker(g_lock);
std::cout << "entered thread" << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "leaving thread" << std::this_thread::get_id() << std::endl;
}
5.2.2 递归的独占互斥量std::recursive_mutex
递归锁允许同一个线程多次获得该锁,可以用来解决同一线程需要多次获取互斥量时死锁的问题。
struct Complex
{
std::mutex mutex;
int i;
Complex() :i(0) {}
void mul(int x) {
std::lock_guard<std::mutex> lock(mutex);
i *= x;
}
void div(int x) {
std::lock_guard<std::mutex> lock(mutex);
i *= x;
}
void both(int x,int y) {
std::lock_guard<std::mutex> lock(mutex);
mul(x);
div(y);
}
};
int main()
{
Complex complex;
complex.both(32, 23);
}
简单的方法就是使用std::recursive_mutex
struct Complex
{
std::recursive_mutex mutex;
int i;
Complex() :i(0) {}
void mul(int x) {
std::lock_guard<std::recursive_mutex> lock(mutex);
i *= x;
}
void div(int x) {
std::lock_guard<std::recursive_mutex> lock(mutex);
i *= x;
}
void both(int x,int y) {
std::lock_guard<std::recursive_mutex> lock(mutex);
mul(x);
div(y);
}
};
int main()
{
Complex complex;
complex.both(32, 23);
}
需要注意的是:
- 需要用到递归锁定的多线程互斥处理往往本身就可以简化,允许递归互斥很容易放纵复杂逻辑的产生
- 递归锁比起非递归锁效率会低一些
- 递归锁虽然允许同一个线程多次获得同一个互斥锁,可重复获得的最大次数并未具体说明,一旦超过一定次数,再对lock进行调用就会抛出std::system错误
5.2.3带超时的互斥量std::timed_mutex和std::recursive_timed_mutex
主要用于在获取锁时增加超时等待功能。不至于一直等待
std::timed_mutex mutex;
void work()
{
std::chrono::milliseconds timeout(100);
while (true)
{
if (mutex.try_lock_for(timeout))
{
std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;
std::chrono::milliseconds sleepDuration(250);
std::this_thread::sleep_for(sleepDuration);
mutex.unlock();
std::this_thread::sleep_for(sleepDuration);
}
else
{
std::cout << std::this_thread::get_id() << ": do work without the mutex" << std::endl;
std::chrono::milliseconds sleepDuration(100);
std::this_thread::sleep_for(sleepDuration);
}
}
}
int main(void)
{
std::thread t1(work);
std::thread t2(work);
t1.join();
t2.join();
return 0;
}
5.3 条件变量
条件变量时c++11提供的另外一种用于等待的同步机制,它能柱塞一个或者多个线程,直到收到另一个线程发出的通知或者超时,才会唤醒当前阻塞的线程。
condition_variable
condition_variable_any
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
template<typename T>
class SyncQueue
{
private:
bool IsFull() const
{
return m_queue.size() == m_maxSize;
}
bool IsEmpty() const
{
return m_queue.empty();
}
public:
SyncQueue(int maxSize) : m_maxSize(maxSize)
{
}
void Put(const T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);
while (IsFull())
{
std::cout << "缓冲区满了,需要等待..." << std::endl;
m_notFull.wait(m_mutex);
}
m_queue.push_back(x);
m_notFull.notify_one();
}
void Take(T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);
while (IsEmpty())
{
std::cout << "缓冲区空了,需要等待..." << std::endl;
m_notEmpty.wait(m_mutex);
}
x = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
int Count()
{
return m_queue.size();
}
private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合起来使用
std::condition_variable_any m_notEmpty;//不为空的条件变量
std::condition_variable_any m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
};
更好的方法是使用unique_lock,unique可以随时释放锁
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>
template<typename T>
class SimpleSyncQueue
{
public:
SimpleSyncQueue(){}
void Put(const T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);
m_queue.push_back(x);
m_notEmpty.notify_one();
}
void Take(T& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return !m_queue.empty(); });
x = m_queue.front();
m_queue.pop_front();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
private:
std::list<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
};
5.4 原子变量
使用原子变量就不需要来使用互斥量来保护该变量了
使用std::mutex
#include <iostream>
#include <thread>
#include <mutex>
struct Counter
{
int value = 0;
std::mutex mutex;
void increment()
{
std::lock_guard<std::mutex> lock(mutex);
++value;
std::cout << value << std::endl;
}
void decrement()
{
std::lock_guard<std::mutex> lock(mutex);
--value;
std::cout << value << std::endl;
}
int get()
{
return value;
}
};
Counter g_counter;
void Increments()
{
for (int i = 0; i < 10; ++i)
{
g_counter.increment();
}
}
void Decrements()
{
for (int i = 0; i < 5; ++i)
{
g_counter.decrement();
}
}
int main(void)
{
std::thread t1(Increments);
std::thread t2(Decrements);
t1.join();
t2.join();
system("pause");
return 0;
}
使用std::atomic
#include <iostream>
#include <thread>
#include <atomic>
struct Counter
{
std::atomic<int> value = 0;
void increment()
{
++value;
}
void decrement()
{
--value;
}
int get()
{
return value;
}
};
Counter g_counter;
void Increments()
{
for (int i = 0; i < 10; ++i)
{
g_counter.increment();
std::cout << g_counter.get() << std::endl;
}
}
void Decrements()
{
for (int i = 0; i < 5; ++i)
{
g_counter.decrement();
std::cout << g_counter.get() << std::endl;
}
}
int main(void)
{
std::thread t1(Increments);
std::thread t2(Decrements);
t1.join();
t2.join();
system("pause");
return 0;
}
5.5 call_once/once_flag的使用
为了保证多线程环境中某个函数仅被调用一次,比如需要初始化某个对象,而这个对象只能初始化一次。
#include <thread>
#include <iostream>
#include <mutex>
std::once_flag flag;
void do_once() {
std::call_once(flag, [] {std::cout << "Called once" << std::endl; });
}
int main(void)
{
std::thread t1(do_once);
std::thread t2(do_once);
std::thread t3(do_once);
t1.join();
t2.join();
t3.join();
system("pause");
return 0;
}
5.6 异步操作类
5.6.1 std::future
thread库提供了future用来访问异步操作结果,因为异步操作结果是一个未来的期待值,所以被称为future。future提供了异步获取操作结果的通道,我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作结果。
std::future_status status;
do
{
status = future.wait_for(std::chromno::seconds(1));
if (status == std::future_status::deferred) {
}
else if (status == std::future_status::timeout) {
}
else if (status==std::future_status::ready) {
}
} while (status!= std::future_status::ready);
获取future有三种方式:
- get:等待异步操作结束并返回结果
- wait:等待异步操作完成
- wait_for:超时等待返回结果
5.6.2 std::promise
std::promise将数据和future绑定起来,为获取线程函数中的某个值提供便利,在线程函数中为外面传进来的promise赋值,在线程函数执行完成之后就可以通过promise的future获取该值。
std::promise<int> pr;
std::thread t([](std::promise<int>&p) {
p.set_value_at_thread_exit(9);
}, std::ref(pr));
std::future<int> f = pr.get_future();
auto r = f.get();
5.6.3 std::package_task
std::packaged_task<int()> task([]() {return 7; });
std::thread t1(std::ref(task));
std::future<int> f1 = task.get_future();
auto r1 = f1.get();
5.6.4 以上三者关系
std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的,属于低层次对象。之上是std::packaged_task和std::promise,他们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,std::promise包装的是一个值。那这两者又是什么关系呢?可以将一个异步操作的结果放到std::promise中。
future被promise和package_task用来作为异步操作或者异步的结果的连接通道,用std::future和std::shared_future来获取异步的调用结果。future是不可拷贝的,shared_future是可以拷贝的,当需要将future放到容器中则需要shared_future。
#include <iostream>
#include <thread>
#include <utility>
#include <future>
#include <vector>
int func(int x) { return x + 2; }
int main(void)
{
std::packaged_task<int(int)> tsk(func);
std::future<int> fut = tsk.get_future(); //获取future
std::thread(std::move(tsk), 2).detach();
int value = fut.get(); //等待task完成并获取返回值
std::cout << "The result is " << value << ".\n";
std::vector<std::shared_future<int>> v;
std::shared_future<int> f = std::async(std::launch::async, [](int a, int b){return a + b; }, 2, 3);
v.push_back(f);
std::cout << "The shared_future result is " << v[0].get() << std::endl;
return 0;
}
5.7 线程异步操作函数
std::async可以直接创建异步的task,异步任务结果也保存在future中,调用furturn.get()获取即可,如果不关心结果,可以使用furturn.wait()
两种线程创建策略:
- std::launch::async:在调用async时就开始创建线程。
- std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用future的get或者wait时才创建。
#include <iostream>
#include <future>
void TestAsync()
{
std::future<int> f1 = std::async(std::launch::async, [](){
return 8;
});
std::cout << f1.get() << std::endl; //output: 8
std::future<void> f2 = std::async(std::launch::async, [](){
std::cout << 8 << std::endl; return;
});
f2.wait(); //output: 8
std::future<int> future = std::async(std::launch::async, [](){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});
std::cout << "waiting...\n";
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred)
{
std::cout << "deferred\n";
}
else if (status == std::future_status::timeout)
{
std::cout << "timeout\n";
}
else if (status == std::future_status::ready)
{
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);
std::cout << "result is " << future.get() << '\n';
}
int main(void)
{
TestAsync();
return 0;
}