Redis网络相关的结构体 和 reactor模式

news/2024/5/16 18:27:54

目录

1. epoll的封装

结构体aeApiStae

 创建epoll fd的封装

epoll_ctl的封装

epoll_wait的封装

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

结构体aeFiredEvent 

结构体aeTimeEvent

3. struct aeEventLoop

 aeEventLoop相关的函数

1. 创建eventloop

2. 创建aeFileEvent

3. 删除aeFileEvent

4. 开始事件循环

4.什么时候设置回调函数的?

1. 绑定SleepProc

2. 绑定服务端的读事件回调函数

3. 绑定客户端的读事件回调函数

4. 绑定客户端的写事件回调函数

5.绑定时间事件

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

IO同步与异步的判断

reactor模式的优点

6. 通过Redis网络部分,学到如何实现reactor模式


上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。

1. epoll的封装

epoll有三个函数调用:

  • epoll_create:创建epoll实例,返回一个epoll fd,即是用于管理待检测的文件描述符的集合。
  • epoll_ctl:管理红黑树实例上的节点,可以进行添加、修改、删除操作。
  • epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)。等待就绪的fd,并返回就绪的fd,存储在events中,以及个数。

结构体aeApiStae

  • 其内部有变量epfd,即是调用epoll_create创建出来fd
  • *events即是就绪的fd存储的位置(数组)
typedef struct aeApiState {int epfd;struct epoll_event *events;
} aeApiState;

 创建epoll fd的封装

主要流程:

  1. 申请内存空间给结构体aeApieState,申请内存空间给成员变量events。
  2. 使用epoll_create创建出epfd
//现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}anetCloexec(state->epfd);eventLoop->apidata = state;return 0;
}//重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {aeApiState *state = eventLoop->apidata;state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);return 0;
}//释放结构体aeApiState
static void aeApiFree(aeEventLoop *eventLoop) {aeApiState *state = eventLoop->apidata;close(state->epfd);zfree(state->events);zfree(state);
}

epoll_ctl的封装

主要是两个:添加和删除。

  • 添加的就是使用epoll_ctl(epfd,EPOLL_CTL_ADD ,....)
  • 删除时使用epoll_ctl(epfd,EPOLL_CTL_MOD,....)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;//得到要关注的事件类型maskmask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning */int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {/* Note, Kernel < 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}

epoll_wait的封装

即是调用epoll_wait(...)。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) {    //遍历就绪的fdint mask = 0;    //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。

所以,可以把fd关注的事件fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。

  • mask就是fd关注的事件类型
  • rfileProc是读就绪会执行的回调函数wfileProc就是写就绪会执行的回调函数。这两个都需要后续去定义和注册
  • clientData就是回调函数中的参数
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;
} aeFileEvent;//回调函数的类型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);//前面的形式可能不好看出,用c++11写的话,如下
//using aeFileProc=std::function<void(aeEventLoop* eventLoop,int fd,void* clientData,int mask)>;

 所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数

其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。

所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)

看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢

那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)

结构体aeFiredEvent 

如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。

  • fd表示epoll_wait返回的就绪fd
  • mask表示epol_wait返回的事件类型

epoll_wait返回的一个就绪fd就对应一个aeFireEvent。 

aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。

