C++ IO多路复用 epoll模型
原文链接:C++ IO多路复用 epoll模型
预备知识
IO控制:fcntl库:IO控制库
多线程:C++ Linux多线程同步通信-信号量
socket:C++ Linux多进程Socket通信
select模型:C++ IO多路复用 select模型
poll模型:C++ IO多路复用 poll模型
epoll模型
特性
原理
epoll是多路IO复用一种相比select和poll更高效的模型,
epoll底层一般是用红黑树和链表等结构实现的,使用红黑树对IO插入查找或删除, 当有活动事件时放到链表中
优点
通过红黑树和链表的结合,epoll 能够在高并发的场景下高效地管理IO. 处理大量连接优于select和poll
缺点
epoll数据结构更复杂,在连接数量少时,在维护epoll本身上的开销较大
库函数<sys/epoll.h>
# include <sys/epoll.h>
#include <errno.h> // epoll实例创建
int epoll_create(int size)size: 预先期望分配的io数,不是必须的,Linux 2.6.8后该参数没有作用return >0(对应epoll实例的描述符fd) -1(错误)// epoll实例创建
int epoll_create1(int flags)flags: 0无标志 EPOLL_CLOEXEC 标志新进程创建时会自动关闭该epoll的fdreturn >0(对应epoll实例的描述符fd) -1(错误)epoll_create和epoll_create1都需要主动close,但是epoll_create1能控制在子进程中自动关闭使用epoll_create1相比epoll_create更安全// epoll实例管理
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epfd: epoll_create返回的描述符op: EPOLL_CTL_ADD(添加监听IO) EPOLL_CTL_MOD(修改监听IO) EPOLL_CTL_DEL(删除监听IO)fd: 对应的IO fdevent: epoll_event结构体,保存事件和数据消息return 0(成功) -1(错误)
// event结构体
struct epoll_event {__uint32_t events; /* Epoll 事件 */epoll_data_t data; /* 用户数据 */
};// events事件参数:EPOLLIN 读事件EPOLLPRI 紧急数据EPOLLOUT 写事件EPOLLHUP IO关闭EPOLLET 边缘触发模式,事件状态变化时才会提醒,后续不提醒,但是可以后续处理EPOLLONESHOT 更极端的模式,事件只会通知一次(需要ctl重新注册添加),当前epoll轮次未处理,下轮不会返回EPOLLRDHUP IO半关闭,该IO不会再发送数据,但是还能给其写数据(TCP的半关闭状态)EPOLLERR IO错误// 用户数据
typedef union epoll_data
{void* ptr; //指定与fd相关的用户数据int fd; //指定事件所从属的目标文件描述符uint32_t u32;uint64_t u64;
} epoll_data_t;用户数据在复杂的通信设计里面很有效,例如作为一个指针,指向存放该IO的相关信息如IDint epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); // epoll_wait会将活动事件放在前面,可以顺序处理epfd: create的fdevents: 注册事件的数组maxevents: 最大监听事件数量timeout: -1(阻塞) 0(非阻塞) >0(时间上限)return >0(事件数) 0(超时) -1(错误)
实例: epoll实现一个并发测试服务器: 测试(IO复用+线程池)Reactor模型的性能
通信数据结构设计
在这种高并发响应场景中,消息的类型实际上会影响实际处理程序. 为了简化项目,我们统一数据大小16B,包含一个long型和double型数据.
测试端 开m个进程,每个线程池一直发送n次+1数据,最终得到两个和.m(1+n)n/2,服务器完成所有请求后销毁线程池,并验证结构是否正确.
struct MessageBodyData{ //16Blong c1;double c2;
}
线程池
参考 C++实现一个线程池
程序
感觉写的很垃圾,没体现epoll优势,只是在使用线程池,不得不说线程池是真好用.
server.cpp: 使用线程池实现一个echo服务器,测试服务器响应能力和稳定性
// threadpools
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<cstring>#include<sys/epoll.h>
#include<sys/socket.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>#include<chrono>#define N_REQUESTS 10000
#define M_PROCESSES 1
#define MAX_CONNS 1
#define TOTAL_REQUESES N_REQUESTS*M_PROCESSESstruct MessageData{ //16Blong c1;double c2;
};long LongSum=0;
double DoubleSum=0;bool finish=false;class ThreadPools{
public:ThreadPools(int n_threads): n_threads(n_threads),stop(false){for(int i=0;i<n_threads;i++){workers.emplace_back([this](){while(true){std::packaged_task<void()> task;{std::unique_lock<std::mutex> lock(this->mtx);this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});if(this->stop && this->tasks.empty()) return ;task=std::move(this->tasks.front());this->tasks.pop();task();}//std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";}});}}~ThreadPools(){{std::unique_lock<std::mutex> lock(mtx);stop=true;}cv.notify_all();for(std::thread &worker : workers){worker.join();}//std::cout<<"thread pools terminated\n";}template<typename F>auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;private:int n_threads;std::atomic<bool> stop;std::vector<std::thread> workers;std::queue<std::packaged_task<void()>> tasks;std::condition_variable cv;std::mutex mtx;
};template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{using return_type=typename std::result_of<F()>::type;auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));std::future<return_type> res=task->get_future();{std::unique_lock<std::mutex> lock(mtx);if(stop){throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace([task](){(*task)();});}cv.notify_one();//std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";return res;
}int main(){int epoll_fd=epoll_create1(EPOLL_CLOEXEC);if(epoll_fd==-1){close(epoll_fd);perror("epoll_create1");}std::string server_ip="127.0.0.1";uint16_t server_port=8001;int server_fd=socket(AF_INET,SOCK_STREAM,0);if(server_fd==-1){close(epoll_fd);close(server_fd);perror("socket");exit(EXIT_FAILURE);}struct sockaddr_in server_addr;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(server_port);inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){close(epoll_fd);close(server_fd);perror("bind");exit(EXIT_FAILURE);}if(listen(server_fd,10)==-1){close(epoll_fd);close(server_fd);perror("listen");exit(EXIT_FAILURE);}ThreadPools tp(5);struct epoll_event epoll_fds[MAX_CONNS+1];struct epoll_event ev;ev.events=EPOLLIN|EPOLLPRI;ev.data.fd=server_fd;if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_fd,&ev)==-1){close(epoll_fd);close(server_fd);perror("epoll_ctl");exit(EXIT_FAILURE);}int cnt=0;auto start = std::chrono::high_resolution_clock::now();while(!finish){if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-start).count()>1000){perror("test out off 1000ms, terminating");finish=true;}if(cnt>=TOTAL_REQUESES){finish=true;}int nevents=epoll_wait(epoll_fd,epoll_fds,MAX_CONNS+1,0);for(int i=0;i<nevents;i++){// 新连接if(epoll_fds[i].data.fd==server_fd){int new_fd=accept(server_fd,nullptr,nullptr);if(new_fd==-1){perror("accept");}else{ev.events = EPOLLIN; //只读不写ev.data.fd = new_fd;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &ev);}}else{struct MessageData msg;ssize_t size=recv(epoll_fds[i].data.fd,&msg,sizeof(msg),0);if(size<=0){perror("recv");}else{cnt++;auto lam=std::bind([](struct MessageData msg){LongSum+=msg.c1;DoubleSum+=msg.c2; },msg);tp.enqueue(lam);}}}}std::cout<<"get result:"<<"LongSum="<<LongSum<<" DoubleSum="<<DoubleSum<<"\t(correct="<<M_PROCESSES*N_REQUESTS*(N_REQUESTS+1)/2<<")\n";
}
client.cpp: 客户端:20个线程循环发送10000条消息请求.
// threadpools
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>
#include<fcntl.h>
#include<chrono>#define N_REQUESTS 10000
#define M_PROCESSES 1struct MessageData{ //16Blong c1;double c2;
};class ThreadPools{
public:ThreadPools(int n_threads): n_threads(n_threads),stop(false){for(int i=0;i<n_threads;i++){workers.emplace_back([this](){while(true){std::packaged_task<void()> task;{std::unique_lock<std::mutex> lock(this->mtx);this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});if(this->stop && this->tasks.empty()) return ;task=std::move(this->tasks.front());this->tasks.pop();task();}//std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";}});}}~ThreadPools(){{std::unique_lock<std::mutex> lock(mtx);stop=true;}cv.notify_all();for(std::thread &worker : workers){worker.join();}//std::cout<<"thread pools terminated\n";}template<typename F>auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;private:int n_threads;std::atomic<bool> stop;std::vector<std::thread> workers;std::queue<std::packaged_task<void()>> tasks;std::condition_variable cv;std::mutex mtx;
};template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{using return_type=typename std::result_of<F()>::type;auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));std::future<return_type> res=task->get_future();{std::unique_lock<std::mutex> lock(mtx);if(stop){throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace([task](){(*task)();});}cv.notify_one();//std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";return res;
}int main(){std::string server_ip="127.0.0.1";uint16_t server_port=8001;int server_fd=socket(AF_INET,SOCK_STREAM,0);if(server_fd==-1){close(server_fd);perror("socket");exit(EXIT_FAILURE);}struct sockaddr_in server_addr;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(server_port);inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){perror("connect");exit(EXIT_FAILURE);}int flags = fcntl(server_fd, F_GETFL, 0);fcntl(server_fd, F_SETFL, flags | O_NONBLOCK);ThreadPools tp(20);int cnt=0;struct MessageData msg;for(int i=0;i<N_REQUESTS;i++){msg.c1=i;msg.c2=i;auto req=std::bind([](int i,int fd,int& cnt,struct MessageData msg){ssize_t size=send(fd,&msg,sizeof(msg),0);if(size<=0) cnt++;},i,server_fd,std::ref(cnt),msg);tp.enqueue(req);}std::cout<<"send finish, error counts="<<cnt<<"\n";
}