Algorithm-Pattern
  • Introduction
  • C & C++
    • C语言
      • C/C++编译器
      • 宏的使用
      • 编译过程
      • 指针 & 数组
      • 柔性数组
      • 函数指针 & 回调函数
      • C标准库之stdio
      • C标准库之string
      • C标准库之errno
      • C标准库之stdarg
      • C标准库之regex
    • C++基础语法
      • 自增(++) & 自减(--)
      • c语言到c++
      • 可变模板参数
      • 强制类型转换
      • C/C++类型转换的本质
      • 指针 & 引用
      • const的用法
      • static的用法
      • 重要的关键字(一)
      • 重要的关键字(二)
      • 内存申请和释放
      • 内联函数
      • 函数 & 运算符重载
      • 面向对象之封装
      • 构造函数 & 析构函数
      • 面向对象之继承
      • 面向对象之多态
      • 泛型编程
      • 异常
      • 再谈函数指针
    • C++并发编程
      • C++的锁
      • 并发与多线程
    • C++高级特性
      • 函数对象
      • 移动语义 & 完美转发
      • lambda表达式
      • RTTI技术
      • RAII技术
      • 智能指针
      • 模板的特化
      • C++静态库和动态库
      • 内存溢出和内存泄漏
    • STL基础
      • String
      • array/vector/list
      • deque/priority_queue
      • set/map
      • unordered_set/unordered_map
      • algorithm_1
      • functional
      • allocator
    • C++标准库
      • IO
      • Tuple
      • regex
      • bitset
      • numeric
    • STL深入源码
      • vector内部实现
      • deque内部实现
      • sort函数实现
    • 第三方库
      • JsonCpp
      • ProtoBuf
  • 数据结构
    • 线性表
    • 字符串
    • 栈和队列
    • 二叉树
    • 平衡二叉树
    • 平衡多路搜索树
    • 树结构的延申
    • 图
    • 二进制
    • 散列表
  • 算法基础
    • 排序算法
    • 查找算法
    • 数学问题
    • 并查集
    • 递归算法
    • 附加——主定理
    • Catalan数
  • 算法设计思想
    • 滑动窗口思想
    • BFS/DFS
    • 二分法
    • 回溯法
    • 贪心算法
    • 分治法
    • 动态规划
    • 分支限界算法
    • 有限状态机(FSM)
  • LeetCode系列
    • 死磕二叉树
    • 股票买卖问题
    • 打家劫舍问题
    • 跳跃游戏问题
    • 括号匹配问题
    • 石子游戏问题
    • 子序列问题
    • 数组 & 矩阵
    • 排列 & 组合
  • 经典算法问题
    • 几何问题
    • 区间问题
    • 背包问题
    • 石子堆问题
    • 表达式求值
  • 面试题
    • 数据结构和算法基础
    • 程序设计题
      • 实现双线程交替打印
      • C++实现读写锁
      • 实现阻塞队列
      • 实现环形队列
      • 实现线程池
      • 实现智能指针
      • 实现string类
      • 实现高性能local cache
      • 实现内存池
      • 生产者-消费者模型
      • 设计定时器
    • 经典的算法题
    • C++面试题总结
    • 面试算法题总结
由 GitBook 提供支持
在本页
  • 线程池
  • 1、作用:异步解耦
  • 2、组成
  • 3、TODO

这有帮助吗?

  1. 面试题
  2. 程序设计题

实现线程池

上一页实现环形队列下一页实现智能指针

最后更新于4年前

这有帮助吗?

线程池

1、作用:异步解耦

线程池的运行逻辑:线程池中的线程在不断竞争任务队列中的任务。

  1. 任务队列

  2. 线程集合

  3. 管理组件

  1. 增加线程

  2. 删除过度冗余的线程

  3. 快手面试题:查询一个任务是否被执行完,生产者消费者模式(任务队列用环形阻塞队列即可)

  4. 头条面试题:如果一个任务在执行过程中线程被中断(也许是时钟周期已到,也许是等待资源,也许是被中断,等等),后面怎样保证继续被同一个线程执行。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define LL_ADD(item, list) do { \
	item->prev = NULL;          \
	item->next = list;          \
	if (list != NULL)           \
		list->prev = item;      \
	list = item;                \
}while(0) // 和使用时的分号做统一

#define LL_REMOVE(item, list) do {     \
	if(item->prev != NULL)             \
		item->prev->next = item->next; \
	if(item->next != NULL)             \
		item->next->prev = item->prev; \
	if(item == list)                   \
		list = item->next;             \
	item->prev = item->next = NULL;    \
}while(0)
// 任务结构体
struct NJOB {
	void (*func)(int arg, pthread_t id);
	int user_data;
	pthread_t id;

	struct NJOB *next;
	struct NJOB *prev;
};

struct NWORKER {
	pthread_t id;
	int terminate; // 是否工作

	struct NMANAGER *pool;

	struct NWORKER *prev;
	struct NWORKER *next;
};

typedef struct NMANAGER {
	pthread_mutex_t mtx;
	pthread_cond_t cond;

	struct NJOB *jobs;
	struct NWORKER *workers;
}nThreadPool;

// 线程的回调函数
void *thread_cb(void *arg){
	struct NWORKER *worker = (struct NWORKER *)arg;

	while (1) {
		pthread_mutex_lock(&worker->pool->mtx);
		// 没有任务则等待
		while(worker->pool->jobs == NULL){
			if (worker->terminate)
				break;
			pthread_cond_wait(&worker->pool->cond, &worker->pool->mtx);
		}
		// 执行任务
		struct NJOB *job = worker->pool->jobs;
		if(job != NULL){
			LL_REMOVE(job, worker->pool->jobs);
		}
		pthread_mutex_unlock(&worker->pool->mtx);
		job->func(job->user_data, worker->id);
	}
	free(worker);
	pthread_exit(NULL);
}