/* A fired event */
typedef struct aeFiredEvent {int fd;int mask;
} aeFiredEvent;//把epoll_wait返回的就绪fd和事件赋值给fired[j]
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {numevents = retval;for (int j = 0; j < numevents; j++) {    //遍历就绪的fdint mask = 0;    //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

结构体aeTimeEvent

其是时间事件,可以认为是个定时器,定时会执行给定的函数。

从代码可以看到其是个双向链表。 

  • 每个时间事件都有一个事件id,aeEventLoop中的timeEventNextId是下一个要注册的时间事件id。
  • when_sec、when_ms是时间事件(定时器任务)的发生时间
  • timeProc是时间事件的处理回调函数。说明:aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可
  • finalizerProc是时间事件要删除时的处理函数
//时间事件结构
typedef struct aeTimeEvent {long long id; //时间事件的唯一标识符,自增//事件的到达时间(即是执行时间)long when_sec; /* seconds */long when_ms; /* milliseconds *///事件处理函数 (到期执行的回调函数)aeTimeProc *timeProc;//事件释放函数 (回调函数)aeEventFinalizerProc *finalizerProc;void *clientData;               //多路复用库的私有数据//双向链表struct aeTimeEvent *prev;   struct aeTimeEvent *next;int refcount;     //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent;//时间事件回调函数类型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);//删除定时事件的回调函数类型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

3. struct aeEventLoop

EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。

aeEventLoop是Reactor模型的具体抽象,把网络读写事件时间事件(定时器任务)可以统一处理。

//ae.h
// 事件循环
typedef struct aeEventLoop {int maxfd;       //已经注册的文件描述符的最大值int setsize;     //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)long long timeEventNextId;    //下一个要注册的时间事件idtime_t lastTime;      //最后一次执行时间事件的时间aeFileEvent *events;     //是数组,已注册的文件事件 (就是IO event)aeFiredEvent *fired;     //数组,已就绪的文件事件aeTimeEvent *timeEventHead;    //定时器链表的头结点(头结点不一定是最早超时的任务)int stop;    //eventLoop的开关void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)int flags;          //设置的标识位
} aeEventLoop;//进入循环等待之前的回调函数类型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired

  • *event就是个数组,setsize是数组的元素个数。events的元素是aeFileEvent。前面说了一个客户端对应一个aeFileEvent。所以,events就是存储需要被监听的客户端。events数组下标是aeFileEvent中的fd。
  • fired数组是读写已就绪的 网络事件 数组。数组元素是记录了就绪的fd及其epoll_wait返回的事件类型

 aeEventLoop相关的函数

1. 创建eventloop

aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;//分配内存给eventLoopeventLoop = zmalloc(sizeof(*eventLoop))//分配内存给eventLoop中的events 和 fired 数组eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);//设置其一些成员变量eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;//创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidataaeApiCreate(eventLoop)//当前每个aeFileEvent都不关注任何事件for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;//省略了些内存申请失败的处理................
}

2. 创建aeFileEvent

内部会调用aeApiAddEvent

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {    //判断客户端fd是否大于events数组元素个数errno = ERANGE;return AE_ERR;        //若是大于,返回错误}//从events数组中获取一个元素aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fdif (aeApiAddEvent(eventLoop, fd, mask) == -1)    //添加fd到epoll上,并关注事件maskreturn AE_ERR;fe->mask |= mask;    //设置结构体aeFileEvent关注的事件//设置回调函数if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;    //设置回调函数的参数//更新已监听的fd的最大值if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}//创建时间事件,即是定时器
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置定时器的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;    //设置回调函数的参数te->prev = NULL;te->next = eventLoop->timeEventHead;    //从这句代码可以看出其是往头部插入的te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;    //更新链表头部为新插入的节点return id;
}

3. 删除aeFileEvent

通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{if (fd >= eventLoop->setsize) return;//通过传入的fd找到events数组中的元素aeFileEvent *fe = &eventLoop->events[fd];if (fe->mask == AE_NONE) return;/* We want to always remove AE_BARRIER if set when AE_WRITABLE* is removed. */if (mask & AE_WRITABLE) mask |= AE_BARRIER;aeApiDelEvent(eventLoop, fd, mask);    //进行删除fe->mask = fe->mask & (~mask);    //取消监听mask事件if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {/* Update the max fd */    int j;for (j = eventLoop->maxfd-1; j >= 0; j--)if (eventLoop->events[j].mask != AE_NONE) break;eventLoop->maxfd = j;    //更新已注册的fd的最大值}
}

4. 开始事件循环

aeMain就是一个while()循环,循环内部是aeProcessEvents函数。

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}

aeProcessEvents的主要步骤:

回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。

  • 计算epoll_wait()需要的阻塞时间。
  • 执行beforsleep
  • epoll_wait等待事件发生。
  • 处理发生的IO事件,根据发生的事件类型来调用对应的回调函数:
    • 若是AE_READABLE类型调用rfileProc
    • 若是AE_WRITABLE类型调用wfileProc
  • 处理时间事件。若该时间事件是周期性的,执行完后会再添加到时间事件链表的。
//为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){struct timeval tv, *tvp;//省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解...............................//执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);//即是调用epoll_wait, 等待就绪的fdnumevents = aeApiPoll(eventLoop, tvp);...................//遍历执行已就绪的fd的回调函数for (int j = 0; j < numevents; j++) {//在aeApiPoll中把就绪的fd和事件赋值给了fired[j]aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//得到就绪的fd和事件类型int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *///下面就执行对应的回调函数if (!invert && fe->mask & mask & AE_READABLE) {  //读就绪,执行读回调函数fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}if (fe->mask & mask & AE_WRITABLE) {    //写就绪,执行写回调函数if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}......................processed++;}}if (flags & AE_TIME_EVENTS)    //flags有需要,就执行时间事件,即是执行定时器任务processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

4.什么时候设置回调函数的?

看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。

因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。

从main()入手。

//server.c
int main(int argc, char **argv) {.......................initServer();
}//initServer函数中就有绑定回调函数的
void initServer(void) {//省略了很多其他部分的内容//socket(...),bind(...),listen(...),创建服务端的流程................................................................aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);/* Create an event handler for accepting new connections in TCP  sockets. */createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)/* Register before and after sleep handlers (note this needs to be done* before loading persistence since it is used by processEventsWhileBlocked. */aeSetBeforeSleepProc(server.el,beforeSleep);aeSetAfterSleepProc(server.el,afterSleep);................................
}

1. 绑定SleepProc

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {eventLoop->beforesleep = beforesleep;
}void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {eventLoop->aftersleep = aftersleep;
}

在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleepaftersleep

2. 绑定服务端的读事件回调函数

就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。

从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc而这时rfileProc就是accpet_handler,即是会调用accpet_handler。

//sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
//就把这个函数当做就是使用aeCreateFileEvent即可。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {for (int j = 0; j < sfd->count; j++) {aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)}//错误处理没有展示return C_OK;
}int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{................fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;.....................
}

