构建基于 阻塞队列 / 环形队列 的高效生产消费者模型系统
1. 生产者-消费者问题 概述
生产-消费者模型
:一个或多个 生产者线程 产生数据并将其放入共享缓冲区,同时一个或多个 消费者线程 从该缓冲区中读取数据进行操作的情景。
缓冲区
是一个用于存储生产者产生数据的中间容器;缓冲区
的容量通常是有限的,只能容纳一定数量的数据项。
生产 - 消费者模型的核心问题: 1. 线程与线程之间的 同步
与 互斥
关系;生产者与消费者之间的 等待
和 唤醒
机制。
2. 阻塞队列
2.1 工作原理
-
队列为空时,消费者从队列中消费数据时,会被阻塞挂起,直至队列中有元素可用;
-
队列为满时,生产者向队列中生产数据时,会被阻塞挂起,直至队列中有空闲位置;
-
使用 条件变量(pthread_cond_t 类型)+ 锁(pthread_mutex_t 类型)实现 等待 和 唤醒 机制。
2.2 框架
#include <iostream>
#include <queue>
#include <pthread.h>template<class T>
class BlockQueue
{
public:BlockQueue(int cap) :_cap(cap), _producer_wait_num(0), _consumer_wait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_producer_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_producer_cond);pthread_cond_destroy(&_consumer_cond);}
private:queue<T> _block_queue; // 生产者生产任务,消费者处理任务int _cap; // 容量pthread_mutex_t _mutex; // 保护共享资源的互斥锁pthread_cond_t _producer_cond; // 用于唤醒生产者生产pthread_cond_t _consumer_cond; // 用于唤醒消费者消费int _producer_wait_num; // 记录阻塞挂起的生产者数量int _consumer_wait_num; // 记录阻塞挂起的消费者数量
};
pthread_mutex_t _mutex; // 保护共享资源的互斥锁
解释 “为什么消费者和生产者共用一把锁” :
-
阻塞队列是一个共享资源,生产者和消费者都需要访问同一段内存资源来插入和删除元素;为了防止数据竞争和不一致,必须保证同一时间只有一个线程可以访问队列。
-
使用单个锁可以确保生产者和消费者访问队列时不会发生冲突。
2.3 生产
与 消费
- 生产
template<class T>
class BlockQueue
{
public:void Enqueue(T& in){phtread_mutex_lock(&_mutex); // 加锁while (_block_queue.size() == _cap) // 阻塞队列为满{++_producer_wait_num;pthread_cond_signal(&_consumer_cond); // 唤醒消费者线程pthread_cond_wait(&_producer_cond, &_mutex); // 阻塞挂起, 释放锁--_producer_wait_num;}_block_queue.push(in);if (_consumer_wait_num > 0) // 唤醒 阻塞挂起的消费者线程pthread_cond_signal(&_consumer_cond); pthread_mutex_unlock(&_mutex); // 解锁}
};
- 消费
template<class T>
class BlockQueue
{
public:void Pop(T* out){pthread_mutex_lock(&mutex);while (_block_queue.empty()){++_consumer_wait;pthread_cond_signal(&_producer_cond); // 唤醒生产者进行生产phread_cond_wait(&consumer_cond, &mutex);--_consumer_wait;}*out = _block_queue.front();_block_queue.pop();if (_producer_wait_num > 0)pthread_cond_signal(&_producer_cond);pthread_mutex_unlock(&mutex);}
};
3. 环形队列 + 信号量
3.1 信号量
信号量(semaphore)
是一种用于控制多个线程对共享资源的访问的同步机制。
举个形象的例子:
如果把阻塞队列比作电影院,那么信号量就能把电影院的座位划分为可用座位(空位)和已占用座位(已填位),供线程同步使用。
进一步解释:
阻塞队列确保同一时间只有一个线程可以访问电影院的入口门(即队列的互斥访问);信号量的出现,使得不同线程可以访问不同的座位(即队列的不同位置),从而实现高效的同步和资源管理。
// 头文件
#include <semaphore.h>// 信号量类型: sem_tint sem_init(sem_t* sem, int pshared, unsigned int value);
// sem: 指向要初始化的信号量的指针;
// pshared: 指定信号量是否可以在多个进程间共享;设置为 0,信号量仅在当前进程(各个线程之间)中共享;
// value: 初始化信号量的值,这个值表示信号量的初始计数值,用于控制资源的数量。int sem_destroy(sem_t* sem);int sem_wait(sem_t* sem); // P 操作
// 调用 sem_wait 时,如果信号量的值大于 0,函数会立即将信号量的值-1 并返回;
// 如果信号量的值为 0,调用该函数的线程会被阻塞挂起,直至信号量的值大于 0。int sem_post(sem_t* sem); // V 操作
// 调用 sem_post 时,信号量的值会 +1;
// 如果此前有线程因调用 sem_wait 而被阻塞,那么其中一个被阻塞的线程会被唤醒并继续执行。// 返回值:success -> 0 ; fail -> -1, 并设置相应的错误码
引入信号量,可以让环形队列中同时存在两把互斥锁,用于处理多生产多消费的场景,而不用担心生产者与消费者线程之间出现数据竞争和不一致的情况。
定义信号量: _room —— 生产者申请空间资源 ;_data —— 消费者申请数据资源
初始化: sem_init(&_room, 0, _cap); sem_init(&_data, 0, 0 );
不考虑环形队列中空出一个位置作为分隔,当 生产者 和 消费者 指向同一个位置时,只可能是以下两种情况:
-
队列为空。
消费者无法申请到当前位置的数据资源,只能被阻塞挂起;
生产者可以在当前位置申请到空间资源。
-
队列为满。
生产者无法申请到当前位置的空间资源,只能被阻塞挂起;
消费者可以在当前位置申请到数据资源。
3.2 框架
template <class T>
class RingQueue
{
public:RingQueue(int cap) :_cap(cap), _ring_queue(cap), _producer_pos(0), _consumer_pos(0){pthread_mutex_init(&_producer_mutex, nullptr);pthread_mutex_init(&_producer_mutex, nullptr);sem_init(&_producer_sem, 0, cap);sem_init(&_consumer_sem, 0, 0);}~RingQueue(){sem_destroy(&_producer_sem);sem_destroy(&_consumer_sem);pthread_mutex_destroy(&_producer_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:vector<T> _ring_queue;int _cap;int _producer_pos;int _consumer_pos;sem_t _producer_sem;sem_t _consumer_sem;pthread_mutex_t _producer_mutex;pthread_mutex_t _consumer_mutex;
};
3.3 生产 与 消费
template <class T>
class RingQueue
{
private:int P(sem_t& sem){return sem_wait(&sem);}int V(sem_t& sem){return sem_post(&sem);}
public:void Enqueue(T& in){P(_producer_sem); // 先申请资源,再申请锁// 运行到这里,就一定申请到了合法资源!!pthread_mutex_lock(&_producer_mutex);_ring_queue[_producer_pos++] = in;_producer_pos %= _cap;pthread_mutex_unlock(&_producer_mutex);V(_consumer_sem);}void Pop(T* out){p(_consumer_sem);pthread_mutex_lock(&_consumer_mutex);*out = _ring_queue[_consumer_pos++];_consumer_pos %= _cap;pthread_mutex_unlock(&_consumer_mutex);V(_producer_sem);}
};
4. 使用阻塞队列 / 环形队列构建生产消费者模型
4.1 生产-消费者模型
概述
生产者-消费者模型是一类多线程编程问题,涉及一次性创建多个生产者线程和消费者线程,并将其高效组织起来;
生产者线程负责生产任务并将其放入共享队列中,消费者线程负责将任务取出并执行;
阻塞队列 / 环形队列 作为底层容器,用于安全地存储和传输这些任务,确保生产者与消费者之间的同步和互斥。
生产-消费者模型的宏观逻辑如下:
- 创建阻塞队列 / 环形队列
- 创建生产者线程
- 创建消费者线程
- 启动所有线程
- 等待线程终止
template<class T> using blockqueue_t = blockqueue<T>;int main() {blockqueue_t<int> *bq = new blockqueue_t<int>(5);vector<Thread<blockqueue_t<int>>> threads; // Thread 为封装的类Producer(threads, *bq, 1);Consumer(threads, *bq, 1);StartAll(threads);WaitAll(threads);return 0; }
其中,最重要的是确保所有生产者线程和消费者线程都能访问同一份共享资源。
以创建生产者线程为示例,展示如何确保所有生产者线程能够访问同一份共享资源。
4.2 创建生产者线程
Producer(threads, *bq, 1);
// main() 创建生产者线程
在 Producer
函数的形参中,要通过引用传递 threads
和 bq
,确保所有生产者线程可以访问同一个共享队列 。
void ProducerCore(blockqueue_t<int>& bq)
{int cnt = 10;while (true){bq.Enqueue(cnt);cout << "Producer" << " send a task : " << cnt-- << endl;sleep(1);}
}void Producer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{for (int num = 0; num < threadnum; num++){string name = "producer-" + to_string(num + 1);threads.emplace_back(bq, ProducerCore, name);}
}
4.3 完整代码
#include "Thread.hpp" // Thread.hpp 为封装的线程类型
using namespace MyThread;
#include <iostream>
using namespace std;
#include <vector>
#include <string>
#include <unistd.h>
#include "BlockQueue.hpp"template<class T>
using blockqueue_t = blockqueue<T>;void ProducerCore(blockqueue_t<int>& bq)
{int cnt = 10;while (true){bq.Enqueue(cnt);cout << "Producer" << " send a task : " << cnt-- << endl;sleep(1);}
}void ConsumerCore(blockqueue_t<int>& bq)
{sleep(3);while (true){int data = 0;bq.Pop(&data);cout << "Consumer" << " execute the task : " << data << endl;sleep(1);}
}void Producer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{for (int num = 0; num < threadnum; num++){string name = "producer-" + to_string(num + 1);threads.emplace_back(bq, ProducerCore, name);}
}void Consumer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{for (int num = 0; num < threadnum; num++){string name = "consumer-" + to_string(num + 1);threads.emplace_back(bq, ConsumerCore, name);}
}void StartAll(vector<Thread<blockqueue_t<int>>> &threads)
{for (auto &thread : threads){thread.Start();}
}void WaitAll(vector<Thread<blockqueue_t<int>>> &threads)
{for (auto &thread : threads){thread.Join();}
}int main()
{blockqueue_t<int> *bq = new blockqueue_t<int>(5);vector<Thread<blockqueue_t<int>>> threads;Producer(threads, *bq, 1);Consumer(threads, *bq, 1);StartAll(threads);WaitAll(threads);return 0;
}