210 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
		
		
			
		
	
	
			210 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
| 
								 | 
							
								### 使用c++11开发一个半同步半异步线程池
							 | 
						|||
| 
								 | 
							
								```
							 | 
						|||
| 
								 | 
							
								#pragma once
							 | 
						|||
| 
								 | 
							
								#include<list>
							 | 
						|||
| 
								 | 
							
								#include<mutex>
							 | 
						|||
| 
								 | 
							
								#include<thread>
							 | 
						|||
| 
								 | 
							
								#include<condition_variable>
							 | 
						|||
| 
								 | 
							
								#include <iostream>
							 | 
						|||
| 
								 | 
							
								using namespace std;
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								template<typename T>
							 | 
						|||
| 
								 | 
							
								class SyncQueue
							 | 
						|||
| 
								 | 
							
								{
							 | 
						|||
| 
								 | 
							
								public:
							 | 
						|||
| 
								 | 
							
									SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									void Put(const T&x)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										Add(x);
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									void Put(T&&x)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										Add(std::forward<T>(x));
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									void Take(std::list<T>& list)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::unique_lock<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
										if (m_needStop)
							 | 
						|||
| 
								 | 
							
											return;
							 | 
						|||
| 
								 | 
							
										list = std::move(m_queue);
							 | 
						|||
| 
								 | 
							
										m_notFull.notify_one();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									void Take(T& t)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::unique_lock<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
										if (m_needStop)
							 | 
						|||
| 
								 | 
							
											return;
							 | 
						|||
| 
								 | 
							
										t = m_queue.front();
							 | 
						|||
| 
								 | 
							
										m_queue.pop_front();
							 | 
						|||
| 
								 | 
							
										m_notFull.notify_one();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									void Stop()
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										{
							 | 
						|||
| 
								 | 
							
											std::lock_guard<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
											m_needStop = true;
							 | 
						|||
| 
								 | 
							
										}
							 | 
						|||
| 
								 | 
							
										m_notFull.notify_all();
							 | 
						|||
| 
								 | 
							
										m_notEmpty.notify_all();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bool Empty()
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::lock_guard<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										return m_queue.empty();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bool Full()
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::lock_guard<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										return m_queue.size() == m_maxSize;
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									size_t Size()
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::lock_guard<std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										return m_queue.size();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									int Count()
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										return m_queue.size();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								private:
							 | 
						|||
| 
								 | 
							
									bool NotFull() const
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										bool full = m_queue.size() >= m_maxSize;
							 | 
						|||
| 
								 | 
							
										if (full)
							 | 
						|||
| 
								 | 
							
											cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
							 | 
						|||
| 
								 | 
							
										return !full;
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bool NotEmpty() const
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										bool empty = m_queue.empty();
							 | 
						|||
| 
								 | 
							
										if (empty)
							 | 
						|||
| 
								 | 
							
											cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
							 | 
						|||
| 
								 | 
							
										return !empty;
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									template<typename F>
							 | 
						|||
| 
								 | 
							
									void Add(F&&x)
							 | 
						|||
| 
								 | 
							
									{
							 | 
						|||
| 
								 | 
							
										std::unique_lock< std::mutex> locker(m_mutex);
							 | 
						|||
| 
								 | 
							
										m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
							 | 
						|||
| 
								 | 
							
										if (m_needStop)
							 | 
						|||
| 
								 | 
							
											return;
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
										m_queue.push_back(std::forward<F>(x));
							 | 
						|||
| 
								 | 
							
										m_notEmpty.notify_one();
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								private:
							 | 
						|||
| 
								 | 
							
									std::list<T> m_queue; //缓冲区
							 | 
						|||
| 
								 | 
							
									std::mutex m_mutex; //互斥量和条件变量结合起来使用
							 | 
						|||
| 
								 | 
							
									std::condition_variable m_notEmpty;//不为空的条件变量
							 | 
						|||
| 
								 | 
							
									std::condition_variable m_notFull; //没有满的条件变量
							 | 
						|||
| 
								 | 
							
									int m_maxSize; //同步队列最大的size
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bool m_needStop; //停止的标志
							 | 
						|||
| 
								 | 
							
								};
							 | 
						|||
| 
								 | 
							
								```
							 | 
						|||
