IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

HAProxy的event_accept函数源码分析

UC技术博客 2013-07-28 15:37:51 累计浏览 2,829 次
本机暂存

   相信不少阅读过HAProxy代码的同学都感到头疼吧?说实话,HAProxy的代码风格属于比较烂的一种,一个函数大几百行,甚至几千行的情况比比皆是。可能是Willy Tarreau本人把精力集中在算法方面,对其它方面没那么重视的缘故吧。如果想把HAProxy的主要逻辑看明白,或者把文章写清楚,我建议要对它进行一些删减,最好能重构一下。下面,以event_accept()函数为例,尝试对其进行简单的分析,各位读者可以对照原来的函数,看看是不是更清楚明了一些。

   开始的这一段基本上和原来的一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extern int event_accept(int fd)
{
	struct listener *l = fdtab[fd].owner;
	struct session *s;
	struct task *t;
	int cfd;
	int max_accept = global.tune.maxaccept;
 
	while (actconn < global.maxconn && max_accept--) {
		struct sockaddr_storage addr;
		socklen_t laddr = sizeof(addr);
 
		if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
			return 0;
		}

   接收到新的连接之后,首先是检查连接数和fd是否超出范围。

1
2
3
4
5
6
7
8
		// step 1, check number of connection and client socket fd
		if (l->nbconn >= l->maxconn) {
			goto out_close;
		}
 
		if (cfd >= global.maxsock) {
			goto out_close;
		}

   然后,设置socket的属性,本来这一段比较靠后,为了避免混乱,我把它提前到accept()之后。由于重构了一把,部分变量的意义也发生了变化,不过应该不影响理解,以下不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
		// step 2, set the client socket's attribute
		if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) ||
		    (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY,
				(char *) &one, sizeof(one)) == -1)) {
			goto out_close;
		}
 
		if (proxy.options & PR_O_TCP_CLI_KA)
			setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
		if (proxy.options & PR_O_TCP_NOLING)
			setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));

   接着,从pool2_session内存池中分配一个新的session,加入全局sessions链表,并设置初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
		// step 3, allocate a new session and add it into session list
		if ((s = pool_alloc2(pool2_session)) == NULL) {
			disable_listener(l);
			proxy.state = PR_STIDLE;
			goto out_close;
		}
 
		LIST_ADDQ(&sessions, &s->list);
 
		s->flags        = 0;
		s->cli_addr     = addr;
		s->listener     = l;
		s->conn_retries = proxy.conn_retries;
		s->srv_error    = default_srv_error;
		s->srv          = NULL;
		s->prev_srv     = NULL;
		s->srv_conn     = NULL;
 
		s->logs.tv_accept = now;
		s->logs.t_queue   = -1;
		s->logs.t_connect = -1;
		s->logs.t_close   = 0;

   第四步,从pool2_task内存池中分配一个新的task,设置其处理函数(t->process),并链接到刚刚分配的那个session(t->context和s->task)。在原来的代码中,这一段和前面三段是混杂在一起,显得没什么章法。

