
C++ I/O Multiplexing: The epoll Model


Prerequisites

I/O control: the fcntl library (I/O control library)

Multithreading: C++ Linux multithreaded synchronization and communication with semaphores

Sockets: C++ Linux multiprocess socket communication

The select model: C++ I/O multiplexing, the select model

The poll model: C++ I/O multiplexing, the poll model

The epoll model

Characteristics
How it works

epoll is an I/O multiplexing model that is more efficient than select and poll.

Under the hood, epoll is typically implemented with a red-black tree plus a linked list: the red-black tree handles insertion, lookup and removal of the monitored fds, and when an event becomes active the kernel places it on a ready list.

Advantages

By combining the red-black tree with the ready list, epoll can manage I/O efficiently in highly concurrent scenarios; when handling large numbers of connections it outperforms select and poll.

Disadvantages

epoll's data structures are more complex, so when there are only a few connections the overhead of maintaining the epoll instance itself is comparatively large.

Library functions: <sys/epoll.h>

#include <sys/epoll.h>
#include <errno.h>

// Create an epoll instance
int epoll_create(int size);
//   size:   a hint for the expected number of fds; not binding, and ignored since Linux 2.6.8
//           (it must still be greater than zero)
//   return: >0 (the fd of the epoll instance), -1 (error)

// Create an epoll instance
int epoll_create1(int flags);
//   flags:  0 (no flags) or EPOLL_CLOEXEC (the epoll fd is closed automatically when the
//           process executes a new program, i.e. close-on-exec)
//   return: >0 (the fd of the epoll instance), -1 (error)
// Both epoll_create and epoll_create1 require an explicit close(); epoll_create1 can additionally
// make the fd close automatically across exec, so epoll_create1 is the safer choice.
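As a minimal sketch of instance creation (nothing is registered yet; the variable names are only illustrative), the lifetime of an epoll instance looks like this:

#include <sys/epoll.h>
#include <cstdio>
#include <unistd.h>

int main() {
    // Prefer epoll_create1: EPOLL_CLOEXEC marks the fd close-on-exec.
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd == -1) {
        perror("epoll_create1");
        return 1;
    }
    // ... epoll_ctl / epoll_wait would go here ...
    close(epfd);   // the epoll fd still has to be closed explicitly
    return 0;
}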
// Manage the epoll interest list
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//   epfd:   the descriptor returned by epoll_create/epoll_create1
//   op:     EPOLL_CTL_ADD (add a monitored fd), EPOLL_CTL_MOD (modify a monitored fd),
//           EPOLL_CTL_DEL (remove a monitored fd)
//   fd:     the I/O fd to operate on
//   event:  an epoll_event structure carrying the event mask and the user data
//   return: 0 (success), -1 (error)
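A hedged sketch of the three operations, assuming epfd is a valid epoll fd and conn_fd is a connected socket (both names are placeholders):

struct epoll_event ev{};
ev.events  = EPOLLIN;             // start by watching for readability
ev.data.fd = conn_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) perror("EPOLL_CTL_ADD");

ev.events = EPOLLIN | EPOLLOUT;   // later, also watch for writability
if (epoll_ctl(epfd, EPOLL_CTL_MOD, conn_fd, &ev) == -1) perror("EPOLL_CTL_MOD");

// Since Linux 2.6.9 the event argument may be nullptr for EPOLL_CTL_DEL.
if (epoll_ctl(epfd, EPOLL_CTL_DEL, conn_fd, nullptr) == -1) perror("EPOLL_CTL_DEL");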
// The event structure
struct epoll_event {
    __uint32_t   events;  /* epoll event mask */
    epoll_data_t data;    /* user data */
};
// events flags:
//   EPOLLIN      readable
//   EPOLLPRI     urgent (out-of-band) data
//   EPOLLOUT     writable
//   EPOLLHUP     the connection was hung up
//   EPOLLET      edge-triggered mode: a notification is delivered only when the state changes,
//                with no further reminders; the remaining data can still be handled later
//   EPOLLONESHOT an even stricter mode: the event is reported only once and the fd must be
//                re-armed with epoll_ctl; if it is not handled in the current epoll round,
//                it will not be returned in the next one
//   EPOLLRDHUP   the peer half-closed the connection: it will not send any more data, but
//                data can still be written to it (TCP half-close)
//   EPOLLERR     an error occurred on the fd
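Edge-triggered mode is only safe on non-blocking fds, because one notification may cover several buffered messages and the reader has to drain the socket until EAGAIN. A minimal sketch under those assumptions (register_et and drain_et are illustrative helpers, not part of any API):

#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>

// Make fd non-blocking and register it edge-triggered for reads.
void register_et(int epfd, int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    struct epoll_event ev{};
    ev.events  = EPOLLIN | EPOLLET;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

// Called when EPOLLIN fires: read everything, since ET will not remind us again.
void drain_et(int fd) {
    char buf[4096];
    while (true) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0)       { /* process n bytes */ }
        else if (n == 0) { close(fd); break; }                       // peer closed
        else if (errno == EAGAIN || errno == EWOULDBLOCK) break;     // buffer drained
        else { perror("read"); close(fd); break; }
    }
}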
// User data
typedef union epoll_data
{
    void*    ptr;   // user data associated with the fd
    int      fd;    // the target file descriptor the event belongs to
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
// The user-data field is very useful in more elaborate communication designs, e.g. as a pointer
// to a structure holding information about this fd, such as an ID.

// Wait for ready events; the first `return value` entries of the array are the active events
// and can be processed in order
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//   epfd:      the fd returned by epoll_create/epoll_create1
//   events:    output array that receives the ready events
//   maxevents: maximum number of events returned by a single call
//   timeout:   -1 (block), 0 (return immediately), >0 (upper bound in milliseconds)
//   return: >0 (number of ready events), 0 (timed out), -1 (error)
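Putting the calls together, here is a hedged event-loop sketch that uses epoll_data.ptr to carry per-connection state (Conn and event_loop are illustrative names, not part of the epoll API; registration would set ev.data.ptr to a new Conn before EPOLL_CTL_ADD):

#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

struct Conn {          // hypothetical per-connection context carried in data.ptr
    int  fd;
    long id;
};

void event_loop(int epfd) {
    constexpr int kMaxEvents = 64;
    struct epoll_event ready[kMaxEvents];
    while (true) {
        // Block until at least one fd is ready; the first n array entries are valid.
        int n = epoll_wait(epfd, ready, kMaxEvents, -1);
        if (n == -1) { perror("epoll_wait"); break; }
        for (int i = 0; i < n; ++i) {
            Conn* c = static_cast<Conn*>(ready[i].data.ptr);
            if (ready[i].events & (EPOLLHUP | EPOLLERR)) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, c->fd, nullptr);
                close(c->fd);
                delete c;                         // the connection object is no longer needed
            } else if (ready[i].events & EPOLLIN) {
                // read from c->fd and dispatch by c->id ...
            }
        }
    }
}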
Example: a concurrency test server built on epoll, to measure the performance of a Reactor-style design (I/O multiplexing + thread pool)
Design of the message data structure

In this kind of high-concurrency request/response scenario, the type of the message actually determines the handler code. To keep the project simple, every message is fixed at 16 B and contains one long and one double.
The test side starts m processes; each one uses a thread pool to send n messages whose payload increases by 1 each time, so both fields should finally sum to m·n(n+1)/2. After the server has handled all requests it tears down its thread pool and checks whether the result is correct.

struct MessageData {  // 16 B (named MessageData in the code below)
    long   c1;
    double c2;
};
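TCP is a byte stream, so a single recv may return fewer than 16 bytes or coalesce several messages; the fixed-size framing only holds if each message is read in full. A hedged helper for that (recv_full is an illustrative name, not used by the code below):

#include <sys/socket.h>
#include <cerrno>
#include <cstddef>

// Read exactly len bytes from a blocking socket; returns false on EOF or error.
bool recv_full(int fd, void* buf, size_t len) {
    char* p = static_cast<char*>(buf);
    size_t got = 0;
    while (got < len) {
        ssize_t n = recv(fd, p + got, len - got, 0);
        if (n > 0)                got += static_cast<size_t>(n);
        else if (n == 0)          return false;     // peer closed
        else if (errno == EINTR)  continue;         // interrupted, retry
        else                      return false;     // real error
    }
    return true;
}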
Thread pool

See: Implementing a thread pool in C++
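As a quick usage sketch, assuming the ThreadPools class from server.cpp below is in scope: enqueue accepts any zero-argument callable and returns a std::future whose get() blocks until the task has run; the destructor stops the pool and joins the workers.

#include <iostream>

int main() {
    ThreadPools tp(4);                               // 4 worker threads
    auto fut = tp.enqueue([]() { return 1 + 1; });   // queue a task, get a future
    std::cout << "result=" << fut.get() << "\n";     // get() waits for completion
    return 0;
}                                                    // ~ThreadPools() joins the workers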

Programs

Honestly this feels pretty rough: it does not really show off epoll's advantages and mostly just leans on the thread pool, though I have to say the thread pool really is convenient.

server.cpp: uses epoll plus a thread pool to implement a simple receive-and-accumulate server (it never writes back), to test the server's responsiveness and stability
// threadpools
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<string>
#include<cstring>
#include<cstdio>
#include<cstdlib>
// epoll / sockets
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>
#include<chrono>

#define N_REQUESTS 10000
#define M_PROCESSES 1
#define MAX_CONNS 1
#define TOTAL_REQUESTS (N_REQUESTS*M_PROCESSES)

struct MessageData{ // 16 B
    long c1;
    double c2;
};

long LongSum=0;
double DoubleSum=0;
std::mutex SumMtx;   // added: LongSum/DoubleSum are updated from several worker threads
bool finish=false;

class ThreadPools{
public:
    ThreadPools(int n_threads): n_threads(n_threads),stop(false){
        for(int i=0;i<n_threads;i++){
            workers.emplace_back([this](){
                while(true){
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});
                        if(this->stop && this->tasks.empty()) return;
                        task=std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task(); // run the task after releasing the lock so workers can execute in parallel
                    //std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";
                }
            });
        }
    }
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop=true;
        }
        cv.notify_all();
        for(std::thread &worker : workers){
            worker.join();
        }
        //std::cout<<"thread pools terminated\n";
    }
    template<typename F>
    auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;
private:
    int n_threads;
    std::atomic<bool> stop;
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<void()>> tasks;
    std::condition_variable cv;
    std::mutex mtx;
};

template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{
    using return_type=typename std::result_of<F()>::type;
    auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    std::future<return_type> res=task->get_future();
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(stop){
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){(*task)();});
    }
    cv.notify_one();
    //std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";
    return res;
}

int main(){
    int epoll_fd=epoll_create1(EPOLL_CLOEXEC);
    if(epoll_fd==-1){
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    std::string server_ip="127.0.0.1";
    uint16_t server_port=8001;
    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        close(epoll_fd);
        perror("socket");
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in server_addr;
    std::memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr); // third argument is the in_addr, no sockaddr* cast
    if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
        close(epoll_fd);
        close(server_fd);
        perror("bind");
        exit(EXIT_FAILURE);
    }
    if(listen(server_fd,10)==-1){
        close(epoll_fd);
        close(server_fd);
        perror("listen");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epoll_fds[MAX_CONNS+1];
    struct epoll_event ev;
    ev.events=EPOLLIN|EPOLLPRI;
    ev.data.fd=server_fd;
    if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_fd,&ev)==-1){
        close(epoll_fd);
        close(server_fd);
        perror("epoll_ctl");
        exit(EXIT_FAILURE);
    }

    { // scope the pool so every queued task has finished before the result is printed
        ThreadPools tp(5);
        int cnt=0;
        auto start = std::chrono::high_resolution_clock::now();
        while(!finish){
            if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-start).count()>1000){
                std::cerr<<"test exceeded 1000 ms, terminating\n";
                finish=true;
            }
            if(cnt>=TOTAL_REQUESTS){
                finish=true;
            }
            int nevents=epoll_wait(epoll_fd,epoll_fds,MAX_CONNS+1,0);
            for(int i=0;i<nevents;i++){
                if(epoll_fds[i].data.fd==server_fd){
                    // new connection
                    int new_fd=accept(server_fd,nullptr,nullptr);
                    if(new_fd==-1){
                        perror("accept");
                    }else{
                        ev.events = EPOLLIN; // read only, never write back
                        ev.data.fd = new_fd;
                        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &ev);
                    }
                }else{
                    struct MessageData msg;
                    ssize_t size=recv(epoll_fds[i].data.fd,&msg,sizeof(msg),0);
                    if(size==0){
                        // peer closed: stop monitoring and release the fd
                        epoll_ctl(epoll_fd,EPOLL_CTL_DEL,epoll_fds[i].data.fd,nullptr);
                        close(epoll_fds[i].data.fd);
                    }else if(size<0){
                        perror("recv");
                    }else{
                        cnt++;
                        auto lam=std::bind([](struct MessageData msg){
                            std::lock_guard<std::mutex> lock(SumMtx);
                            LongSum+=msg.c1;
                            DoubleSum+=msg.c2;
                        },msg);
                        tp.enqueue(lam);
                    }
                }
            }
        }
    } // ~ThreadPools() joins the workers here

    std::cout<<"get result:"<<"LongSum="<<LongSum<<" DoubleSum="<<DoubleSum
             <<"\t(correct="<<(long)M_PROCESSES*N_REQUESTS*(N_REQUESTS+1)/2<<")\n";
    close(epoll_fd);
    close(server_fd);
}
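One likely reason the code above does not show epoll at its best: epoll_wait is called with timeout 0, so the main loop busy-polls and burns a core even when nothing arrives. A hedged alternative for the inside of the while(!finish) loop is to block for a short interval and keep the wall-clock check (10 ms is an arbitrary illustrative value; errno needs <cerrno>):

// Block up to 10 ms per iteration instead of spinning with timeout 0.
int nevents = epoll_wait(epoll_fd, epoll_fds, MAX_CONNS + 1, 10);
if (nevents == -1) {
    if (errno == EINTR) continue;   // interrupted by a signal, try again
    perror("epoll_wait");
    break;
}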
client.cpp: the client: a pool of 20 threads sends 10000 message requests in a loop.
// threadpools
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<string>
#include<cstring>
#include<cstdio>
#include<cstdlib>
// sockets
#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>
#include<fcntl.h>
#include<chrono>

#define N_REQUESTS 10000
#define M_PROCESSES 1

struct MessageData{ // 16 B
    long c1;
    double c2;
};

class ThreadPools{
public:
    ThreadPools(int n_threads): n_threads(n_threads),stop(false){
        for(int i=0;i<n_threads;i++){
            workers.emplace_back([this](){
                while(true){
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});
                        if(this->stop && this->tasks.empty()) return;
                        task=std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task(); // run the task after releasing the lock so workers can execute in parallel
                    //std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";
                }
            });
        }
    }
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop=true;
        }
        cv.notify_all();
        for(std::thread &worker : workers){
            worker.join();
        }
        //std::cout<<"thread pools terminated\n";
    }
    template<typename F>
    auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;
private:
    int n_threads;
    std::atomic<bool> stop;
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<void()>> tasks;
    std::condition_variable cv;
    std::mutex mtx;
};

template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{
    using return_type=typename std::result_of<F()>::type;
    auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    std::future<return_type> res=task->get_future();
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(stop){
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){(*task)();});
    }
    cv.notify_one();
    //std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";
    return res;
}

int main(){
    std::string server_ip="127.0.0.1";
    uint16_t server_port=8001;
    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        perror("socket");
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in server_addr;
    std::memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr); // third argument is the in_addr, no sockaddr* cast
    if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
        perror("connect");
        exit(EXIT_FAILURE);
    }
    // non-blocking socket: send() may fail with EAGAIN when the buffer is full;
    // such failures are only counted, not retried
    int flags = fcntl(server_fd, F_GETFL, 0);
    fcntl(server_fd, F_SETFL, flags | O_NONBLOCK);

    std::atomic<int> cnt(0); // shared by all worker threads
    { // scope the pool so all send tasks finish before the count is printed
        ThreadPools tp(20);
        struct MessageData msg;
        for(int i=0;i<N_REQUESTS;i++){
            msg.c1=i+1; // send 1..N so the expected sum is N*(N+1)/2
            msg.c2=i+1;
            auto req=std::bind([](int i,int fd,std::atomic<int>& cnt,struct MessageData msg){
                ssize_t size=send(fd,&msg,sizeof(msg),0);
                if(size<=0) cnt++;
            },i,server_fd,std::ref(cnt),msg);
            tp.enqueue(req);
        }
    }
    std::cout<<"send finish, error counts="<<cnt.load()<<"\n";
    close(server_fd);
}
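For reference, both programs can be built with something like g++ -std=c++14 -O2 -pthread server.cpp -o server (and likewise for client.cpp); -pthread is required for std::thread. Start the server first, then run the client against 127.0.0.1:8001.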
