### 使用c++11开发一个半同步半异步线程池 ``` #pragma once #include #include #include #include #include using namespace std; template class SyncQueue { public: SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false) { } void Put(const T&x) { Add(x); } void Put(T&&x) { Add(std::forward(x)); } void Take(std::list& list) { std::unique_lock locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) return; list = std::move(m_queue); m_notFull.notify_one(); } void Take(T& t) { std::unique_lock locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) return; t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } void Stop() { { std::lock_guard locker(m_mutex); m_needStop = true; } m_notFull.notify_all(); m_notEmpty.notify_all(); } bool Empty() { std::lock_guard locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard locker(m_mutex); return m_queue.size() == m_maxSize; } size_t Size() { std::lock_guard locker(m_mutex); return m_queue.size(); } int Count() { return m_queue.size(); } private: bool NotFull() const { bool full = m_queue.size() >= m_maxSize; if (full) cout << "full, waiting,thread id: " << this_thread::get_id() << endl; return !full; } bool NotEmpty() const { bool empty = m_queue.empty(); if (empty) cout << "empty,waiting,thread id: " << this_thread::get_id() << endl; return !empty; } template void Add(F&&x) { std::unique_lock< std::mutex> locker(m_mutex); m_notFull.wait(locker, [this] {return m_needStop || NotFull(); }); if (m_needStop) return; m_queue.push_back(std::forward(x)); m_notEmpty.notify_one(); } private: std::list m_queue; //缓冲区 std::mutex m_mutex; //互斥量和条件变量结合起来使用 std::condition_variable m_notEmpty;//不为空的条件变量 std::condition_variable m_notFull; //没有满的条件变量 int m_maxSize; //同步队列最大的size bool m_needStop; //停止的标志 }; ``` ``` #pragma once #include #include #include #include #include #include "SyncQueue.hpp" const int MaxTaskCount = 100; class ThreadPool { public: using Task = std::function; ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); } ~ThreadPool(void) { //如果没有停止时则主动停止线程池 Stop(); } void Stop() { std::call_once(m_flag, [this]{StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup } void AddTask(Task&&task) { m_queue.Put(std::forward(task)); } void AddTask(const Task& task) { m_queue.Put(task); } private: void Start(int numThreads) { m_running = true; //创建线程组 for (int i = 0; i (&ThreadPool::RunInThread, this)); } } void RunInThread() { while (m_running) { //取任务分别执行 std::list list; m_queue.Take(list); for (auto& task : list) { if (!m_running) return; task(); } } } void StopThreadGroup() { m_queue.Stop(); //让同步队列中的线程停止 m_running = false; //置为false,让内部线程跳出循环并退出 for (auto thread : m_threadgroup) //等待线程结束 { if (thread) thread->join(); } m_threadgroup.clear(); } std::list> m_threadgroup; //处理任务的线程组 SyncQueue m_queue; //同步队列 atomic_bool m_running; //是否停止的标志 std::once_flag m_flag; }; ```