## 使用c++11让线程线程开发变得简单 ### 5.1 线程创建 1. jon() ``` #include void func(){} int main() { std::thread t(func); t.join();//join将会阻塞当前线程,直到线程执行结束 return 0; } ``` 2. detach ``` #include void func(){} int main() { std::thread t(func); t.detach();//线程与对象分离,让线程作为后台线程去运行,但同时也无法与线程产生联系(控制)了 return 0; } ``` 线程不能被复制,但是可以被移动 ``` #include void func(){} int main() { std::thread t(func); std::thread t2(std::move(t)); t2.join(); return 0; } ``` 需要注意线程的生命周期 ``` #include void func(){} int main() { std::thread t(func); return 0; } ``` 以上代码会报错,因为线程对象先于线程结束了,可以通过join()或者detach()来解决,当然也可以放到一个容器中来保存,以保证线程生命周期 ``` #include std::vector g_list; std::vector> g_list2; void func(){} void CreateThread() { std::thread t(func); g_list.push_back(std::move(t)); g_list2.push_back(std::make_shared(func)); } int main() { void CreateThread(); for(auto& thread : g_list) { thread.join(); } for(auto& thread : g_list2) { thread->join(); } return 0; } ``` #### 线程的基本用法 1. 获取当前信息 ``` void func() { } int main() { std::thread t(func); std::cout << t.get_id() << std::endl;//当前线程id //获得cpu核心数 std::cout << std::thread::hardware_concurrency() << std::endl; } ``` 2. 线程休眠 ``` void f() { std::this_thread::sleep_for(std::chrono::seconds(3)); std::cout << "time out" << std::endl; } int main() { std::thread t(f); t.join(); } ``` ### 互斥量 #### 独占互斥锁std::mutex ``` std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread" << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "leaving thread" << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { std::thread t1(func); std::thread t2(func); std::thread t3(func); t1.join(); t2.join(); t3.join(); } ``` 使用lock_guard可以简化lock/unlock,同时也更加安全。因为是在构造时自动锁定互斥量,析构时自动解锁。 ``` void func() { std::lock_guard locker(g_lock); std::cout << "entered thread" << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "leaving thread" << std::this_thread::get_id() << std::endl; } ``` #### 5.2.2 递归的独占互斥量std::recursive_mutex 递归锁允许同一个线程多次获得该锁,可以用来解决同一线程需要多次获取互斥量时死锁的问题。 ``` struct Complex { std::mutex mutex; int i; Complex() :i(0) {} void mul(int x) { std::lock_guard lock(mutex); i *= x; } void div(int x) { std::lock_guard lock(mutex); i *= x; } void both(int x,int y) { std::lock_guard lock(mutex); mul(x); div(y); } }; int main() { Complex complex; complex.both(32, 23); } ``` 简单的方法就是使用std::recursive_mutex ``` struct Complex { std::recursive_mutex mutex; int i; Complex() :i(0) {} void mul(int x) { std::lock_guard lock(mutex); i *= x; } void div(int x) { std::lock_guard lock(mutex); i *= x; } void both(int x,int y) { std::lock_guard lock(mutex); mul(x); div(y); } }; int main() { Complex complex; complex.both(32, 23); } ``` 需要注意的是: 1. 需要用到递归锁定的多线程互斥处理往往本身就可以简化,允许递归互斥很容易放纵复杂逻辑的产生 2. 递归锁比起非递归锁效率会低一些 3. 递归锁虽然允许同一个线程多次获得同一个互斥锁,可重复获得的最大次数并未具体说明,一旦超过一定次数,再对lock进行调用就会抛出std::system错误 #### 5.2.3带超时的互斥量std::timed_mutex和std::recursive_timed_mutex 主要用于在获取锁时增加超时等待功能。不至于一直等待 ``` std::timed_mutex mutex; void work() { std::chrono::milliseconds timeout(100); while (true) { if (mutex.try_lock_for(timeout)) { std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl; std::chrono::milliseconds sleepDuration(250); std::this_thread::sleep_for(sleepDuration); mutex.unlock(); std::this_thread::sleep_for(sleepDuration); } else { std::cout << std::this_thread::get_id() << ": do work without the mutex" << std::endl; std::chrono::milliseconds sleepDuration(100); std::this_thread::sleep_for(sleepDuration); } } } int main(void) { std::thread t1(work); std::thread t2(work); t1.join(); t2.join(); return 0; } ``` #### 5.3 条件变量 条件变量时c++11提供的另外一种用于等待的同步机制,它能柱塞一个或者多个线程,直到收到另一个线程发出的通知或者超时,才会唤醒当前阻塞的线程。
condition_variable condition_variable_any ``` #include #include #include #include #include template class SyncQueue { private: bool IsFull() const { return m_queue.size() == m_maxSize; } bool IsEmpty() const { return m_queue.empty(); } public: SyncQueue(int maxSize) : m_maxSize(maxSize) { } void Put(const T& x) { std::lock_guard locker(m_mutex); while (IsFull()) { std::cout << "缓冲区满了,需要等待..." << std::endl; m_notFull.wait(m_mutex); } m_queue.push_back(x); m_notFull.notify_one(); } void Take(T& x) { std::lock_guard locker(m_mutex); while (IsEmpty()) { std::cout << "缓冲区空了,需要等待..." << std::endl; m_notEmpty.wait(m_mutex); } x = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } bool Empty() { std::lock_guard locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard locker(m_mutex); return m_queue.size() == m_maxSize; } size_t Size() { std::lock_guard locker(m_mutex); return m_queue.size(); } int Count() { return m_queue.size(); } private: std::list m_queue; //缓冲区 std::mutex m_mutex; //互斥量和条件变量结合起来使用 std::condition_variable_any m_notEmpty;//不为空的条件变量 std::condition_variable_any m_notFull; //没有满的条件变量 int m_maxSize; //同步队列最大的size }; ``` 更好的方法是使用unique_lock,unique可以随时释放锁 ``` #include #include #include #include #include template class SimpleSyncQueue { public: SimpleSyncQueue(){} void Put(const T& x) { std::lock_guard locker(m_mutex); m_queue.push_back(x); m_notEmpty.notify_one(); } void Take(T& x) { std::unique_lock locker(m_mutex); m_notEmpty.wait(locker, [this]{return !m_queue.empty(); }); x = m_queue.front(); m_queue.pop_front(); } bool Empty() { std::lock_guard locker(m_mutex); return m_queue.empty(); } size_t Size() { std::lock_guard locker(m_mutex); return m_queue.size(); } private: std::list m_queue; std::mutex m_mutex; std::condition_variable m_notEmpty; }; ``` #### 5.4 原子变量 使用原子变量就不需要来使用互斥量来保护该变量了
使用std::mutex ``` #include #include #include struct Counter { int value = 0; std::mutex mutex; void increment() { std::lock_guard lock(mutex); ++value; std::cout << value << std::endl; } void decrement() { std::lock_guard lock(mutex); --value; std::cout << value << std::endl; } int get() { return value; } }; Counter g_counter; void Increments() { for (int i = 0; i < 10; ++i) { g_counter.increment(); } } void Decrements() { for (int i = 0; i < 5; ++i) { g_counter.decrement(); } } int main(void) { std::thread t1(Increments); std::thread t2(Decrements); t1.join(); t2.join(); system("pause"); return 0; } ``` 使用std::atomic ``` #include #include #include struct Counter { std::atomic value = 0; void increment() { ++value; } void decrement() { --value; } int get() { return value; } }; Counter g_counter; void Increments() { for (int i = 0; i < 10; ++i) { g_counter.increment(); std::cout << g_counter.get() << std::endl; } } void Decrements() { for (int i = 0; i < 5; ++i) { g_counter.decrement(); std::cout << g_counter.get() << std::endl; } } int main(void) { std::thread t1(Increments); std::thread t2(Decrements); t1.join(); t2.join(); system("pause"); return 0; } ``` #### 5.5 call_once/once_flag的使用 为了保证多线程环境中某个函数仅被调用一次,比如需要初始化某个对象,而这个对象只能初始化一次。 ``` #include #include #include std::once_flag flag; void do_once() { std::call_once(flag, [] {std::cout << "Called once" << std::endl; }); } int main(void) { std::thread t1(do_once); std::thread t2(do_once); std::thread t3(do_once); t1.join(); t2.join(); t3.join(); system("pause"); return 0; } ``` #### 5.6 异步操作类 #### 5.6.1 std::future thread库提供了future用来访问异步操作结果,因为异步操作结果是一个未来的期待值,所以被称为future。future提供了异步获取操作结果的通道,我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作结果。 ``` std::future_status status; do { status = future.wait_for(std::chromno::seconds(1)); if (status == std::future_status::deferred) { } else if (status == std::future_status::timeout) { } else if (status==std::future_status::ready) { } } while (status!= std::future_status::ready); ``` 获取future有三种方式: 1. get:等待异步操作结束并返回结果 2. wait:等待异步操作完成 3. wait_for:超时等待返回结果 #### 5.6.2 std::promise std::promise将数据和future绑定起来,为获取线程函数中的某个值提供便利,在线程函数中为外面传进来的promise赋值,在线程函数执行完成之后就可以通过promise的future获取该值。 ``` std::promise pr; std::thread t([](std::promise&p) { p.set_value_at_thread_exit(9); }, std::ref(pr)); std::future f = pr.get_future(); auto r = f.get(); ``` #### 5.6.3 std::package_task ``` std::packaged_task task([]() {return 7; }); std::thread t1(std::ref(task)); std::future f1 = task.get_future(); auto r1 = f1.get(); ``` #### 5.6.4 以上三者关系 std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的,属于低层次对象。之上是std::packaged_task和std::promise,他们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,std::promise包装的是一个值。那这两者又是什么关系呢?可以将一个异步操作的结果放到std::promise中。
future被promise和package_task用来作为异步操作或者异步的结果的连接通道,用std::future和std::shared_future来获取异步的调用结果。future是不可拷贝的,shared_future是可以拷贝的,当需要将future放到容器中则需要shared_future。 ``` #include #include #include #include #include int func(int x) { return x + 2; } int main(void) { std::packaged_task tsk(func); std::future fut = tsk.get_future(); //获取future std::thread(std::move(tsk), 2).detach(); int value = fut.get(); //等待task完成并获取返回值 std::cout << "The result is " << value << ".\n"; std::vector> v; std::shared_future f = std::async(std::launch::async, [](int a, int b){return a + b; }, 2, 3); v.push_back(f); std::cout << "The shared_future result is " << v[0].get() << std::endl; return 0; } ``` #### 5.7 线程异步操作函数 std::async可以直接创建异步的task,异步任务结果也保存在future中,调用furturn.get()获取即可,如果不关心结果,可以使用furturn.wait()
两种线程创建策略:
1. std::launch::async:在调用async时就开始创建线程。 2. std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用future的get或者wait时才创建。 ``` #include #include void TestAsync() { std::future f1 = std::async(std::launch::async, [](){ return 8; }); std::cout << f1.get() << std::endl; //output: 8 std::future f2 = std::async(std::launch::async, [](){ std::cout << 8 << std::endl; return; }); f2.wait(); //output: 8 std::future future = std::async(std::launch::async, [](){ std::this_thread::sleep_for(std::chrono::seconds(3)); return 8; }); std::cout << "waiting...\n"; std::future_status status; do { status = future.wait_for(std::chrono::seconds(1)); if (status == std::future_status::deferred) { std::cout << "deferred\n"; } else if (status == std::future_status::timeout) { std::cout << "timeout\n"; } else if (status == std::future_status::ready) { std::cout << "ready!\n"; } } while (status != std::future_status::ready); std::cout << "result is " << future.get() << '\n'; } int main(void) { TestAsync(); return 0; } ```