1
2
3
4
5
6
7
8
9
10
11
12
13
		// step 4, allocate a new task, attach the handler(process_session) to the task
		// more over, attach the task to the newly allocated session
		if ((t = task_new()) == NULL) {
			disable_listener(l);
			proxy.state = PR_STIDLE;
			goto out_free_session;
		}
 
		t->process = l->handler;
		t->context = s;
		t->expire  = TICK_ETERNITY;
 
		s->task    = t;

   第五步,指派该session的客户端stream interface(s->si[0]),新分配的那个task也同时链接到该stream interface(s->si[0].owner)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
		// step 5, assign the session's client stream interface, newly allocated task also linked
		s->si[0].state      = SI_ST_EST;
		s->si[0].prev_state = SI_ST_EST;
		s->si[0].err_type   = SI_ET_NONE;
		s->si[0].err_loc    = NULL;
		s->si[0].owner      = t;
		s->si[0].update     = stream_sock_data_finish;
		s->si[0].shutr      = stream_sock_shutr;
		s->si[0].shutw      = stream_sock_shutw;
		s->si[0].chk_rcv    = stream_sock_chk_rcv;
		s->si[0].chk_snd    = stream_sock_chk_snd;
		s->si[0].connect    = NULL;
		s->si[0].iohandler  = NULL;
		s->si[0].fd         = cfd;
		s->si[0].flags      = SI_FL_NONE;
		s->si[0].exp        = TICK_ETERNITY;

   第六步,指派该session的服务端stream interface(s->si[1]),新分配的那个task也同时链接到该stream interface(s->s[1].owner)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
		// step 6, assign the session's server stream interface, newly allocated task also linked
		s->si[1].state      = SI_ST_INI;
		s->si[1].prev_state = SI_ST_INI;
		s->si[1].err_type   = SI_ET_NONE;
		s->si[1].err_loc    = NULL;
		s->si[1].owner      = t;
		s->si[1].update     = stream_sock_data_finish;
		s->si[1].shutr      = stream_sock_shutr;
		s->si[1].shutw      = stream_sock_shutw;
		s->si[1].chk_rcv    = stream_sock_chk_rcv;
		s->si[1].chk_snd    = stream_sock_chk_snd;
		s->si[1].connect    = tcpv4_connect_server;
		s->si[1].iohandler  = NULL;
		s->si[1].fd         = -1;
		s->si[1].flags      = SI_FL_NONE;
		s->si[1].exp        = TICK_ETERNITY;

   第七步,从pool2_buffer内存池中为该session分配一个请求的缓冲区,初始化,并链接到客户端stream interface的输入缓冲区(s->si[0].ib)和服务端stream interface的输出缓冲区(s->si[1].ob)。还有,设置该缓冲区的生产者(s->req->prod)是客户端stream interface,消费者(s->req->cons)是服务端stream interface。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
		// step 7, allocate request buffer,
		// link to client's input buffer and server's output buffer
		if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
			goto out_fail_req;
 
		buffer_init(s->req);
		s->req->size = global.tune.bufsize;
		s->req->prod = &s->si[0];
		s->req->cons = &s->si[1];
		s->req->rto  = proxy.timeout.client;
		s->req->wto  = proxy.timeout.server;
		s->req->cto  = proxy.timeout.connect;
		s->req->rex  = TICK_ETERNITY;
		s->req->wex  = TICK_ETERNITY;
 
		s->si[0].ib  = s->req;
		s->si[1].ob  = s->req;
 
		s->req->flags |= BF_READ_ATTACHED | BF_AUTO_CONNECT | BF_AUTO_CLOSE;

   第八步,从pool2_buffer内存池中为该session分配一个响应的缓冲区,初始化,并链接到客户端stream interface的输出缓冲区(s->si[0].ob)和服务端stream interface的输入缓冲区(s->si[1].ib)。还有,设置该缓冲区的生产者(s->rep->prod)是服务端stream interface,消费者(s->rep->cons)是客户端stream interface。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
		// step 8, allocate response buffer,
		// link to client's output buffer and server's input buffer
		if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
			goto out_fail_rep; /* no memory */
 
		buffer_init(s->rep);
		s->rep->size = global.tune.bufsize;
		s->rep->prod = &s->si[1];
		s->rep->cons = &s->si[0];
		s->rep->rto  = proxy.timeout.server;
		s->rep->wto  = proxy.timeout.client;
		s->rep->cto  = TICK_ETERNITY;
		s->rep->rex  = TICK_ETERNITY;
		s->rep->wex  = TICK_ETERNITY;
 
		s->si[0].ob  = s->rep;
		s->si[1].ib  = s->rep;

   经过以上的处理,HAProxy在客户端和服务端之间初步建立了一条请求-响应的链路,如下图所示。从客户端到HAProxy的请求和从HAProxy到服务端的请求链接到同一段缓冲区,从服务端到HAProxy的响应和从HAProxy与客户端的响应也链接到同一段缓冲区,避免了内存拷贝。

   client_haproxy_server

   第九步,设置fdtab。

1
2
3
4
5
6
7
8
9
10
11
12
		// step 9, insert the client socket fd into fd table
		fd_insert(cfd);
		fdtab[cfd].owner        = &s->si[0];
		fdtab[cfd].state        = FD_STREADY;
		fdtab[cfd].cb[DIR_RD].f = l->proto->read;
		fdtab[cfd].cb[DIR_RD].b = s->req;
		fdtab[cfd].cb[DIR_WR].f = l->proto->write;
		fdtab[cfd].cb[DIR_WR].b = s->rep;
		fdinfo[cfd].peeraddr    = (struct sockaddr *)&s->cli_addr;
		fdinfo[cfd].peerlen     = sizeof(s->cli_addr);
 
		EV_FD_SET(cfd, DIR_RD);

   最后,唤醒该task。

1
2
		// step 10, call wakeup function
		task_wakeup(t, TASK_WOKEN_INIT);

   还有更改计数器等收尾工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
		l->nbconn++;
		if (l->nbconn >= l->maxconn) {
			EV_FD_CLR(l->fd, DIR_RD);
			l->state = LI_FULL;
		}
 
		actconn++;
		totalconn++;
	}
	return 0;
 
 out_fail_rep:
	pool_free2(pool2_buffer, s->req);
 out_fail_req:
	task_free(t);
 out_free_session:
	LIST_DEL(&s->list);
	pool_free2(pool2_session, s);
 out_close:
	close(cfd);
	return 0;
}

   至此,本文算是从上到下把event_accept()函数的处理粗略地过了一遍,重点是理顺流程,忽略一些次要的地方,不过,遗留下内存池、任务调度、事件响应等机制,以及session、task、stream interface、buffer、fdtab等重要数据结构没有深入分析,这些都计划在以后的文章中完成。

同分类推荐文章

  1. 等了十年的 Go 链式管道,终于来了:seq 让你像写 Scala 一样写 Go (2026-06-25 18:38:18)
  2. Go 实验特性详解 (2026-06-21 10:05:27)
  3. amd64 微架构级别对 Go 程序性能提升多少? (2026-06-21 09:38:49)

查看更多 后端 文章 →

建议继续学习

  1. 解析nginx负载均衡 (累计阅读 16,622)
  2. libcurl的使用总结(二) (累计阅读 15,083)
  3. Facebook 网站架构 (累计阅读 11,112)
  4. 使用Apache 和Passenger来运行puppetmaster (累计阅读 8,316)
  5. 浅析linux kernel network之socket创建 (累计阅读 6,740)
  6. 使用HAProxy对MySQL进行负载均衡和状态监控 (累计阅读 6,733)
  7. TCP链接主动关闭不发fin包奇怪行为分析 (累计阅读 6,710)
  8. TCP之close_wait (累计阅读 6,543)
  9. LVS hash size解决4096个并发的问题 (累计阅读 6,410)
  10. 由12306.cn谈谈网站性能技术 (累计阅读 6,398)