回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。

3. 绑定客户端的读事件回调函数

继续跟着函数acceptTcpHandler。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;while(max--) {//调用acceptcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);............................acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}
}static void acceptCommonHandler(connection *conn, int flags, char *ip) {................./* Create connection and client */client *c = createClient(conn))/* Last chance to keep flags */c->flags |= flags;
}client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));if (conn) {connSetReadHandler(conn, readQueryFromClient);    //设置客户端的读回调函数connSetPrivateData(conn, c);}...................if (conn) linkClient(c);    //把该客户端添加到服务器server.client链表中保存
}static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_read_handler(conn, func);
}

set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。

这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)

//当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {if (func == conn->read_handler) return C_OK;conn->read_handler = func;if (!conn->read_handler)aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}

这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient

4. 绑定客户端的写事件回调函数

这个是在绑定的beforeSleep函数中。

void beforeSleep(struct aeEventLoop *eventLoop) {........................../* Handle writes with pending output buffers. */handleClientsWithPendingWritesUsingThreads();.............................
}int handleClientsWithPendingWritesUsingThreads(void) {...................listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);//如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClientif (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){}}......................
}static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_write_handler(conn, func, 0);
}

和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。

static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {if (func == conn->write_handler) return C_OK;conn->write_handler = func;if (barrier)conn->flags |= CONN_FLAG_WRITE_BARRIER;elseconn->flags &= ~CONN_FLAG_WRITE_BARRIER;if (!conn->write_handler)aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}

这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。

5.绑定时间事件

使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置事件的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;te->prev = NULL;te->next = eventLoop->timeEventHead;te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;return id;
}

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。

以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动的核心思想。

比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。

这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。

所以应该是,IO多路复用 + 回调机制 构成了 reactor模式

IO同步与异步的判断

还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。

IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)

reactor模式的优点

网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。

还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。

所以,我觉得一个很大的优势是:

  • 应用处理请求的逻辑,与事件分发框架完全分离,即是解耦,有很高的复用性

即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。

假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。

假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。

假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?

我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。

