14 KiB
		
	
	
	
	
	
	
	
			
		
		
	
	使用c++11让线程线程开发变得简单
5.1 线程创建
- jon()
 
#include <thread>
void func(){}
int main()
{
    std::thread t(func);
    t.join();//join将会阻塞当前线程,直到线程执行结束
    return 0;
}
- detach
 
#include <thread>
void func(){}
int main()
{
    std::thread t(func);
    t.detach();//线程与对象分离,让线程作为后台线程去运行,但同时也无法与线程产生联系(控制)了
    return 0;
}
线程不能被复制,但是可以被移动
#include <thread>
void func(){}
int main()
{
    std::thread t(func);
    std::thread t2(std::move(t));
    t2.join();
    return 0;
}
需要注意线程的生命周期
#include <thread>
void func(){}
int main()
{
    std::thread t(func);
    return 0;
}
以上代码会报错,因为线程对象先于线程结束了,可以通过join()或者detach()来解决,当然也可以放到一个容器中来保存,以保证线程生命周期
#include <thread>
std::vector<std::thread> g_list;
std::vector<std::shared_ptr<std::thread>> g_list2;
void func(){}
void CreateThread()
{
    std::thread t(func);
    g_list.push_back(std::move(t));
    g_list2.push_back(std::make_shared<std::thread>(func));
}
int main()
{
    void CreateThread();
    for(auto& thread : g_list)
    {
        thread.join();
    }
    for(auto& thread : g_list2)
    {
        thread->join();
    }
    return 0;
}
线程的基本用法
- 获取当前信息
 
void func()
{
}
int main()
{
	std::thread t(func);
	std::cout << t.get_id() << std::endl;//当前线程id
	//获得cpu核心数
	std::cout << std::thread::hardware_concurrency() << std::endl;
}
- 线程休眠
 
void f() {
	std::this_thread::sleep_for(std::chrono::seconds(3));
	std::cout << "time out" << std::endl;
}
int main()
{
	std::thread t(f);
	t.join();
}
互斥量
独占互斥锁std::mutex
std::mutex g_lock;
void func() {
	g_lock.lock();
	std::cout << "entered thread" << std::this_thread::get_id() << std::endl;
	std::this_thread::sleep_for(std::chrono::seconds(1));
	std::cout << "leaving thread" << std::this_thread::get_id() << std::endl;
	g_lock.unlock();
}
int main()
{
	std::thread t1(func);
	std::thread t2(func);
	std::thread t3(func);
	t1.join();
	t2.join();
	t3.join();
}
使用lock_guard可以简化lock/unlock,同时也更加安全。因为是在构造时自动锁定互斥量,析构时自动解锁。
void func() {
	std::lock_guard<std::mutex> locker(g_lock);
	std::cout << "entered thread" << std::this_thread::get_id() << std::endl;
	std::this_thread::sleep_for(std::chrono::seconds(1));
	std::cout << "leaving thread" << std::this_thread::get_id() << std::endl;
}
5.2.2 递归的独占互斥量std::recursive_mutex
递归锁允许同一个线程多次获得该锁,可以用来解决同一线程需要多次获取互斥量时死锁的问题。
struct Complex
{
	std::mutex mutex;
	int i;
	Complex() :i(0) {}
	void mul(int x) {
		std::lock_guard<std::mutex> lock(mutex);
		i *= x;
	}
	void div(int x) {
		std::lock_guard<std::mutex> lock(mutex);
		i *= x;
	}
	void both(int x,int y) {
		std::lock_guard<std::mutex> lock(mutex);
		mul(x);
		div(y);
	}
};
int main()
{
	Complex complex;
	complex.both(32, 23);
}
简单的方法就是使用std::recursive_mutex
struct Complex
{
	std::recursive_mutex mutex;
	int i;
	Complex() :i(0) {}
	void mul(int x) {
		std::lock_guard<std::recursive_mutex> lock(mutex);
		i *= x;
	}
	void div(int x) {
		std::lock_guard<std::recursive_mutex> lock(mutex);
		i *= x;
	}
	void both(int x,int y) {
		std::lock_guard<std::recursive_mutex> lock(mutex);
		mul(x);
		div(y);
	}
};
int main()
{
	Complex complex;
	complex.both(32, 23);
}
需要注意的是:
- 需要用到递归锁定的多线程互斥处理往往本身就可以简化,允许递归互斥很容易放纵复杂逻辑的产生
 - 递归锁比起非递归锁效率会低一些
 - 递归锁虽然允许同一个线程多次获得同一个互斥锁,可重复获得的最大次数并未具体说明,一旦超过一定次数,再对lock进行调用就会抛出std::system错误
 
