IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

Nginx源码分析-事件循环

淘宝数据平台与产品部官方博客 tbdata.org 2010-12-21 01:55:06 累计浏览 6,204 次
本机暂存

事件循环这个概念貌似在windows编程中提得更多,Linux程序却很少提及这个概念。本文所提及的事件循环其实就是worker cycle,由于此处将关注的不再是worker进程,而是worker进程在循环过程中关于事件处理的环节,因此就盗用了事件循环这个概念。在具体看代码前,先看一下这个“循环”的概貌:
原图已失效

经过前面相关博文的介绍,我们了解到master进程创建好一个worker进程后,worker进程还会进行一个初始化工作,然后才会陷入“死”循环中。这个“死循环”也就是本文将谈及的事件循环,也就是上图中的黄色部分。整个黄色部份是由一个循环构成的,实际上,这个循环里将会做很多的事情,但本文将只关注图中红色标注的事件部分――ngx_process_events_and_timers。ngx_process_events_and_timers是一个函数(定义在src/event/ngx_event.c中)。接下来,就从这个函数开始进入事件驱动的核心。

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;
    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;
    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }
	/*ngx_use_accept_mutex变量代表是否使用accept互斥体
	 默认是使用,accept_mutex off;指令关闭。
	 accept mutex的作用就是避免惊群,同时实现负载均衡。
	 */
    if (ngx_use_accept_mutex) {

    	/*
    	 ngx_accept_disabled变量在ngx_event_accept函数中计算。
    	 如果ngx_accept_disabled大于了0,就表示该进程接受的
    	 连接过多,因此就放弃一次争抢accept mutex的机会,同时将
    	 自己减1。然后,继续处理已有连接上的事件。Nginx就借用
    	 此变量实现了进程关于连接的基本负载均衡。
    	 */
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;
        } else {
        	/* ngx_accept_disabled小于0,连接数没超载*/

        	/*尝试锁accept mutex,只有成功获取锁的进程,才会将listen
        	  套接字放入epoll中。因此,这就保证了只有一个进程拥有
        	  监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。
        	*/
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }
            if (ngx_accept_mutex_held) {
            	/*获取锁的进程,将添加一个NGX_POST_EVENTS标志,
            	  此标志的作用是将所有产生的事件放入一个队列中,
            	  等释放锁后,再慢慢来处理事件。因为,处理事件可能
            	  会很耗时,如果不先释放锁再处理的话,该进程就长
            	  时间霸占了锁,导致其他进程无法获取锁,这样accept
            	  的效率就低了。
            	*/
                flags |= NGX_POST_EVENTS;
            } else {

            	/*没有获得锁的进程,当然不需要NGX_POST_EVENTS标志了。
            	  但需要设置最长延迟多久,再次去争抢锁。
            	*/
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;
	/*epoll开始wait事件了,ngx_process_events的具体实现是对应到
	  epoll模块中的ngx_epoll_process_events函数。单独分析epoll
	  模块的时候,再具体看看。
	*/
    (void) ngx_process_events(cycle, timer, flags);
    delta = ngx_current_msec - delta; /*统计本次wait事件的耗时*/
	/*ngx_posted_accept_events是一个事件队列
	  暂存epoll从监听套接口wait到的accept事件。
	  前文提到的NGX_POST_EVENTS标志被使用后,就会将
	  所有的accept事件暂存到这个队列。

	  这里完成对队列中的accept事件的处理,实际就是调用
	  ngx_event_accept函数来获取一个新的连接,然后放入
	  epoll中。
	*/
    if (ngx_posted_accept_events) {
        ngx_event_process_posted(cycle, &ngx_posted_accept_events);
    }
	/*所有accept事件处理完成,如果拥有锁的话,就赶紧释放了。
	  其他进程还等着抢了。
	*/
    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }
	/*delta是上文对epoll wait事件的耗时统计,存在毫秒级的耗时
	  就对所有事件的timer进行检查,如果time out就从timer rbtree中,
	  删除到期的timer,同时调用相应事件的handler函数完成处理。
	*/
    if (delta) {
        ngx_event_expire_timers();
    }
	/*处理普通事件(连接上获得的读写事件)队列上的所有事件,
	  因为每个事件都有自己的handler方法,该怎么处理事件就
	  依赖于事件的具体handler了。
	*/
    if (ngx_posted_events) {
        if (ngx_threaded) {
            ngx_wakeup_worker_thread(cycle);
        } else {
            ngx_event_process_posted(cycle, &ngx_posted_events);
        }
    }
}

ngx_process_events_and_timers一做完工作,就又回到了事件循环中去了,上图示;但会很快又会回到事件处理中来。

