技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 算法 --> Redis的事件循环与定时器模型

Redis的事件循环与定时器模型

浏览:3202次  出处信息

假期的最后一天,简单翻阅了下Redis的源码,读一款server软件的源码我一般是从进程/线程模型开始的,Redis让我有些诧异,它采用了单进程单线程的模型,一般的server软件都会采用多进程或者多线程再或者多线程多进程混合的模型来设计,从而充分利用多核处理器的并行计算能力来提高软件的性能,Redis这种模型我只能推断程序的可并行化程度不高,顺序计算反而能省去多线程同步和维护线程池/进程池的开销,我对于数据库server端的设计没有什么经验也没有太多的理解,如有谬误欢迎大家指正。

    当然,这里要写的不是关于Redis的进程模型,而是Redis的事件模型和定时器模型。

    Redis没有依赖libevent,而是自己通过IO多路复用的方式来实现了事件循环和定时器,不像nginx或者apache有多种多路复用方式可供选择,Redis只采用了三种:epoll/kqueue/select,默认采用epoll,在linux环境下最优的方式当然是epoll,当在FreeBSD平台下epoll不存在时则使用kqueue,当然若两种方式都未定义则使用性能最差的select,我只阅读了跟epoll相关的代码。

    在main()函数的最后调用了aeMain()这个函数进入Redis的事件循环,这个函数的很简单,循环调用aeProcessEvents()来对事件进行处理:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

    在此之前Redis做了很多初始化的工作,这些工作大多是在initServer()这个函数中执行的,初始化一些相关的list,dict等,调用aeCreateEventLoop()初始化eventloop,这个函数初始化eventloop相关的数据结构,并最终调用了epoll_create()函数,对epoll上下文进行初始化。紧接着Redis创建了用于listen的socket对象,并调用aeCreateFileEvent()把该socket描述符的读事件加入到事件池中去,另外,还调用了aeCreateTimeEvent()函数来初始化一下定时器,定期地执行serverCron()这个函数,接下来看一下aeCreateFileEvent()aeCreateTimeEvent()这两个函数。

    aeCreateFileEvent()这个函数初始化aeFileEvent结构(该结构保存事件的一些状态,以及事件的文件描述符等),并调用aeApiAddEvent()函数将描述符相关的事件添加到事件池中,对于epoll它的实现如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

    非常简洁,这个函数只不过是把epoll_ctl()相关的操作做了一下封装,至此描述符已经加入到事件池中进行监听了,接着看aeCreateTimeEvent()这个函数。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
 
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

    同样是初始化数据结构,但没有调用aeApiAddEvent()这个函数,当然,定时器又不需要文件描述符,当然不需要添加相关事件,定时器的实现只是使用了epoll_wait()的定时功能,aeAddMillisecondsToNow()这个函数顾名思义是把当前时间加上一个给定的毫秒数,然后算出一个when_sec和when_ms,eventloop对象的timeEventHead实际上是一个单向链表,它用于保存所有的定时器事件,当添加一个定时器事件时其实只是向该链表头中插入了一个元素,其会后由aeProcessEvents()这个函数遍历该链表取出超时的事件进行处理,接着我们看下这个事件处理里面最核心的函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
 
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
 
            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
 
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
 
	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn\'t
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
 
    return processed; /* return the number of processed file/time events */
}

    这其中的aeApiPoll()这个函数其实就是对epoll_wait()操作的一个封装,epoll_wait()的最后一个参数是一个毫秒级的超时时间,Redis充分利用了这个时间在对IO事件进行监听的同时来实现了定时。这个函数的前面一大部分代码是在计算这个超时时间的,它调用aeSearchNearestTimer()这个函数来获取最近要超时的一个定时器对象,如何获取的呢?就是遍历刚才的提到的那个timeEventHead链表,来找出时间值最小的一个,注意是遍历,因为链表中的定时器也是无序的,不过我相信作者有一天会把它换成红黑树或者其它的数据结构吧。如果找到一个将要超时的定时器,则将它与当前时间进行比较,如果当前时间大于定时器时间则表示定时器已超时,将超时时间设为0,若当前时间小于定时器时间,则将超时时间设为两者之差。如果定时器队列为空,或者说没有任何定时器事件,则可以根据AE_DONT_WAIT这个标志来决定epoll_wait()是non-blocking立即返回,还是一直阻塞在那里。

    aeApiPoll()函数返回时,有两种情况,一种是IO事件被触发,另一种是定时器超时,当IO事件被触发时,遍历所有活跃描述符并调用相关的回调函数对其进行处理。当没有IO事件被触发,而是超时时,则返回值numevents为0,函数会转向processTimeEvents()来遍历定时器列表,调用定时器回调函数处理定时器事件,当IO事件被触发而并没有定时器超时时,如果设置了AE_TIME_EVENTS标志则也会对定时器列表进行遍历,主循环便是如此,我认为这会多少对效率有一定的影响,当然可能现在的Redis定时器列表并不太大,所以效率问题也可以忽略。

    以上是简单地对今天的工作做的总结,欢迎大家批评指教。

建议继续学习:

  1. Nginx源码分析-事件循环    (阅读:5055)
  2. Hive的入口 -- Hive源码解析    (阅读:4882)
  3. Storm源码浅析之topology的提交    (阅读:4530)
  4. Hive源码解析-之-语法解析器    (阅读:4378)
  5. Nginx源码分析-内存池    (阅读:4257)
  6. Nginx源码分析-Epoll模块    (阅读:4028)
  7. Lua GC 的源码剖析 (2)    (阅读:4006)
  8. Lua GC 的源码剖析 (4)    (阅读:3540)
  9. ExtJS源码研究笔记之总评    (阅读:3163)
  10. Lua GC 的源码剖析 (1)    (阅读:3163)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1