6. 通过Redis网络部分,学到如何实现reactor模式

  1. 先对epoll进行封装,方便后续使用
  2. 需要对fd进行监听,并注册需要关注的事件类型,并注册关注的事件类型就绪时,需要执行的函数。所以,可以把fd关注的事件类型事件就绪执行的函数三者封装在一个结构体aeFileEvent内。
  3. 我们简单使用epoll时候,肯定是会写 while(1){ epoll_wait(....); }。这就是个事件循环,所以可以再封装个结构体EventLoop, 其也是对epoll的再次封装,即是会调用前面封装好的epoll的函数。EventLoop内部应该有存储aeFileEvent的数组。
    1. 在调用epoll_wait时,会返回就绪的fd和事件类型,把返回的(就绪fd和事件类型)数组 赋值给EventLoop的aeFileEvent数组。之后就是使用该aeFileEvent,根据事件类型执行对应的回调函数
  4. 提供setCallback函数,设置好对应的回调函数,即是把需要执行的函数 赋值给 aeFileEvent中的 事件就绪执行函数

Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。

若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路

该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。 


http://www.mrgr.cn/p/14658460

相关文章

uniapp app权限说明弹框2024.4.23更新

华为上架被拒绝 用uni-app开发的app&#xff0c;上架华为被拒&#xff0c;问题如下&#xff1a; 您的应用在运行时&#xff0c;未见向用户告知权限申请的目的&#xff0c;向用户索取&#xff08;电话、相机、存储&#xff09;等权限&#xff0c;不符合华为应用市场审核标准。…

iOS - 多线程-atomic

文章目录 iOS - 多线程-atomic1. 源码分析1.1 get方法1.2 set方法 2. 一般不使用atomic的原因 iOS - 多线程-atomic atomic用于保证属性setter、getter的原子性操作&#xff0c;相当于在getter和setter内部加了线程同步的锁可以参考源码objc4的objc-accessors.mm它并不能保证使…

数据结构单链表”质检员“*2

1.随机链表的逻辑结构复制 原题链接&#xff1a; . - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode-cn.com/problems/copy-list-with…

智慧旅游引领旅游行业创新发展:借助智能科技的力量,推动旅游服务的个性化、精准化,提升游客的满意度和忠诚度

随着信息技术的迅猛发展和广泛应用&#xff0c;智慧旅游已成为旅游行业创新发展的重要引擎。智慧旅游借助智能科技的力量&#xff0c;推动旅游服务的个性化、精准化&#xff0c;不仅提升了游客的满意度和忠诚度&#xff0c;也为旅游行业的可持续发展注入了新的活力。本文将从智…

《Vid2Seq》论文笔记

原文链接 [2302.14115] Vid2Seq: Large-Scale Pretraining of a Visual Language Model for Dense Video Captioning (arxiv.org) 原文笔记 What&#xff1a; 《Vid2Seq: Large-Scale Pretraining of a Visual Language Model for Dense Video Captioning》 作者提出一种多…

《软件设计师教程:数据库系统基础知识大总结》

​ 个人主页&#xff1a;李仙桎 &#x1f525; 个人专栏: 《软件设计师》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ​ ⛺️前言&#xff1a;各位铁汁们好啊&#xff01;&#xff01;&#xff01;今天继续正式学习中级软件设计师考试相关的内容&#xff0c;后续不断更新…

Ansible 自动化运维

一、介绍 1、定义&#xff1a; ansible是自动化运维工具&#xff0c;基于Python开发&#xff0c;具有批量系统配置、批量程序部署、批量运行命令等功能。 ansible是基于 paramiko&#xff08;框架&#xff09; 开发的&#xff0c;并且基于模块化工作&#xff0c;本身没有批量…

node.js egg.js

Egg 是 Node.js 社区广泛使用的框架&#xff0c;简洁且扩展性强&#xff0c;按照固定约定进行开发&#xff0c;低协作成本。 在Egg.js框架中&#xff0c;ctx 是一个非常核心且常用的对象&#xff0c;全称为 Context&#xff0c;它代表了当前 HTTP 请求的上下文。ctx 对象封装了…

java中serverlet的体系结构

GenericServlet继承三个接口,HttpServerlet继承GenericServlet

React-editor-js not showing up in a function component

React-editor-js not showing up in a function component react-editor-js 在react 函数组件中显示不出来 真的&#xff0c;我马上就想放弃它了。但是看它周下载量还挺多&#xff0c;我不信别人没遇到过。于是我继续在网络上挖呀挖。只是我一开始的方向错了。我一直以为我的写…