上文中,分析了事件循环中有关事件处理的过程;在分析的过程中,我们有提到对accept事件的处理,accept事件就是监听套接口上有新的连接到来的事件;接下来,我们分析一下accept事件的handler方法,看看accept事件的处理过程是如何的。accept事件的handler方法是ngx_event_accept(位于src/event/ngx_event_accept.c中),代码分析如下:

void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;
    u_char             sa[NGX_SOCKADDRLEN];
	。。。。。。。。。。。。。。。。。

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;
    do {
        socklen = NGX_SOCKADDRLEN;
		/*accept一个新的连接*/
        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
		/*连接的错误处理*/
        if (s == -1) {
            err = ngx_socket_errno;
            if (err == NGX_EAGAIN) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
                               "accept() not ready");
                return;
            }
            ngx_log_error((ngx_uint_t) ((err == NGX_ECONNABORTED) ?
                                             NGX_LOG_ERR : NGX_LOG_ALERT),
                          ev->log, err, "accept() failed");
            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }
                if (ev->available) {
                    continue;
                }
            }
            return;
        }
		/*accept到一个新的连接后,就重新计算ngx_accept_disabled的值
		 ngx_accept_disabled已经提及过了,它主要用来做负载均衡之用。

		 这里,我们能够看到它的求值方式是“总连接数的八分之一,减去
		 剩余的连接数”。总连接数是指每个进程设定的最大连接数,这个数字
		 可以在配置文件中指定。由此处的计算方式,可以看出:每个进程accept
		 到总连接数的7/8后,ngx_accept_disabled就大于0了,连接也就
		 超载了。
		*/
        ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;
		/*从connections数组中获取一个connecttion slot来维护新的连接*/
        c = ngx_get_connection(s, ev->log);
        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }
            return;
        }
		/*为新的连接创建起一个memory pool,连接关闭的时候,才释放这个pool*/
        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }
		。。。。。。。。。。。。。。
        /* set a blocking mode for aio and non-blocking mode for others */
        if (ngx_inherited_nonblocking) {
            if (ngx_event_flags & NGX_USE_AIO_EVENT) {
                if (ngx_blocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_blocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        } else {
        	/*我们使用的epoll模型,在这里设置连接为nonblocking*/
            if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
                if (ngx_nonblocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_nonblocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        }
        *log = ls->log;
		/*初始化新连接*/
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;
        c->log = log;
        c->pool->log = log;
        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;
        c->unexpected_eof = 1;
        rev = c->read;
        wev = c->write;
        wev->ready = 1;
        if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
            /* rtsig, aio, iocp */
            rev->ready = 1;
        }
        if (ev->deferred_accept) {
            rev->ready = 1;
        }
        rev->log = log;
        wev->log = log;
        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */
        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
		。。。。。。。。。。。。。。。。。。。
        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                ngx_close_accepted_connection(c);
                return;
            }
        }
        log->data = NULL;
        log->handler = NULL;
		/*这里的listen handler很重要,它将完成新连接的最后初始化工作
		  同时将accept到的新连接放入epoll中;挂在这个handler上的函数
		  就是ngx_http_init_connection(位于src/http/ngx_http_request.c中);
		  这个函数放在分析http模块的时候再看吧。
		*/
        ls->handler(c);
        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
            ev->available--;
        }
    } while (ev->available);
}

accept事件的handler方法也就是如此了。剩下的就是每个连接上的读写事件的handler方法没有分析了,这一部分的内容将直接带领我们进入http模块中,所以等我们把epoll看完了,再开始http模块的分析吧。

备注:今天太阳真好,趁太阳还没有下山,就此收尾,然后出去享受周末的阳光。

同分类推荐文章

  1. Vibe新开源项目 - Vaala AI Gateway (2026-05-17 02:10:19)
  2. SmartPerfetto 架构文章 Q&A:8 个深度技术问答 (2026-04-10 11:00:00)
  3. 让 AI 把我的 PHP 博客重写成 Go (2026-03-27 18:33:54)

查看更多 后端 文章 →

建议继续学习

  1. 配置Nginx+uwsgi更方便地部署python应用 (累计阅读 106,963)
  2. 搜狐闪电邮箱的 Nginx/Postfix 使用模式 (累计阅读 33,821)
  3. 记录一个软中断问题 (累计阅读 16,885)
  4. 解析nginx负载均衡 (累计阅读 16,503)
  5. server日志的路径分析 (累计阅读 11,181)
  6. Nginx模块开发入门 (累计阅读 11,101)
  7. 检查nginx配置,重载配置以及重启的方法 (累计阅读 10,781)
  8. Cacti 添加 Nginx 监控 (累计阅读 10,521)
  9. 使用Squid缓存视频 (累计阅读 10,280)
  10. fsockopen 异步处理 (累计阅读 10,280)