// SDK
int nThreadPoolCreate(nThreadPool *pool, int numWorkers){
	// 参数判断
	if(numWorkers <= 0)
		numWorkers = 1;
	if(pool == NULL)
		return -1;
	memset(pool, 0, sizeof(nThreadPool));

	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&pool->mtx, &blank_mutex, sizeof(pthread_mutex_t));

	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

	// workers
	int i = 0;
	for(i = 0; i < numWorkers; i ++){
		struct NWORKER *worker = (struct NWORKER*)malloc(sizeof(struct NWORKER));
		if(worker == NULL)
		{
			perror("malloc error");
			return -2;
		}
		memset(worker, 0, sizeof(struct NWORKER));
		worker->pool = pool;
		pthread_create(&worker->id, NULL, thread_cb, worker);
		LL_ADD(worker, pool->workers);
	}
	return 0;
}

int nThreadPoolDestory(nThreadPool *pool){
	struct NWORKER *worker = NULL;
	for(worker = pool->workers; worker != NULL; worker = worker->next){
		worker->terminate = 1;
	}
	pthread_mutex_lock(&pool->mtx);
	pthread_cond_broadcast(&pool->cond);
	pthread_mutex_unlock(&pool->mtx);
}

int nThreadPoolPush(nThreadPool *pool, struct NJOB *job){
	pthread_mutex_lock(&pool->mtx);
	LL_ADD(job, pool->jobs);
	pthread_cond_signal(&pool->cond);
	pthread_mutex_unlock(&pool->mtx);
	return 0;
}

void printer(int arg, pthread_t id){
	printf("thread %ld -- %d\n", id, arg);
	usleep(2);
}

#if 1
// 0 --> 1000
int main(){
	nThreadPool *mem_pool = (nThreadPool *)malloc(sizeof(nThreadPool));
	int re = nThreadPoolCreate(mem_pool, 10);
	if(re == 0){
		int i = 0;
		for(i = 0; i <= 1000; i++){
			struct NJOB *job = (struct NJOB*)malloc(sizeof(struct NJOB));
			job->func = printer;
			job->user_data = i;
			nThreadPoolPush(mem_pool, job);
		}
	}
	sleep(3);
	nThreadPoolDestory(mem_pool);
	return 0;
}
#endif
// ThreadPool.h
#ifndef ALGORITHM_PATTERN_CODE_THREADPOOL_H
#define ALGORITHM_PATTERN_CODE_THREADPOOL_H
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include <functional>
#include <list>

using namespace std;

struct Task{
    void (*func)(int arg, thread::id workerID);
    int user_data;
    thread::id workerID;
    Task() : user_data(0){}
};

class ThreadPool {
public:
    ThreadPool(int numWorkers, int maxNum = 20) : max_count(maxNum) {
        if(maxNum <= 0)
            maxNum = 20;
        if(numWorkers > maxNum)
            numWorkers = maxNum;
        if(numWorkers <= 0)
            numWorkers = 1;
        num_workers = numWorkers;
        terminate = false;
        int res = threadCreate(num_workers);
    }
    ~ThreadPool(){
        threadDestory();
    }
    int addOneTask(Task *task);

private:
    int threadCreate(int numWorkers);
    int threadDestory();
    void threadCallback();

    ThreadPool(const ThreadPool&); //禁止复制拷贝.
    const ThreadPool& operator=(const ThreadPool&);

private:
    std::mutex mtx;
    std::condition_variable cond;

    priority_queue<Task *> tasks;
    vector<thread *> workers;

    int num_workers;
    const int max_count;

    bool terminate;
};

int testThreadPool();

#endif //ALGORITHM_PATTERN_CODE_THREADPOOL_H

// ThreadPool.cpp
#include "../../include/tools/ThreadPool.h"
#include <iostream>
#include <algorithm>
#include <windows.h>

void ThreadPool::threadCallback(){
    while(true){
        // 没有任务则等待
        unique_lock<mutex> lock(mtx);
        cond.wait(lock, [this](){return terminate || !tasks.empty();});
        if(terminate)
            break;
        Task *task = tasks.top();
        if(task != nullptr){
            tasks.pop();
        }
        task->workerID = this_thread::get_id();
        lock.unlock();
        task->func(task->user_data, task->workerID);
    }
}
int ThreadPool::addOneTask(Task *task){
    unique_lock<mutex> lock(mtx);
    tasks.push(task);
    cond.notify_one();
    return 0;
}
int ThreadPool::threadCreate(int numWorkers){
    for(int i = 0; i < numWorkers; i++){
        thread *worker = new thread(std::bind(&ThreadPool::threadCallback, this));
        if(worker == nullptr){
            cout << "new thread error" << endl;
            return -1;
        }
        workers.push_back(worker);
    }
    return 0;
}
int ThreadPool::threadDestory(){
    std::unique_lock<std::mutex> lock(mtx);
    terminate = true;
    lock.unlock();
    cond.notify_all();
    for (thread *& worker : workers)
    {
        worker->join();
        delete worker;
    }
    workers.clear();
    return 0;
}

void printer(int arg, thread::id workerID){
    cout << "thread" << " " << workerID << " " << arg << endl;
    Sleep(5);
}

// 0 --> 1000
int testThreadPool(){
    ThreadPool threadPool(10);
    int i = 0;
    for(i = 0; i <= 1000; i++){
        Task *task = new Task();
        task->func = printer;
        task->user_data = i;
        threadPool.addOneTask(task);
    }
    Sleep(8000);
    return 0;
}

2、组成

3、TODO

🖋️
🖋️
✏️
🖋️