前言:
此刻小伙伴们对“nginxredis关系”都比较关心,同学们都想要了解一些“nginxredis关系”的相关内容。那么小编也在网摘上网罗了一些关于“nginxredis关系””的相关文章,希望各位老铁们能喜欢,你们快快来学习一下吧!本篇文章将讲一下redis的网络架构,来揭秘redis接收数据的全过程
基础网络知识
再具体研究redis的代码之前,我们得首先得有一个大体的服务器如何接收到客户端数据的全过程。
因为redis 主要是用tcp在传输数据,所以是四层网络。
那么具体在应用层,我们又如何通过套接字接收一个客户端的数据了。
请看以下代码分析
redis的监听于地址绑定
server.c
/* Open the TCP listening socket for the user commands. */ //普通tcp的监听启动 if (server.port != 0 && //server.ipfd ip绑定地址 ipfd 数组长度为16,每一项表示一个绑定地址对应的句柄。 //ipfd_count 这里指的是监听地址的个数,具体跟你配置的地址有关系,一般我们配置127.0.0.1 那么他就是只会去和本机的服务进行通信 //ipfd 是一个整型数组,在这个方法里面会被赋值,ipfd[0] 等于文件描述符号,也可以理解为socket的id号 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); //tls的监听启动 if (server.tls_port != 0 && listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR) exit(1); /* Open the listening Unix domain socket. */ //适配unix if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); }
我们拿linux系统做为例子
要想一个客户端访问到服务器首先我们得使用socket 开放一个端口,告诉服务器现在这个端口再监听,当客户端访问该端口的时候,socket对应的文件句柄状态会发生改变。而每个端口又可以绑定多个地址,比如 127.0.0.1 我们只允许本地访问,比如0.0.0.0 表示所有ip都可以访问,等等。默认情况下redis 是用的0.0.0.0.
从第一个listenToPort进入后的代码如下:
server.cint listenToPort(int port, int *fds, int *count) { int j; /* Force binding of 0.0.0.0 if no bind address is specified, always * entering the loop if j == 0. */ //强制绑定 0.0.0.0 表示任何ip 可否问 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; //可以看到绑定的地址是一个集合 for (j = 0; j < server.bindaddr_count || j == 0; j++) { if (server.bindaddr[j] == NULL) { int unsupported = 0; /* Bind * for both IPv6 and IPv4, we enter here only if * server.bindaddr_count == 0. */ //如果绑定的地址为空,则初始化 ipv6 和 ipv4 //第一个是错误网络信息,提供的一个buffer //现实启动绑定ipv6的socket //fds 是一个整型数组, fds[*count] 表示这个地址下的,文件描述符 //server.tcp_backlog 指的是tcp协议里面,三次握手里面,服务端在tcp //握手的第三个阶段完成后,会将这些standby的 //连接放入这个accept queue 里面 //anetTcp6Server和anetTcpServer 会给这个监听的地址和端口分配一个fd fds[*count] = anetTcp6Server(server.neterr,port,NULL, server.tcp_backlog); if (fds[*count] != ANET_ERR) { //设置为非阻塞 //fds[*count] 应该等于设置的 server.tcp_backlog anetNonBlock(NULL,fds[*count]); (*count)++; } else if (errno == EAFNOSUPPORT) { unsupported++; serverLog(LL_WARNING,"Not listening to IPv6: unsupported"); } if (*count == 1 || unsupported) { /* Bind the IPv4 address as well. */ //下面的逻辑相同 fds[*count] = anetTcpServer(server.neterr,port,NULL, server.tcp_backlog); if (fds[*count] != ANET_ERR) { anetNonBlock(NULL,fds[*count]); (*count)++; } else if (errno == EAFNOSUPPORT) { unsupported++; serverLog(LL_WARNING,"Not listening to IPv4: unsupported"); } } /* Exit the loop if we were able to bind * on IPv4 and IPv6, * otherwise fds[*count] will be ANET_ERR and we'll print an * error and return to the caller with an error. */ if (*count + unsupported == 2) break; } //如果是ipv6的地址类型,单独做ipv6地址的监听 else if (strchr(server.bindaddr[j],':')) { /* Bind IPv6 address. */ fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog); } else { /* Bind IPv4 address. */ // ipv4的地址的绑定 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog); } if (fds[*count] == ANET_ERR) { //这里是打印日志的地方 serverLog(LL_WARNING, "Could not create server TCP listening socket %s:%d: %s", server.bindaddr[j] ? server.bindaddr[j] : "*", port, server.neterr); if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL) continue; return C_ERR; } anetNonBlock(NULL,fds[*count]); (*count)++; } return C_OK;}anet.cstatic int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog){ int s = -1, rv; //最高的端口不超过65535 char _port[6]; /* strlen("65535") */ struct addrinfo hints, *servinfo, *p; snprintf(_port,6,"%d",port); memset(&hints,0,sizeof(hints)); //ipv6或者ipv4 两者主要区别在于,地址的位数不同,且格式也不同, //本质的用处都一样 hints.ai_family = af; //流式socket 即tcp 传输协议 hints.ai_socktype = SOCK_STREAM; //获取地址,即当bindaddr为null的时候,返回0.0.0.0 hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */ //IPv4中使用gethostbyname()函数完成主机名到地址解析,这个函数仅仅支持IPv4,且不允许调用者指定所需地址类型的任何信息,返回的结构只包含了用于存储IPv4地址的空间。 // IPv6中引入了getaddrinfo()的新API,它是协议无关的,既可用于IPv4也可用于IPv6。 // getaddrinfo函数能够处理名字到地址以及服务到端口这两种转换,返回的是一个addrinfo的结构(列表)指针而不是一个地址清单。 // 这些addrinfo结构随后可由套接口函数直接使用。如此以来,getaddrinfo函数把协议相关性安全隐藏在这个库函数内部。 // 应用程序只要处理由getaddrinfo函数填写的套接口地址结构。该函数在 POSIX规范中定义了。 if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) { anetSetError(err, "%s", gai_strerror(rv)); return ANET_ERR; } for (p = servinfo; p != NULL; p = p->ai_next) { //如果socket 初始化不成功则跳过,比如端口被占用 if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) continue; //如果ipv6初始化失败 而且仅仅只初始化ipv6 则进入error if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error; //设置地址复用,地址复用的意思就是可以允许进程复用端口 if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; //绑定地址,初始化accept queue的长度 if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; goto end; } if (p == NULL) { anetSetError(err, "unable to bind socket, errno: %d", errno); goto error; }error: if (s != -1) close(s); s = ANET_ERR;end: freeaddrinfo(servinfo); return s;}
上面的代码就是一个socket 初始化的过程,
获取一个socket 可认识的地址结构。然后调用socket方法,声明为tcp的网络模式还是udp的网络模式,返回得到一个文件描述符。设置绑定的地址,每个绑定地址都会对应一个fd(文件描述符),fd的表现形式就是一个int型的非负整数。这边要注意的是backlog这个参数,在上面代码给了比较详细的注释,backlog在并发比较高的状态下建议可以设置大一点。它一般和系统变量/proc/sys/net/core/somaxconn一起使用,两者取其小。
上面的整个过程就是,redis socket初始化的过程,下面要开始说redis socket 的监听环节。
推荐视频:
为什么C/C++程序员一定要阅读redis源码?腾讯面试教你做人
c++后端绕不开的7个开源项目,每一个源码值得深入研究
学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
需要更多C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg,大厂面试题 等)
redis 的网络监听事件配置
上面的步骤可以让客户端发动信息到,redis绑定的端口上来了,但是连接千千万万,我们又如何采用合适的策略知道那些现在需要被处理的连接了,那就需要配置socket的相关监听了。
server.c //这里开始对我们监听的地址分配handler 事件 //一般情况下ipfd_count=1 for (j = 0; j < server.ipfd_count; j++) { //为这个socket创建一个只读事件 //并且为事件处理准备了一个handler,就是acceptTcpHandler if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData){ //fd 可以看成一个自增数, if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } //创建一个file event的引用 aeFileEvent *fe = &eventLoop->events[fd]; //将fd 放入到io监听事件里面去 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; //指定读handler和写handler if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; //给予一个客户端data 的引用 fe->clientData = clientData; if (fd > eventLoop->maxfd) //更新当前最大文件描述符 eventLoop->maxfd = fd; return AE_OK;}/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */// 根据不同的系统 选用不同的io实现, 以下实现的性能按照上下排序, 1, ae_evport.c ,2 ae_epoll.c ,3 ae_kqueue.c ,4,ae_select.c// 在linux 系统会优先用到epoll#ifdef HAVE_EVPORT#include "ae_evport.c"#else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif#endif
这里面的知识点会比较多,比如epoll 的工作机制是如何,select 工作机制又是如何,但是这里重点讲的使用整个过程
可以看到这里我们会把刚刚的socket 相关的文件描述符放入到整个io事件监听体系里面去,在Linux 系统里面会默认用到epoll 这个模型,这里稍微再介绍下ae_epoll.c里面的几个方法
typedef struct aeApiState { int epfd; struct epoll_event *events;} aeApiState;// redis epoll的初始化static int aeApiCreate(aeEventLoop *eventLoop) { // epoll 模型结构分配空间 aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; //根据setSize 分配事件空间,一个事件对应一个fd state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } //创建一个epoll的模型, 1024 这个️代表的epoll 可能会处理的fd 数目,高核版本这个参数没有什么意义,epoll创建也会得到一个文件描述符,表示打开了一个文件,所以当不用的时候也是需要对其进行关闭。 state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0;}//将目标fd的读写事件注册到epoll里面,让epoll的线程帮助那些可读可写的fd准备好,而不需要主线程阻塞式的去寻找。static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ //判断当前文件描述符 是否已经被创建 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; //为这个fd 注册 epollin 或者 epollout 事件,如果之前已经创建过,则增加事件类型,正因为有修改和删除的操作,epoll 内部维护了一棵红黑树使得增加,修改,删除的事件复杂度为logn if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0;}//删除的逻辑,当可读,可写事件都存在的时候,删除操作也可以看作是一种修改。static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ int mask = eventLoop->events[fd].mask & (~delmask); ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (mask != AE_NONE) { //如果只是消除可读可写的事件某一件的时候那就是修改 epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); } else { /* Note, Kernel < 2.6.9 requires a non null event pointer even for * EPOLL_CTL_DEL. */ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); }}//通过epoll wait 拉到对应的可被读写的fdstatic int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; //通过这个方法拉取到可读可写的fd, //第二个参数是拉取个中状态的事件,记住可以拉多种状态一次性 //第三个参数拉取事件最大数,一个fd对应一个事件,一个事件可以为即可读又可写的状态 //第四个参数为超时时间。-1 为一直等待。 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; //可以看到c语言常用位数来表示状态,这样的话多状态就可以表现出来了。 if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents;}epoll的使用总结
首先要明白epoll不是具体去与客户端的沟通的媒介, 而是一种对fd的监听架构,它主要运作在系统内核里面,当其监听下的fd的状态发生改变时,根据fd的注册事件类型,返回对应的fd,告诉主程序该fd需要处理。所以总结起来使用epoll 也非常简单,
1.初始化的时候,使用epoll_create
2.注册事件的时候,使用epoll_ctl
3. 拉取有状态改变的fd ,使用epoll_wait
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
为了避免对redis代码理解引入新的问题,我们需要着重关注两个状态,即在redis里用到的EPOLLIN 和 EPOLLOUT。
其实看完上面的代码之后我们一定会有一个疑问,我们只注册了一个(默认情况下只有一个)fd到epoll 里面去,那我们为什么还要使用epoll了,下面继续分析
redis 的网络监听事件
首先我们回到server.c的代码
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; //这里进入死循环 while (!eventLoop->stop) { //启动事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); }}
可以看到上面进入aeMain 这个方法之后,主线程会循环调用处理事件方法。
/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * If flags is 0, the function does nothing and returns. * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * if flags has AE_FILE_EVENTS set, file events are processed. * if flags has AE_TIME_EVENTS set, time events are processed. * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ // 可以看到redis 是一个事件驱动型的设计模式,会循环调用时间事件 //(如cycle 之前提到的过期键的处理) 然后还有文件事件(其实就是处理客户端的请求), // 每次循环完还有一些回调的策略方法放在我们的beforesleep,aftersleep 里面。 // 下面这段代码,我们首先关注文件型事件。int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } if (eventLoop->flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when * some event fires. */ // 获取事件个数 numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); //这里是开始处理网络请求的地方。 for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ /* Normally we execute the readable event first, and the writable * event laster. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * However if AE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event * after the readable. In such a case, we invert the calls. * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsynching a file to disk, * before replying to a client. */ // 这个fe 是代表这个事件的总状态,首先要保证读事件,要before 于写事件,这样的好处在于我们能够将处理好的命令回滚, // 而不至于一些同步问题导致数据无法回滚,给予客户端错误的状态,比如我们 fsyching 数据到硬盘,我们想成功之后才将数据返回给客户端 int invert = fe->mask & AE_BARRIER; /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * * Fire the readable event if the call sequence is not * inverted. */ if (!invert && fe->mask & mask & AE_READABLE) { //在这里调用我们的readder handler fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */}
从上面代码可以看到如何处理file event的过程
1.我们先调用到aeApiPoll,获取需要被处理的event个数,
2.我们传入的eventloop 里面放入了需要被处理的事件。
3.遍历eventloop里面的每一项。
4.对于可读事件调用read handler 来处理。
5.对于可写事件调用write handler 来处理。
上面还有一些回滚的处理流程,这个再后续功能模块再继续讨论。
初始化客户端连接的read handler处理流程
// 这个是服务端fd注册的读事件响应,它的作用主要用于接收到新的客户端连接,然后将它注册到epoll里面去,后面接收客户端数据的handler 就不在这边处理了。void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { //跟客户端建立通道,为客户端分配一个fd. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); //connCreateAcceptedSocket 主要用于初始化客户端的连接 //acceptCommonHandler 这个方法适用于接收数据的地方 acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); }}//初始化一个连接,connection *connCreateAcceptedSocket(int fd) { connection *conn = connCreateSocket(); conn->fd = fd; conn->state = CONN_STATE_ACCEPTING; return conn;}//初始化连接connection *connCreateSocket() { //分配空间 connection *conn = zcalloc(sizeof(connection)); //CT_Socket 是一个结构体 conn->type = &CT_Socket; conn->fd = -1; return conn;}connection *connCreateSocket() { connection *conn = zcalloc(sizeof(connection)); conn->type = &CT_Socket; conn->fd = -1; return conn;}//可以看到新的socket 里面分配了各种handlerConnectionType CT_Socket = { .ae_handler = connSocketEventHandler, .close = connSocketClose, .write = connSocketWrite, .read = connSocketRead, .accept = connSocketAccept, .connect = connSocketConnect, .set_write_handler = connSocketSetWriteHandler, .set_read_handler = connSocketSetReadHandler, .get_last_error = connSocketGetLastError, .blocking_connect = connSocketBlockingConnect, .sync_write = connSocketSyncWrite, .sync_read = connSocketSyncRead, .sync_readline = connSocketSyncReadLine, .get_type = connSocketGetType};static void acceptCommonHandler(connection *conn, int flags, char *ip) {...... //为客户端连接分配一个接收数据的结构体 // 并将新的连接放入epoll 里面 if ((c = createClient(conn)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (conn: %s)", connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); connClose(conn); /* May be already closed, just ignore errors */ return; }...... client *createClient(connection *conn) { client *c = zmalloc(sizeof(client)); /* passing NULL as conn it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (conn) { connNonBlock(conn); connEnableTcpNoDelay(conn); if (server.tcpkeepalive) connKeepAlive(conn,server.tcpkeepalive); //设置readhandler ,readQueryFromClient // 就是我们上篇我们讲的io多线程读的地方了 connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); } ...... static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) { //这个地方将会调用到connSocketSetReadHandler, //func 就是上面的readQueryFromClient return conn->type->set_read_handler(conn, func);} //设置新的reader handlerstatic int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { // if (func == conn->read_handler) return C_OK; //将readhandler 覆盖成readQueryFromClient conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); else //将新的fd放入到epoll里面 //新的fd有可读事件的时候回调函数是connSocketEventHandler if (aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK;} ...... //这个地方是接收数据处理 if (connAccept(conn, clientAcceptHandler) == C_ERR) { char conninfo[100]; if (connGetState(conn) == CONN_STATE_ERROR) serverLog(LL_WARNING, "Error accepting a client connection: %s (conn: %s)", connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); freeClient(connGetPrivateData(conn)); return; } }....//这里会调用到CT_Socket 里面的connSocketAccept方法static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) { return conn->type->accept(conn, accept_handler);}static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { int ret = C_OK; //判断状态 if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; conn->state = CONN_STATE_CONNECTED; connIncrRefs(conn); //这里又会调用到clientAcceptHandler // 而clientAcceptHandler 看作者注释主要对于一些安全模式的验证, // 在这个环节暂时不研究 if (!callHandler(conn, accept_handler)) ret = C_ERR; connDecrRefs(conn); return ret;}
以上代码总结下来主线主要就是看了以下几件事
1,当有新连接过来的时候,分配一个fd给这个客户端。
2,将这个新的fd加入到epoll体系里面,即上面的handler 只负责新的连接初始化的工作。
3,一些安全验证,还有redis额外模块加载(丰富其拓展性)等事情。
初始化新的连接后又可以回到下一轮的epoll_wait,这个时候就能从刚刚连接里面把数据给读上来了。
读客户端连接数据的handler处理流程
/** * 可以看到下面的流程就是标准的读写流程,这边暂时来研究读的这部分 * @param el * @param fd * @param clientData * @param mask */static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask){ UNUSED(el); UNUSED(fd);....... int call_write = (mask & AE_WRITABLE) && conn->write_handler; int call_read = (mask & AE_READABLE) && conn->read_handler; /* Handle normal I/O flows */ if (!invert && call_read) { //这里就会调用到readQueryFromClient if (!callHandler(conn, conn->read_handler)) return; } /* Fire the writable event. */ if (call_write) { if (!callHandler(conn, conn->write_handler)) return; } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; }.......}
当新的fd有可读事件的时候,会从这里把数据读到client buffer
总结:
这篇文章讲述了从redis如何通过网络监听的方式到建立连接到读取数据的整个流程。能够学习到的知识点就是redis如何去使用网络初始化,如何去监听端口,又如何使用io多路复用的方式,全篇干货比较多,需要细心阅读,本篇文章最好能够对着代码,延着作者思路对照着读,文中也给出了比较详细的注释,希望能为你读源码的时候提供帮助
读源码小技巧分享
可以看到上面的文章里面其实还有一些判断和其它分支,并没有给出详细的解释,因为作者并不是redis的源码编写者,编写者本身对其有很多细节思考,但是我们不必对每个细节都去展开深入,这样会妨碍对于主线的探索,可以通过读懂其主线代码,然后再返回过去看其分支细节,这样就会有茅塞顿开的感觉。
标签: #nginxredis关系