Source Code Analysis of HAProxy's event_accept Function
I suspect quite a few people who have read the HAProxy code found it a headache. Frankly, HAProxy's coding style is on the poor side: functions running to many hundreds, even thousands, of lines are everywhere. Perhaps this is because Willy Tarreau concentrates his energy on the algorithms and cares less about everything else. To get HAProxy's main logic straight, or to write about it clearly, I suggest trimming the code down, ideally refactoring it. Below I take the event_accept() function as an example and attempt a simple analysis; readers can compare it against the original function and judge whether this version is clearer.
This opening part is essentially the same as the original.
```c
extern int event_accept(int fd)
{
    struct listener *l = fdtab[fd].owner;
    struct session *s;
    struct task *t;
    int cfd;
    int max_accept = global.tune.maxaccept;

    while (actconn < global.maxconn && max_accept--) {
        struct sockaddr_storage addr;
        socklen_t laddr = sizeof(addr);

        if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
            return 0;
        }
```
After a new connection is accepted, the first step is to check the connection count and whether the client fd is within range.
```c
// step 1, check number of connections and client socket fd
if (l->nbconn >= l->maxconn) {
    goto out_close;
}

if (cfd >= global.maxsock) {
    goto out_close;
}
```
Next, set the socket's attributes. This part actually comes later in the original; to avoid confusion I moved it up to right after accept(). Since I refactored the code, the meaning of some variables has changed as well, but that should not hinder understanding, so I won't belabor it below.
```c
// step 2, set the client socket's attributes
if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) ||
    (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY,
                (char *) &one, sizeof(one)) == -1)) {
    goto out_close;
}

if (proxy.options & PR_O_TCP_CLI_KA)
    setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE,
               (char *) &one, sizeof(one));

if (proxy.options & PR_O_TCP_NOLING)
    setsockopt(cfd, SOL_SOCKET, SO_LINGER,
               (struct linger *) &nolinger, sizeof(struct linger));
```
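The excerpt uses `one` and `nolinger` without showing their definitions; they are file-scope constants elsewhere in the HAProxy source. Definitions along these lines would make the snippet self-contained (the exact form here is my assumption, not copied from HAProxy):

```c
/* assumed definitions of the setsockopt() arguments used above */
static int one = 1;                        /* generic "enable" flag */
static struct linger nolinger = { 1, 0 };  /* l_onoff = 1, l_linger = 0:
                                            * reset instead of lingering on close */
```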
Next, allocate a new session from the pool2_session memory pool, add it to the global sessions list, and set its initial values.
```c
// step 3, allocate a new session and add it into the session list
if ((s = pool_alloc2(pool2_session)) == NULL) {
    disable_listener(l);
    proxy.state = PR_STIDLE;
    goto out_close;
}

LIST_ADDQ(&sessions, &s->list);

s->flags = 0;
s->cli_addr = addr;
s->listener = l;
s->conn_retries = proxy.conn_retries;
s->srv_error = default_srv_error;
s->srv = NULL;
s->prev_srv = NULL;
s->srv_conn = NULL;
s->logs.tv_accept = now;
s->logs.t_queue = -1;
s->logs.t_connect = -1;
s->logs.t_close = 0;
```
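pool_alloc2() draws from HAProxy's pool allocator, which I leave for a later article. As a rough mental model only (a minimal sketch, not HAProxy's pool2 implementation), a fixed-size pool backed by a free list behaves like this:

```c
#include <stdlib.h>

/* minimal fixed-size object pool: a free list of recycled chunks.
 * Conceptual sketch only, not HAProxy's pool2 implementation. */
struct pool_sketch {
    void *free_list;   /* singly linked list of returned chunks */
    size_t size;       /* object size, at least sizeof(void *) */
};

static void *pool_alloc_sketch(struct pool_sketch *pool)
{
    void *p = pool->free_list;
    if (p != NULL)
        pool->free_list = *(void **)p;  /* pop the head of the free list */
    else
        p = malloc(pool->size);         /* grow the pool on demand */
    return p;
}

static void pool_free_sketch(struct pool_sketch *pool, void *p)
{
    *(void **)p = pool->free_list;      /* push back onto the free list */
    pool->free_list = p;
}
```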
Step 4: allocate a new task from the pool2_task memory pool, set its handler (t->process), and link it to the session just allocated (t->context and s->task). In the original code this part is mixed in with the previous three, which makes it look rather haphazard.
```c
// step 4, allocate a new task, attach the handler (process_session) to the task;
// moreover, attach the task to the newly allocated session
if ((t = task_new()) == NULL) {
    disable_listener(l);
    proxy.state = PR_STIDLE;
    goto out_free_session;
}

t->process = l->handler;
t->context = s;
t->expire = TICK_ETERNITY;

s->task = t;
```
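From the fields assigned here, the task's shape can be inferred; a simplified sketch covering only what step 4 touches (the real struct task also carries scheduling state):

```c
/* simplified sketch of the task fields used in step 4 */
struct task_sketch {
    struct task_sketch *(*process)(struct task_sketch *t); /* handler invoked on wakeup, here l->handler */
    void *context;    /* back-pointer to the owning session */
    int expire;       /* next timeout tick, TICK_ETERNITY if none */
};
```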
Step 5: set up the session's client-side stream interface (s->si[0]); the newly allocated task is linked to this stream interface as well (s->si[0].owner).
```c
// step 5, assign the session's client stream interface; the newly allocated task is also linked
s->si[0].state = SI_ST_EST;
s->si[0].prev_state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
s->si[0].owner = t;
s->si[0].update = stream_sock_data_finish;
s->si[0].shutr = stream_sock_shutr;
s->si[0].shutw = stream_sock_shutw;
s->si[0].chk_rcv = stream_sock_chk_rcv;
s->si[0].chk_snd = stream_sock_chk_snd;
s->si[0].connect = NULL;
s->si[0].iohandler = NULL;
s->si[0].fd = cfd;
s->si[0].flags = SI_FL_NONE;
s->si[0].exp = TICK_ETERNITY;
```
Step 6: set up the session's server-side stream interface (s->si[1]); the newly allocated task is linked to this stream interface as well (s->si[1].owner).
```c
// step 6, assign the session's server stream interface; the newly allocated task is also linked
s->si[1].state = SI_ST_INI;
s->si[1].prev_state = SI_ST_INI;
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
s->si[1].owner = t;
s->si[1].update = stream_sock_data_finish;
s->si[1].shutr = stream_sock_shutr;
s->si[1].shutw = stream_sock_shutw;
s->si[1].chk_rcv = stream_sock_chk_rcv;
s->si[1].chk_snd = stream_sock_chk_snd;
s->si[1].connect = tcpv4_connect_server;
s->si[1].iohandler = NULL;
s->si[1].fd = -1;
s->si[1].flags = SI_FL_NONE;
s->si[1].exp = TICK_ETERNITY;
```
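Steps 5 and 6 fill in the same set of fields on the two sides, differing only in initial state, connect callback, and fd. Read together they imply a stream interface shaped roughly like this (a simplified sketch limited to the fields assigned above; the callback signatures are my assumption):

```c
/* simplified sketch of the stream_interface fields assigned in steps 5 and 6 */
struct si_sketch {
    int state, prev_state;                /* SI_ST_EST client-side, SI_ST_INI server-side */
    int err_type;                         /* SI_ET_NONE until an error is recorded */
    void *err_loc;
    struct task_sketch *owner;            /* the task woken up on I/O activity */
    void (*update)(struct si_sketch *);   /* stream_sock_data_finish */
    void (*shutr)(struct si_sketch *);    /* shutdown for reads */
    void (*shutw)(struct si_sketch *);    /* shutdown for writes */
    void (*chk_rcv)(struct si_sketch *);  /* "may I receive?" check */
    void (*chk_snd)(struct si_sketch *);  /* "may I send?" check */
    int (*connect)(struct si_sketch *);   /* NULL client-side, tcpv4_connect_server server-side */
    int fd;                               /* cfd client-side, -1 until the server connect */
    int flags, exp;
};
```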
Step 7: allocate a request buffer for the session from the pool2_buffer memory pool, initialize it, and link it as both the client stream interface's input buffer (s->si[0].ib) and the server stream interface's output buffer (s->si[1].ob). Also set the buffer's producer (s->req->prod) to the client stream interface and its consumer (s->req->cons) to the server stream interface.
```c
// step 7, allocate the request buffer,
// link it as the client's input buffer and the server's output buffer
if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
    goto out_fail_req;

buffer_init(s->req);
s->req->size = global.tune.bufsize;

s->req->prod = &s->si[0];
s->req->cons = &s->si[1];

s->req->rto = proxy.timeout.client;
s->req->wto = proxy.timeout.server;
s->req->cto = proxy.timeout.connect;
s->req->rex = TICK_ETERNITY;
s->req->wex = TICK_ETERNITY;

s->si[0].ib = s->req;
s->si[1].ob = s->req;

s->req->flags |= BF_READ_ATTACHED | BF_AUTO_CONNECT | BF_AUTO_CLOSE;
```
Step 8: allocate a response buffer for the session from the pool2_buffer memory pool, initialize it, and link it as both the client stream interface's output buffer (s->si[0].ob) and the server stream interface's input buffer (s->si[1].ib). Also set the buffer's producer (s->rep->prod) to the server stream interface and its consumer (s->rep->cons) to the client stream interface.
```c
// step 8, allocate the response buffer,
// link it as the client's output buffer and the server's input buffer
if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
    goto out_fail_rep; /* no memory */

buffer_init(s->rep);
s->rep->size = global.tune.bufsize;

s->rep->prod = &s->si[1];
s->rep->cons = &s->si[0];

s->rep->rto = proxy.timeout.server;
s->rep->wto = proxy.timeout.client;
s->rep->cto = TICK_ETERNITY;
s->rep->rex = TICK_ETERNITY;
s->rep->wex = TICK_ETERNITY;

s->si[0].ob = s->rep;
s->si[1].ib = s->rep;
```
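The same inference works for the buffer: steps 7 and 8 assign a consistent subset of fields. A simplified sketch (field types are my assumption):

```c
/* simplified sketch of the buffer fields assigned in steps 7 and 8 */
struct buffer_sketch {
    unsigned int flags;       /* BF_READ_ATTACHED | BF_AUTO_CONNECT | BF_AUTO_CLOSE, ... */
    int size;                 /* global.tune.bufsize */
    struct si_sketch *prod;   /* producer: the side that fills the buffer */
    struct si_sketch *cons;   /* consumer: the side that drains it */
    int rto, wto, cto;        /* read / write / connect timeouts */
    int rex, wex;             /* read / write expiration ticks */
};
```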
With the processing above, HAProxy has set up an initial request-response path between the client and the server, sketched below. The request from the client to HAProxy and the request from HAProxy to the server are linked to the same buffer, and the response from the server to HAProxy and the response from HAProxy to the client are likewise linked to the same buffer, which avoids memory copies.
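A rough ASCII rendering of that linkage (each direction shares a single buffer between the two stream interfaces):

```
             s->req (request buffer): prod = &s->si[0], cons = &s->si[1]
 client ---> si[0].ib ==========================> si[1].ob ---> server

             s->rep (response buffer): prod = &s->si[1], cons = &s->si[0]
 client <--- si[0].ob <========================== si[1].ib <--- server
```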
Step 9: set up fdtab.
```c
// step 9, insert the client socket fd into the fd table
fd_insert(cfd);
fdtab[cfd].owner = &s->si[0];
fdtab[cfd].state = FD_STREADY;
fdtab[cfd].cb[DIR_RD].f = l->proto->read;
fdtab[cfd].cb[DIR_RD].b = s->req;
fdtab[cfd].cb[DIR_WR].f = l->proto->write;
fdtab[cfd].cb[DIR_WR].b = s->rep;
fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
fdinfo[cfd].peerlen = sizeof(s->cli_addr);

EV_FD_SET(cfd, DIR_RD);
```
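The fields touched in step 9 outline the fd table entry that the pollers dispatch on; a simplified sketch, again limited to what this step uses:

```c
/* simplified sketch of the fdtab entry assigned in step 9 */
struct fdtab_sketch {
    void *owner;                  /* here, the client stream interface &s->si[0] */
    int state;                    /* FD_STREADY */
    struct {
        int (*f)(int fd);         /* I/O callback: l->proto->read or l->proto->write */
        struct buffer_sketch *b;  /* the buffer that callback fills or drains */
    } cb[2];                      /* indexed by DIR_RD / DIR_WR */
};
```

The peer address bookkeeping goes into the parallel fdinfo[] array, and EV_FD_SET(cfd, DIR_RD) asks the active poller to start watching the fd for readability.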
Finally, wake up the task.
```c
// step 10, call the wakeup function
task_wakeup(t, TASK_WOKEN_INIT);
```
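What task_wakeup() does internally belongs to the task scheduling machinery deferred to a later article; conceptually it records why the task woke and queues it to run. A sketch under that assumption, with hypothetical names:

```c
/* conceptual sketch only, with hypothetical names: mark the task
 * runnable and hand it to the scheduler's run queue */
static void task_wakeup_sketch(struct task_sketch *t, int reason)
{
    /* runqueue_push() is a hypothetical stand-in for the scheduler's
     * insert; the scheduler later calls t->process(t), i.e. the
     * process_session handler attached in step 4, passing it `reason`
     * (here TASK_WOKEN_INIT) via the task's state */
    runqueue_push(t, reason);
}
```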
What remains is wrap-up work such as updating the counters, along with the error-handling labels.
```c
        l->nbconn++;
        if (l->nbconn >= l->maxconn) {
            EV_FD_CLR(l->fd, DIR_RD);
            l->state = LI_FULL;
        }

        actconn++;
        totalconn++;
    }

    return 0;

out_fail_rep:
    pool_free2(pool2_buffer, s->req);
out_fail_req:
    task_free(t);
out_free_session:
    LIST_DEL(&s->list);
    pool_free2(pool2_session, s);
out_close:
    close(cfd);
    return 0;
}
```
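One detail worth noticing in the error paths: the labels are ordered so that each one falls through into the next, undoing the allocations in reverse, so a failure at any step releases exactly what was acquired before it. The same idiom in miniature:

```c
#include <stdlib.h>

/* the staged-cleanup idiom used above, in miniature */
int setup(void)
{
    void *a, *b;

    if ((a = malloc(16)) == NULL)
        goto fail;
    if ((b = malloc(16)) == NULL)
        goto fail_a;

    return 0;       /* success: caller owns a and b */

fail_a:
    free(a);        /* undo the first allocation, then fall through */
fail:
    return -1;
}
```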
With that, this article has made a rough top-to-bottom pass over the handling in event_accept(). The emphasis was on straightening out the flow while glossing over secondary details. That still leaves the memory pool, task scheduling, and event handling mechanisms, as well as important data structures such as session, task, stream interface, buffer, and fdtab, without in-depth analysis; I plan to cover those in future articles.
- Author: 若羽 · Source: UC技术博客
- Tags: event_accept, HAProxy
- Published: 2013-07-28 15:37:51