5.2.3带超时的互斥量std::timed_mutex和std::recursive_timed_mutex
主要用于在获取锁时增加超时等待功能。不至于一直等待
std::timed_mutex mutex;
void work()
{
	std::chrono::milliseconds timeout(100);
	while (true)
	{
		if (mutex.try_lock_for(timeout))
		{
			std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;
			std::chrono::milliseconds sleepDuration(250);
			std::this_thread::sleep_for(sleepDuration);
			mutex.unlock();
			std::this_thread::sleep_for(sleepDuration);
		}
		else
		{
			std::cout << std::this_thread::get_id() << ": do work without the mutex" << std::endl;
			std::chrono::milliseconds sleepDuration(100);
			std::this_thread::sleep_for(sleepDuration);
		}
	}
}
int main(void)
{
	std::thread t1(work);
	std::thread t2(work);
	t1.join();
	t2.join();
	return 0;
}
5.3 条件变量
条件变量时c++11提供的另外一种用于等待的同步机制,它能柱塞一个或者多个线程,直到收到另一个线程发出的通知或者超时,才会唤醒当前阻塞的线程。
condition_variable
condition_variable_any
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
template<typename T>
class SyncQueue
{
private:
	bool IsFull() const
	{
		return m_queue.size() == m_maxSize;
	}
	bool IsEmpty() const
	{
		return m_queue.empty();
	}
public:
	SyncQueue(int maxSize) : m_maxSize(maxSize)
	{
	}
	void Put(const T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		while (IsFull())
		{
			std::cout << "缓冲区满了,需要等待..." << std::endl;
			m_notFull.wait(m_mutex);
		}
		m_queue.push_back(x);
		m_notFull.notify_one();
	}
	void Take(T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		
		while (IsEmpty())
		{
			std::cout << "缓冲区空了,需要等待..." << std::endl;
			m_notEmpty.wait(m_mutex);
		}
		x = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}
	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}
	int Count()
	{
		return m_queue.size();
	}
private:
	std::list<T> m_queue;                  //缓冲区
	std::mutex m_mutex;                    //互斥量和条件变量结合起来使用
	std::condition_variable_any m_notEmpty;//不为空的条件变量
	std::condition_variable_any m_notFull; //没有满的条件变量
	int m_maxSize;                         //同步队列最大的size
};
更好的方法是使用unique_lock,unique可以随时释放锁
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>
template<typename T>
class SimpleSyncQueue
{
public:
	SimpleSyncQueue(){}
	void Put(const T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		m_queue.push_back(x);
		m_notEmpty.notify_one();
	}
	void Take(T& x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this]{return !m_queue.empty(); });
		x = m_queue.front();
		m_queue.pop_front();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}
private:
	std::list<T> m_queue;
	std::mutex m_mutex;
	std::condition_variable m_notEmpty;
};
5.4 原子变量
使用原子变量就不需要来使用互斥量来保护该变量了
使用std::mutex
#include <iostream>
#include <thread>
#include <mutex>
struct Counter
{
	int value = 0;
	std::mutex mutex;
	void increment()
	{
		std::lock_guard<std::mutex> lock(mutex);
		++value;
		std::cout << value << std::endl;
	}
	void decrement()
	{
		std::lock_guard<std::mutex> lock(mutex);
		--value;
		std::cout << value << std::endl;
	}
	int get()
	{
		return value;
	}
};
Counter g_counter;
void Increments()
{
	for (int i = 0; i < 10; ++i)
	{
		g_counter.increment();
	}
}
void Decrements()
{
	for (int i = 0; i < 5; ++i)
	{
		g_counter.decrement();
	}
}
int main(void)
{
	std::thread t1(Increments);
	std::thread t2(Decrements);
	t1.join();
	t2.join();
	system("pause");
	return 0;
}
使用std::atomic
#include <iostream>
#include <thread>
#include <atomic>
struct Counter
{
	std::atomic<int> value = 0;
	void increment()
	{
		++value;
	}
	void decrement()
	{
		--value;
	}
	int get()
	{
		return value;
	}
};
Counter g_counter;
void Increments()
{
	for (int i = 0; i < 10; ++i)
	{
		g_counter.increment();
		std::cout << g_counter.get() << std::endl;
	}
}
void Decrements()
{
	for (int i = 0; i < 5; ++i)
	{
		g_counter.decrement();
		std::cout << g_counter.get() << std::endl;
	}
}
int main(void)
{
	std::thread t1(Increments);
	std::thread t2(Decrements);
	t1.join();
	t2.join();
	system("pause");
	return 0;
}
5.5 call_once/once_flag的使用
为了保证多线程环境中某个函数仅被调用一次,比如需要初始化某个对象,而这个对象只能初始化一次。
#include <thread>
#include <iostream>
#include <mutex>
std::once_flag flag;
void do_once() {
	std::call_once(flag, [] {std::cout << "Called once" << std::endl; });
}
int main(void)
{
	std::thread t1(do_once);
	std::thread t2(do_once);
	std::thread t3(do_once);
	t1.join();
	t2.join();
	t3.join();
	system("pause");
	return 0;
}
5.6 异步操作类
5.6.1 std::future
thread库提供了future用来访问异步操作结果,因为异步操作结果是一个未来的期待值,所以被称为future。future提供了异步获取操作结果的通道,我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作结果。
std::future_status status;
do 
{
	status = future.wait_for(std::chromno::seconds(1));
	if (status == std::future_status::deferred) {
		
	}
	else if (status == std::future_status::timeout) {
		
	}
	else if (status==std::future_status::ready) {
		
	}
} while (status!= std::future_status::ready);
获取future有三种方式:
- get:等待异步操作结束并返回结果
 - wait:等待异步操作完成
 - wait_for:超时等待返回结果
 