| 
								 | 
							
								```
							 | 
						|||
| 
								 | 
							
								#pragma once
							 | 
						|||
| 
								 | 
							
								#include<list>
							 | 
						|||
| 
								 | 
							
								#include<thread>
							 | 
						|||
| 
								 | 
							
								#include<functional>
							 | 
						|||
| 
								 | 
							
								#include<memory>
							 | 
						|||
| 
								 | 
							
								#include <atomic>
							 | 
						|||
| 
								 | 
							
								#include "SyncQueue.hpp"
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								const int MaxTaskCount = 100;
							 | 
						|||
| 
								 | 
							
								class ThreadPool
							 | 
						|||
| 
								 | 
							
								{
							 | 
						|||
| 
								 | 
							
								public:
							 | 
						|||
| 
								 | 
							
								    using Task = std::function<void()>;
							 | 
						|||
| 
								 | 
							
								    ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        Start(numThreads);
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    ~ThreadPool(void)
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        //如果没有停止时则主动停止线程池
							 | 
						|||
| 
								 | 
							
								        Stop();
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    void Stop()
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        std::call_once(m_flag, [this]{StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    void AddTask(Task&&task)
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        m_queue.Put(std::forward<Task>(task));
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    void AddTask(const Task& task)
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        m_queue.Put(task);
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								private:
							 | 
						|||
| 
								 | 
							
								    void Start(int numThreads)
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        m_running = true;
							 | 
						|||
| 
								 | 
							
								        //创建线程组
							 | 
						|||
| 
								 | 
							
								        for (int i = 0; i <numThreads; ++i)
							 | 
						|||
| 
								 | 
							
								        {
							 | 
						|||
| 
								 | 
							
								            m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
							 | 
						|||
| 
								 | 
							
								        }
							 | 
						|||
| 
								 | 
							
								    }    
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    void RunInThread()
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        while (m_running)
							 | 
						|||
| 
								 | 
							
								        {
							 | 
						|||
| 
								 | 
							
								            //取任务分别执行
							 | 
						|||
| 
								 | 
							
								            std::list<Task> list;
							 | 
						|||
| 
								 | 
							
								            m_queue.Take(list);
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								            for (auto& task : list)
							 | 
						|||
| 
								 | 
							
								            {
							 | 
						|||
| 
								 | 
							
								                if (!m_running)
							 | 
						|||
| 
								 | 
							
								                    return;
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								                task();
							 | 
						|||
| 
								 | 
							
								            }
							 | 
						|||
| 
								 | 
							
								        }
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    void StopThreadGroup()
							 | 
						|||
| 
								 | 
							
								    {
							 | 
						|||
| 
								 | 
							
								        m_queue.Stop(); //让同步队列中的线程停止
							 | 
						|||
| 
								 | 
							
								        m_running = false; //置为false,让内部线程跳出循环并退出
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								        for (auto thread : m_threadgroup) //等待线程结束
							 | 
						|||
| 
								 | 
							
								        {
							 | 
						|||
| 
								 | 
							
								            if (thread)
							 | 
						|||
| 
								 | 
							
								                thread->join();
							 | 
						|||
| 
								 | 
							
								        }
							 | 
						|||
| 
								 | 
							
								        m_threadgroup.clear();
							 | 
						|||
| 
								 | 
							
								    }
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
							 | 
						|||
| 
								 | 
							
								    SyncQueue<Task> m_queue; //同步队列     
							 | 
						|||
| 
								 | 
							
								    atomic_bool m_running; //是否停止的标志
							 | 
						|||
| 
								 | 
							
								    std::once_flag m_flag;
							 | 
						|||
| 
								 | 
							
								};
							 | 
						|||
| 
								 | 
							
								```
							 |