Linux多线程——互斥锁的封装与生产消费模型的实现
文章目录
- 生产者消费者模型
- 互斥锁的封装
- 交互任务
- 阻塞队列
- 测试
生产者消费者模型
生产者消费者模型是一个多线程的应用模型
主要分为两个角色,生产者和消费者,这两个角色又可以用多个线程实际运作
这两者之间需要对生产者生产的产品,进行交互,具体来说就是使用阻塞队列来进行产品的交互
这里的产品可以是很多东西,我们可以认为产品是一个类,这个类可以是一切
在整个生产、消费的过程中,有三种关系
生产者与生产者之间的关系,我们不允许两个生产者共同写,他们之间是互斥关系
消费者与消费者的关系,我们也不允许两个消费者共同取,他们之间也是互斥关系
生产者与消费者之间的关系,不允许一边写一边取,这是互斥关系,并且要求先写后取,这是互斥关系
为了更方便的使用互斥锁,我们把系统提供的互斥锁接口进行封装,并且贯彻一下RAII思想,让我们的使用更加方便
互斥锁的封装
class Mutex // 互斥锁,具体的锁在外部定义,传进来进行调用
{
public:Mutex(pthread_mutex_t *lock): _lock(lock){}~Mutex() {}void Lock(){pthread_mutex_lock(_lock);}void UnLock(){pthread_mutex_unlock(_lock);}private:pthread_mutex_t *_lock;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *lock): _mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.UnLock();}private:Mutex _mutex;
};
这里主要封装了两个类,第一个是封装了一下Mutex,因为那两个函数实在太丑了
第二个类是用来实现RAII的,在类声明时自动上锁,在生命周期结束时自动解锁
交互任务
生产者与消费者交互的地点是阻塞队列,而他们交互的内容可以是数据、对象、任务
我们这样定义,生产者负责布置任务,然后通过阻塞队列,传递给消费者,让消费者完成任务
这里的任务可以是很多,例如抢票、数据处理等等
我们设置一个随机计算加减乘除的任务
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>#define DEFAULT_VALUE 0enum
{OK = 0,div_zero,mod_zero,unknow
};const std::string operators = "+-*/%___"; // 加减乘除和模拟其他未知操作class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), Operator(op), result(DEFAULT_VALUE), code(OK){}void Run(){switch (Operator){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':if (data_y == 0)code = div_zero;elseresult = data_x / data_y;break;case '%':if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;break;default:code = unknow;break;}}void operator()(){Run();sleep(2);}std::string PrintTask(){return std::to_string(data_x) + Operator + std::to_string(data_y) + "=?";}std::string PrintResult(){return std::to_string(data_x) + Operator + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]";}~Task(){}private:int data_x;int data_y;char Operator;int result;int code; // 任务完成的怎么样,是否有错误
};
阻塞队列
接下来就是阻塞队列的实现,要注意分别提供给生产者和消费者的接口
#pragma once
#include <iostream>
#include <queue>
#include "LockGuard.hpp"const int TOP = 5;template <class T>
class BlockQueue
{
public:BlockQueue(int top = TOP): _capacity(TOP){}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in){LockGuard lockguard(&_mutex);while (IsFull){pthread_cond_wait(&_p_cond, &_mutex);}_q.push(in);pthread_cond_signal(&_c_cond);}void Pop(T *out){LockGuard lockguard(&_mutex);while (IsEmpty()){pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();pthread_cond_signal(&_p_cond);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}
private:std::queue<T> _q;int _capacity; // 阻塞队列上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _p_cond; // 生产者信号量pthread_cond_t _c_cond; // 消费者信号量
};
测试
最后是测试程序
#pragma once
#include <iostream>
#include <queue>
#include "LockGuard.hpp"const int TOP = 5;template <class T>
class BlockQueue
{
public:BlockQueue(int top = TOP): _capacity(TOP){}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in){LockGuard lockguard(&_mutex);while (IsFull()){pthread_cond_wait(&_p_cond, &_mutex);}_q.push(in);pthread_cond_signal(&_c_cond);}void Pop(T *out){LockGuard lockguard(&_mutex);while (IsEmpty()){pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();pthread_cond_signal(&_p_cond);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}
private:std::queue<T> _q;int _capacity; // 阻塞队列上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _p_cond; // 生产者信号量pthread_cond_t _c_cond; // 消费者信号量
};