5.6.2 std::promise
std::promise将数据和future绑定起来,为获取线程函数中的某个值提供便利,在线程函数中为外面传进来的promise赋值,在线程函数执行完成之后就可以通过promise的future获取该值。
std::promise<int> pr;
std::thread t([](std::promise<int>&p) {
	p.set_value_at_thread_exit(9);
}, std::ref(pr));
std::future<int> f = pr.get_future();
auto r = f.get();
5.6.3 std::package_task
std::packaged_task<int()> task([]() {return 7; });
std::thread t1(std::ref(task));
std::future<int> f1 = task.get_future();
auto r1 = f1.get();
5.6.4 以上三者关系
std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的,属于低层次对象。之上是std::packaged_task和std::promise,他们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,std::promise包装的是一个值。那这两者又是什么关系呢?可以将一个异步操作的结果放到std::promise中。
future被promise和package_task用来作为异步操作或者异步的结果的连接通道,用std::future和std::shared_future来获取异步的调用结果。future是不可拷贝的,shared_future是可以拷贝的,当需要将future放到容器中则需要shared_future。
#include <iostream>
#include <thread>
#include <utility>
#include <future>
#include <vector>
int func(int x) { return x + 2; }
int main(void)
{
	std::packaged_task<int(int)> tsk(func);
	std::future<int> fut = tsk.get_future();  //获取future
	std::thread(std::move(tsk), 2).detach();
	int value = fut.get();  //等待task完成并获取返回值
	std::cout << "The result is " << value << ".\n";
	std::vector<std::shared_future<int>> v;
	std::shared_future<int> f = std::async(std::launch::async, [](int a, int b){return a + b; }, 2, 3);
	v.push_back(f);
	std::cout << "The shared_future result is " << v[0].get() << std::endl;
	return 0;
}
5.7 线程异步操作函数
std::async可以直接创建异步的task,异步任务结果也保存在future中,调用furturn.get()获取即可,如果不关心结果,可以使用furturn.wait()
两种线程创建策略:
- std::launch::async:在调用async时就开始创建线程。
 - std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用future的get或者wait时才创建。
 
#include <iostream>
#include <future>
void TestAsync()
{
	std::future<int> f1 = std::async(std::launch::async, [](){
		return 8;
	});
	std::cout << f1.get() << std::endl; //output: 8
	std::future<void> f2 = std::async(std::launch::async, [](){
		std::cout << 8 << std::endl; return;
	});
	f2.wait(); //output: 8
	std::future<int> future = std::async(std::launch::async, [](){
		std::this_thread::sleep_for(std::chrono::seconds(3));
		return 8;
	});
	std::cout << "waiting...\n";
	std::future_status status;
	do {
		status = future.wait_for(std::chrono::seconds(1));
		if (status == std::future_status::deferred)
		{
			std::cout << "deferred\n";
		}
		else if (status == std::future_status::timeout)
		{
			std::cout << "timeout\n";
		}
		else if (status == std::future_status::ready)
		{
			std::cout << "ready!\n";
		}
	} while (status != std::future_status::ready);
	std::cout << "result is " << future.get() << '\n';
}
int main(void)
{
	TestAsync();
	return 0;
}