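1 What is a thread pool
A thread pool is, in essence, a multi-producer, multi-consumer multithreaded application.
A thread pool has four basic components:
- Thread pool manager: creates and manages the pool, including creating the pool, destroying it, adding worker threads, and adding tasks;
- Worker threads: the threads owned by the pool that process the actual tasks. They wait while there is no work, and fetch and process tasks whenever the task queue is not empty;
- Task interface: the interface every task must implement so that worker threads can schedule its execution;
- Task queue: holds the tasks waiting to be processed, in first-in, first-out order.
The pool creates a number of worker threads up front, sized to the machine's capabilities. The pool object, which lives in the main thread, receives tasks and stores them in the task queue. Worker threads take tasks from the queue and process them, and a worker is suspended whenever the queue is empty.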
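To make the four components concrete, here is a minimal sketch of how they might fit together. It is not the implementation built later in this article: `MiniPool`, `WorkerLoop`, and `RunCounterDemo` are hypothetical names, and `std::function<void()>` is assumed as the task interface purely for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool, for illustration only.
class MiniPool
{
public:
    using Task = std::function<void()>;   // stands in for the "task interface"

    // Manager role: create the worker threads up front.
    explicit MiniPool(std::size_t worker_num)
    {
        for (std::size_t i = 0; i < worker_num; ++i)
            m_workers.emplace_back([this] { WorkerLoop(); });
    }

    // Manager role: request shutdown, then wait for every worker to finish.
    ~MiniPool()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        for (auto& t : m_workers) t.join();
    }

    // Producer side: store the task in the FIFO queue and wake one worker.
    void AddTask(Task task)
    {
        assert(task);   // an empty std::function cannot be executed
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

private:
    // Consumer side: sleep while the queue is empty, otherwise take and run a task.
    void WorkerLoop()
    {
        for (;;)
        {
            Task task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty()) return;   // stop requested and nothing left to do
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();   // run outside the lock so other workers are not blocked
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<Task> m_tasks;
    bool m_stop = false;
    std::vector<std::thread> m_workers;
};

// Submits 100 increments to a 4-worker pool and returns the final count.
int RunCounterDemo()
{
    std::atomic<int> counter{0};
    {
        MiniPool pool(4);
        for (int i = 0; i < 100; ++i)
            pool.AddTask([&counter] { ++counter; });
    }   // the destructor lets the workers drain the queue, then joins them
    return counter.load();
}
```

In this sketch the destructor lets workers finish the remaining tasks before exiting. The pool built in section 2 makes a different choice and stops as soon as it is told to.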
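2 Implementing a thread pool in C++
From the characteristics described above, a C++ thread pool needs the following features:
- The pool can create a given number of worker threads. Each worker runs its task function on its own thread and waits for new tasks in the task queue;
- The pool can be shut down on request, ending the worker threads it created;
- The pool object is accessed by multiple worker threads under mutual exclusion. We can treat the pool as a singleton that multiple threads may access safely, which requires a thread-safe singleton pattern;
- A thread-safe queue is needed: it notifies a worker thread when a new task is pushed, blocks the thread while the queue is empty to avoid wasting resources, and supports being unblocked on request.
2.1 A thread-safe singleton class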
Singleton.h
#ifndef SINGLETON_H
#define SINGLETON_H
#include <memory>
#include <mutex>
#include <utility>
template<typename T>
class Singleton
{
public:
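// Get the global singleton object, creating it on first use.
// The mutex is taken on every call: reading m_pSingleton outside the lock
// (double-checked locking on a plain shared_ptr) would be a data race.
template<typename ...Args>
static std::shared_ptr<T> GetInstance(Args&&... args)
{
std::lock_guard<std::mutex> gLock(m_Mutex);
if (nullptr == m_pSingleton)
{
m_pSingleton = std::make_shared<T>(std::forward<Args>(args)...);
}
return m_pSingleton;
}
// Explicitly destroy the singleton object (the interface is provided, but calling it directly is not recommended)
static void DeleteInstance()
{
std::lock_guard<std::mutex> gLock(m_Mutex);
m_pSingleton.reset();
}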
private:
explicit Singleton();
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
~Singleton() = default;
private:
static std::shared_ptr<T> m_pSingleton;
static std::mutex m_Mutex;
};
template<typename T>
std::shared_ptr<T> Singleton<T>::m_pSingleton = nullptr;
template<typename T>
std::mutex Singleton<T>::m_Mutex;
#endif
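As a point of comparison, the same "create exactly once, even under concurrent first use" guarantee can also be sketched with `std::call_once`. This is a swapped-in alternative, not the approach used by `Singleton.h` above. `OnceSingleton`, `Config`, and `AllThreadsSeeSameInstance` are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical alternative to Singleton<T>, built on std::call_once.
template <typename T>
class OnceSingleton
{
public:
    template <typename... Args>
    static std::shared_ptr<T> GetInstance(Args&&... args)
    {
        // Only the first caller runs the lambda; concurrent callers wait for it.
        // Arguments passed by later callers are ignored.
        std::call_once(s_once, [&] {
            s_instance = std::make_shared<T>(std::forward<Args>(args)...);
        });
        return s_instance;
    }

private:
    static std::once_flag s_once;
    static std::shared_ptr<T> s_instance;
};

template <typename T>
std::once_flag OnceSingleton<T>::s_once;
template <typename T>
std::shared_ptr<T> OnceSingleton<T>::s_instance;

// Hypothetical payload type used only by this demo.
struct Config
{
    explicit Config(int v) : value(v) {}
    int value;
};

// Eight threads race on the first GetInstance() call; all must get the same object.
bool AllThreadsSeeSameInstance()
{
    std::vector<std::shared_ptr<Config>> seen(8);
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < seen.size(); ++i)
    {
        threads.emplace_back([&seen, i] {
            seen[i] = OnceSingleton<Config>::GetInstance(static_cast<int>(i));
        });
    }
    for (auto& t : threads) t.join();

    assert(seen[0] != nullptr);
    for (const auto& p : seen)
        if (p != seen[0]) return false;
    return true;
}
```

One trade-off of this variant is that a `std::once_flag` cannot be reset, so it has no direct equivalent of `DeleteInstance()` followed by re-creation.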
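2.2 A thread-safe queue class
Reference project: https://github.com/alfredopons/queue-thread-safe
That thread-safe queue class was heavily modified here. The main addition is the ability to force blocked consumers to return whether or not the queue holds any tasks, which makes shutting down the thread pool easier.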
SafeQueue.hpp
/*
* SafeQueue.hpp
* Copyright (C) 2019 Alfredo Pons Menargues <apons@linucleus.com>
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef SAFEQUEUE_HPP_
#define SAFEQUEUE_HPP_
#include <queue>
#include <list>
#include <mutex>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <utility>
#include <condition_variable>
/** A thread-safe asynchronous queue */
template <class T, class Container = std::list<T>>
class SafeQueue
{
typedef typename Container::value_type value_type;
typedef typename Container::size_type size_type;
typedef Container container_type;
public:
/*! Create safe queue. */
SafeQueue() = default;
SafeQueue (SafeQueue&& sq)
{
m_queue = std::move (sq.m_queue);
}
SafeQueue (const SafeQueue& sq)
{
std::lock_guard<std::mutex> lock (sq.m_mutex);
m_queue = sq.m_queue;
}
/*! Destroy safe queue. */
~SafeQueue()
{
std::lock_guard<std::mutex> lock (m_mutex);
}
/**
* Sets the maximum number of items in the queue. The default is 0: no limit.
* \param[in] max_num_items The maximum number of items.
*/
void set_max_num_items (unsigned int max_num_items)
{
std::lock_guard<std::mutex> lock (m_mutex);
m_max_num_items = max_num_items;
}
/**
* Pushes the item into the queue.
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
bool push (const value_type& item)
{
std::lock_guard<std::mutex> lock (m_mutex);
// Reject the item once the queue already holds m_max_num_items elements.
if (m_max_num_items > 0 && m_queue.size() >= m_max_num_items)
return false;
m_queue.push (item);
m_condition.notify_one();
return true;
}
/**
* Pushes the item into the queue, moving from it.
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
bool push (value_type&& item)
{
std::lock_guard<std::mutex> lock (m_mutex);
if (m_max_num_items > 0 && m_queue.size() >= m_max_num_items)
return false;
m_queue.push (std::move (item));
m_condition.notify_one();
return true;
}
/**
* Pops item from the queue. If queue is empty, this function blocks until item becomes available
* or until unlock() is called; in the latter case item is left unchanged.
* \param[out] item The item.
*/
void pop (value_type& item)
{
std::unique_lock<std::mutex> lock (m_mutex);
m_condition.wait (lock, [this]() // Lambda funct
{
return !m_queue.empty() || m_Unlock;
});
if (!m_queue.empty())
{
item = m_queue.front();
m_queue.pop();
}
}
/**
* Pops item from the queue using the contained type's move assignment operator, if it has one.
* This method is identical to the pop() method if that type has no move assignment operator.
* If queue is empty, this function blocks until item becomes available
* or until unlock() is called; in the latter case item is left unchanged.
* \param[out] item The item.
*/
void move_pop (value_type& item)
{
std::unique_lock<std::mutex> lock (m_mutex);
m_condition.wait (lock, [this]() // Lambda funct
{
return !m_queue.empty() || m_Unlock;
});
if (!m_queue.empty())
{
item = std::move(m_queue.front());
m_queue.pop();
}
}
/**
* Tries to pop item from the queue.
* \param[out] item The item.
* \return False is returned if no item is available.
*/
bool try_pop (value_type& item)
{
std::unique_lock<std::mutex> lock (m_mutex);
if (m_queue.empty())
return false;
item = m_queue.front();
m_queue.pop();
return true;
}
/**
* Tries to pop item from the queue using the contained type's move assignment operator, if it has one.
* This method is identical to the try_pop() method if that type has no move assignment operator.
* \param[out] item The item.
* \return False is returned if no item is available.
*/
bool try_move_pop (value_type& item)
{
std::unique_lock<std::mutex> lock (m_mutex);
if (m_queue.empty())
return false;
item = std::move (m_queue.front());
m_queue.pop();
return true;
}
/**
* Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
* \param[out] item The item.
* \param[in] timeout The number of microseconds to wait.
* \return true if an item was obtained from the queue, false if no item is received before the timeout.
*/
bool timeout_pop (value_type& item, std::uint64_t timeout)
{
std::unique_lock<std::mutex> lock (m_mutex);
// The predicate re-checks the queue after every wakeup, so a spurious wakeup
// or an item taken by another consumer cannot lead to front() on an empty queue.
// With timeout == 0 this returns immediately with the current state of the queue.
if (!m_condition.wait_for (lock, std::chrono::microseconds (timeout), [this]() { return !m_queue.empty(); }))
return false;
item = m_queue.front();
m_queue.pop();
return true;
}
/**
* Pops item from the queue using the contained type's move assignment operator, if it has one.
* If the queue is empty, blocks for timeout microseconds, or until item becomes available.
* This method is identical to the timeout_pop() method if that type has no move assignment operator.
* \param[out] item The item.
* \param[in] timeout The number of microseconds to wait.
* \return true if an item was obtained from the queue, false if no item is received before the timeout.
*/
bool timeout_move_pop (value_type& item, std::uint64_t timeout)
{
std::unique_lock<std::mutex> lock (m_mutex);
// Same predicate-based wait as timeout_pop().
if (!m_condition.wait_for (lock, std::chrono::microseconds (timeout), [this]() { return !m_queue.empty(); }))
return false;
item = std::move (m_queue.front());
m_queue.pop();
return true;
}
/**
* Gets the number of items in the queue.
* \return Number of items in the queue.
*/
size_type size() const
{
std::lock_guard<std::mutex> lock (m_mutex);
return m_queue.size();
}
/**
* Check if the queue is empty.
* \return true if queue is empty.
*/
bool empty() const
{
std::lock_guard<std::mutex> lock (m_mutex);
return m_queue.empty();
}
/**
* Swaps the contents.
* \param[out] sq The SafeQueue to swap with 'this'.
*/
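void swap (SafeQueue& sq)
{
if (this != &sq)
{
// Acquire both mutexes with std::lock so that two threads swapping the
// same pair of queues in opposite order cannot deadlock.
std::lock (m_mutex, sq.m_mutex);
std::lock_guard<std::mutex> lock1 (m_mutex, std::adopt_lock);
std::lock_guard<std::mutex> lock2 (sq.m_mutex, std::adopt_lock);
m_queue.swap (sq.m_queue);
if (!m_queue.empty())
m_condition.notify_all();
if (!sq.m_queue.empty())
sq.m_condition.notify_all();
}
}
/*! The copy assignment operator */
SafeQueue& operator= (const SafeQueue& sq)
{
if (this != &sq)
{
// Same deadlock-free acquisition of both mutexes as in swap().
std::lock (m_mutex, sq.m_mutex);
std::lock_guard<std::mutex> lock1 (m_mutex, std::adopt_lock);
std::lock_guard<std::mutex> lock2 (sq.m_mutex, std::adopt_lock);
std::queue<T, Container> temp {sq.m_queue};
m_queue.swap (temp);
if (!m_queue.empty())
m_condition.notify_all();
}
return *this;
}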
/*! The move assignment operator */
SafeQueue& operator= (SafeQueue && sq)
{
std::lock_guard<std::mutex> lock (m_mutex);
m_queue = std::move (sq.m_queue);
if (!m_queue.empty()) m_condition.notify_all();
return *this;
}
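/*! Re-enables blocking in pop() and move_pop() after a previous unlock(). */
void lock()
{
std::lock_guard<std::mutex> guard (m_mutex);
m_Unlock = false;
}
/*! Wakes every consumer blocked in pop() or move_pop(), even if the queue is empty. */
void unlock()
{
{
// Set the flag under the mutex so a consumer that has just evaluated the
// wait predicate cannot miss the notification.
std::lock_guard<std::mutex> guard (m_mutex);
m_Unlock = true;
}
m_condition.notify_all();
}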
void clear()
{
std::lock_guard<std::mutex> lock(m_mutex);
while (!m_queue.empty())
{
m_queue.pop();
}
}
private:
std::queue<T, Container> m_queue;
mutable std::mutex m_mutex;
std::condition_variable m_condition;
unsigned int m_max_num_items = 0;
std::atomic<bool> m_Unlock {false}; // brace-init: copy-initializing std::atomic is ill-formed before C++17
};
/*! Swaps the contents of two SafeQueue objects. */
template <class T, class Container>
void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2)
{
q1.swap (q2);
}
#endif /* SAFEQUEUE_HPP_ */
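The forced-release behaviour added to the queue is easier to see in isolation. The following cut-down sketch is not `SafeQueue` itself: `ReleasableQueue`, `Release`, and the two demo functions are hypothetical names, and the element type is fixed to `std::string` to keep it short. It shows a consumer blocked on an empty queue being woken up without any item being pushed.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical cut-down queue, used only to illustrate the forced-release idea.
class ReleasableQueue
{
public:
    void Push(std::string item)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(item));
        }
        m_cv.notify_one();
    }

    // Blocks until an item arrives or Release() has been called.
    // Returns false when it returns because of Release() with nothing to pop.
    bool Pop(std::string& out)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_released || !m_items.empty(); });
        if (m_items.empty()) return false;
        out = std::move(m_items.front());
        m_items.pop();
        return true;
    }

    // Counterpart of unlock(): wake every blocked consumer, even with an empty queue.
    void Release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_released = true;   // changed under the mutex so a waiter cannot miss it
        }
        m_cv.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::string> m_items;
    bool m_released = false;
};

// Starts a consumer on an empty queue, releases it, and returns what Pop() reported.
bool BlockedConsumerIsReleased()
{
    ReleasableQueue queue;
    bool got_item = true;
    std::thread consumer([&] {
        std::string item;
        got_item = queue.Pop(item);
    });
    queue.Release();
    consumer.join();
    return got_item;
}

// The normal path: an item pushed earlier is returned by Pop().
std::string PopAfterPush()
{
    ReleasableQueue queue;
    queue.Push("task-1");
    std::string item;
    const bool ok = queue.Pop(item);
    assert(ok);
    return item;
}
```

Returning a `bool` from `Pop()` is a design choice of this sketch. `SafeQueue::pop()` returns `void`, so its caller has to detect the release by checking whether the output item was left unchanged.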
2.3 The thread pool
ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <string>
#include <memory>
#include <thread>
#include <atomic>
#include "SafeQueue.hpp"
#include "ThreadWorker.h"
#include "Singleton.h"
class ThreadPool
{
public:
ThreadPool():
m_bThreadPoolStop(false)
{
}
static std::shared_ptr<ThreadPool> GetSingleton()
{
return Singleton<ThreadPool>::GetInstance();
}
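void CreateThreads(unsigned int worker_num)
{
// Use an unsigned counter to match worker_num and avoid a signed/unsigned comparison.
for (unsigned int i = 0; i < worker_num; i++)
{
std::shared_ptr<ThreadWorker> pThreadWorker = std::make_shared<ThreadWorker>(static_cast<int>(i));
// The thread keeps its own shared_ptr copy, so the worker outlives the call to Run().
std::shared_ptr<std::thread> pThread = std::make_shared<std::thread>(&ThreadWorker::Run, pThreadWorker);
m_ThreadWorkers.push_back(pThreadWorker);
m_Threads.push_back(pThread);
}
}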
virtual~ThreadPool()
{
m_TaskQueue.clear();
}
void AddTask(const std::string& task_str)
{
m_TaskQueue.push(task_str);
}
bool IsStop()
{
return m_bThreadPoolStop;
}
void Stop()
{
// Raise the stop flag first, then release workers blocked in pop() so they can see it.
m_bThreadPoolStop = true;
m_TaskQueue.unlock();
for (auto& pThread : m_Threads)
{
if (pThread->joinable())
{
pThread->join();
}
}
}
SafeQueue<std::string>& GetTaskQueue()
{
return m_TaskQueue;
}
private:
std::atomic<bool> m_bThreadPoolStop;
SafeQueue<std::string> m_TaskQueue;
std::vector<std::shared_ptr<ThreadWorker>> m_ThreadWorkers;
std::vector<std::shared_ptr<std::thread>> m_Threads;
};
#endif // !THREAD_POOL_H
2.4 The worker thread
2.4.1 ThreadWorker.h
#ifndef THREAD_WORKER_H
#define THREAD_WORKER_H
class ThreadPool;
class ThreadWorker
{
public:
ThreadWorker();
ThreadWorker(int thread_index);
virtual~ThreadWorker();
void Run();
private:
int m_ThreadIndex;
};
#endif // !THREAD_WORKER_H
2.4.2 ThreadWorker.cpp
#include <iostream>
#include <string>
#include "ThreadWorker.h"
#include "ThreadPool.h"
ThreadWorker::ThreadWorker()
:m_ThreadIndex(-1)
{
}
ThreadWorker::ThreadWorker(int thread_index)
:m_ThreadIndex(thread_index)
{
std::cout << "Thread " << m_ThreadIndex << " created" << std::endl;
}
ThreadWorker::~ThreadWorker()
{
}
void ThreadWorker::Run()
{
while (!ThreadPool::GetSingleton()->IsStop())
{
std::string task_str = "";
// pop() blocks until a task arrives or the pool calls unlock() during shutdown;
// in the latter case task_str stays empty.
ThreadPool::GetSingleton()->GetTaskQueue().pop(task_str);
if (!task_str.empty())
{
std::cout << "Thread " << m_ThreadIndex << " processed " << task_str << std::endl;
}
}
std::cout << "Worker thread " << m_ThreadIndex << " exited" << std::endl;
}
2.5 Thread pool test code
main.cpp
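#include <vld.h> // Visual Leak Detector (MSVC only); remove this line on other toolchains
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include "ThreadPool.h"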
int main()
{
ThreadPool::GetSingleton()->CreateThreads(10);
std::thread prod_thread1([&]() {
for(int i=0;i<3000;i++)
{
ThreadPool::GetSingleton()->AddTask(std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::cout << "Producer thread 1 exited" << std::endl;
});
std::thread prod_thread2([&]() {
for (int i = 0; i < 3000; i++)
{
ThreadPool::GetSingleton()->AddTask(std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::cout << "Producer thread 2 exited" << std::endl;
});
if(prod_thread1.joinable())
prod_thread1.join();
if (prod_thread2.joinable())
prod_thread2.join();
ThreadPool::GetSingleton()->Stop();
std::cout << "Execution finished" << std::endl;
return 0;
}
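Author: StubbornHuang
Copyright notice: this is an original article by the site owner; please cite the original link when reposting.
Original title: C++11 – Building a thread pool that meets real-world application requirements
Original link: https://www.stubbornhuang.com/1808/
Published: 2021-11-08 14:53:57
Last modified: 2023-06-26 21:08:33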