Profinet转Modbus网关接称重设备与1200PLC通讯

Profinet转Modbus网关(XD-MDPN100)是一种能够实现Modbus协议和Profinet协议之间转换的设备。Profinet转Modbus网关可提供单个或多个RS485接口,使用Profinet转Modbus网关将称重设备与西门子1200 PLC进行通讯,可以避免繁琐的编程和配置过程,节省了工程师的时间和精力。其次,…

自动驾驶横向控制算法

本文内容来源是B站——忠厚老实的老王&#xff0c;侵删。 三个坐标系和一些有关的物理量 使用 frenet坐标系可以实现将车辆纵向控制和横向控制解耦&#xff0c;将其分开控制。使用右手系来进行学习。 一些有关物理量的基本概念&#xff1a; 运动学方程 建立微分方程 主要是弄…

给Qt搭建一个简单的Json服务器用于软件调试

一. vscode+nodejs+npm安装 二. nodejs服务器开启打开vscode - 终端 - 新建终端进入json_server目录 cd D:\json_server运行启动命令, 启动json-server服务器 npm run json:server效果如下: PS D:\json_server> npm run json:server> jsonserver@1.0.0 json:server > …

酒店订单管理系统搭建教程

1、演示环境配置 centos7.9、mysql5.7、php7.2 2、宝塔创建站点记录创建站点时候创建的数据库信息3、上传fastadmin压缩包点击开始上传 4、解压上传的fastadmin5、配置网站目录和运行目录运行目录选择public点击保存即可 6、配置伪静态选择thinkphp 7、直接访问域名根据自己的实…

加速软件定义汽车进程:安波福推出全栈式软硬件平台

随着智能汽车行业的飞速发展&#xff0c;“软件定义汽车”也得到了越来越多行业人士的认可&#xff0c;成为了汽车行业的大势所趋。为了推动和加速软件定义汽车的进程&#xff0c;也有越来越多的科技企业在为其不断添砖加瓦。 2024北京国际车展期间&#xff0c;安波福正式对外展…

14、pWnOS_v2.0(VulnHub)

pWnOS_v2.0 一、nmap 靶机ip找不见的自行上网查找解决办法。二、web渗透 目录爆破/blog/whatweb/search.php/register.php qwe 123qwe点击给定的链接兔子洞,无法登入?一直卡在这个界面wfuzz貌似没什么用nmap -> 目录Simple PHP Blog 0.4.0perl 1191.pl如果出现运行报错Can…

SQL Server实战三:数据库表完整性约束及索引、视图的创建、编辑与删除

本文介绍基于Microsoft SQL Server软件,实现数据库表完整性约束、索引与视图的创建、编辑与删除等操作的方法~本文介绍基于Microsoft SQL Server软件,实现数据库表完整性约束、索引与视图的创建、编辑与删除等操作的方法。 目录1 交互式为数据库表S创建PRIMARY KEY约束2 交互…

上位机开发-PyQt5

PyQt是一套Python的GUI开发框架&#xff0c;即图形用户界面开发框架 其中PyQt是Qt(C语言实现的)为Python专门提供的扩展 PySide官网&#xff1a;Qt for Python 插件安装 pip install 插件名字 # 安装 pip uninstall 插件名字 # 卸载 pip install 插件名字 -i 指定下载的镜…

vue.js 3 初学经验:开发环境搭建,Windows,nginx

Windows 11 nginx-1.20.0 "vue": "^3.4.21" ---序章 vue3 开发,不需要后端服务业是可以的。 在需要后端服务时,使用 nginx 来转发请求是很好的(个人开发者)。注,还有什么其它方式吗? 注,本文的后端服务 是使用 Java 开发的 HTTP 接口。 注,参考资料…

怎么给字符串字段加索引?

怎么给字符串字段加索引&#xff1f; 现在&#xff0c;几乎所有的系统都支持邮箱登录&#xff0c;如何在邮箱这样的字段上建立合理的索引&#xff0c;是我们今天要讨论的问题。 假设&#xff0c;你现在维护一个支持邮箱登录的系统&#xff0c;用户表是这么定义的&#xff1a; …