技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> HAProxy的event_accept函数源码分析

HAProxy的event_accept函数源码分析

浏览:1691次  出处信息

   相信不少阅读过HAProxy代码的同学都感到头疼吧?说实话,HAProxy的代码风格属于比较烂的一种,一个函数大几百行,甚至几千行的情况比比皆是。可能是Willy Tarreau本人把精力集中在算法方面,对其它方面没那么重视的缘故吧。如果想把HAProxy的主要逻辑看明白,或者把文章写清楚,我建议要对它进行一些删减,最好能重构一下。下面,以event_accept()函数为例,尝试对其进行简单的分析,各位读者可以对照原来的函数,看看是不是更清楚明了一些。

   开始的这一段基本上和原来的一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extern int event_accept(int fd)
{
	struct listener *l = fdtab[fd].owner;
	struct session *s;
	struct task *t;
	int cfd;
	int max_accept = global.tune.maxaccept;
 
	while (actconn < global.maxconn && max_accept--) {
		struct sockaddr_storage addr;
		socklen_t laddr = sizeof(addr);
 
		if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
			return 0;
		}

   接收到新的连接之后,首先是检查连接数和fd是否超出范围。

1
2
3
4
5
6
7
8
		// step 1, check number of connection and client socket fd
		if (l->nbconn >= l->maxconn) {
			goto out_close;
		}
 
		if (cfd >= global.maxsock) {
			goto out_close;
		}

   然后,设置socket的属性,本来这一段比较靠后,为了避免混乱,我把它提前到accept()之后。由于重构了一把,部分变量的意义也发生了变化,不过应该不影响理解,以下不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
		// step 2, set the client socket's attribute
		if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) ||
		    (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY,
				(char *) &one, sizeof(one)) == -1)) {
			goto out_close;
		}
 
		if (proxy.options & PR_O_TCP_CLI_KA)
			setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
		if (proxy.options & PR_O_TCP_NOLING)
			setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));

   接着,从pool2_session内存池中分配一个新的session,加入全局sessions链表,并设置初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
		// step 3, allocate a new session and add it into session list
		if ((s = pool_alloc2(pool2_session)) == NULL) {
			disable_listener(l);
			proxy.state = PR_STIDLE;
			goto out_close;
		}
 
		LIST_ADDQ(&sessions, &s->list);
 
		s->flags        = 0;
		s->cli_addr     = addr;
		s->listener     = l;
		s->conn_retries = proxy.conn_retries;
		s->srv_error    = default_srv_error;
		s->srv          = NULL;
		s->prev_srv     = NULL;
		s->srv_conn     = NULL;
 
		s->logs.tv_accept = now;
		s->logs.t_queue   = -1;
		s->logs.t_connect = -1;
		s->logs.t_close   = 0;

   第四步,从pool2_task内存池中分配一个新的task,设置其处理函数(t->process),并链接到刚刚分配的那个session(t->context和s->task)。在原来的代码中,这一段和前面三段是混杂在一起,显得没什么章法。

1
2
3
4
5
6
7
8
9
10
11
12
13
		// step 4, allocate a new task, attach the handler(process_session) to the task
		// more over, attach the task to the newly allocated session
		if ((t = task_new()) == NULL) {
			disable_listener(l);
			proxy.state = PR_STIDLE;
			goto out_free_session;
		}
 
		t->process = l->handler;
		t->context = s;
		t->expire  = TICK_ETERNITY;
 
		s->task    = t;

   第五步,指派该session的客户端stream interface(s->si[0]),新分配的那个task也同时链接到该stream interface(s->si[0].owner)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
		// step 5, assign the session's client stream interface, newly allocated task also linked
		s->si[0].state      = SI_ST_EST;
		s->si[0].prev_state = SI_ST_EST;
		s->si[0].err_type   = SI_ET_NONE;
		s->si[0].err_loc    = NULL;
		s->si[0].owner      = t;
		s->si[0].update     = stream_sock_data_finish;
		s->si[0].shutr      = stream_sock_shutr;
		s->si[0].shutw      = stream_sock_shutw;
		s->si[0].chk_rcv    = stream_sock_chk_rcv;
		s->si[0].chk_snd    = stream_sock_chk_snd;
		s->si[0].connect    = NULL;
		s->si[0].iohandler  = NULL;
		s->si[0].fd         = cfd;
		s->si[0].flags      = SI_FL_NONE;
		s->si[0].exp        = TICK_ETERNITY;

   第六步,指派该session的服务端stream interface(s->si[1]),新分配的那个task也同时链接到该stream interface(s->s[1].owner)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
		// step 6, assign the session's server stream interface, newly allocated task also linked
		s->si[1].state      = SI_ST_INI;
		s->si[1].prev_state = SI_ST_INI;
		s->si[1].err_type   = SI_ET_NONE;
		s->si[1].err_loc    = NULL;
		s->si[1].owner      = t;
		s->si[1].update     = stream_sock_data_finish;
		s->si[1].shutr      = stream_sock_shutr;
		s->si[1].shutw      = stream_sock_shutw;
		s->si[1].chk_rcv    = stream_sock_chk_rcv;
		s->si[1].chk_snd    = stream_sock_chk_snd;
		s->si[1].connect    = tcpv4_connect_server;
		s->si[1].iohandler  = NULL;
		s->si[1].fd         = -1;
		s->si[1].flags      = SI_FL_NONE;
		s->si[1].exp        = TICK_ETERNITY;

   第七步,从pool2_buffer内存池中为该session分配一个请求的缓冲区,初始化,并链接到客户端stream interface的输入缓冲区(s->si[0].ib)和服务端stream interface的输出缓冲区(s->si[1].ob)。还有,设置该缓冲区的生产者(s->req->prod)是客户端stream interface,消费者(s->req->cons)是服务端stream interface。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
		// step 7, allocate request buffer,
		// link to client's input buffer and server's output buffer
		if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
			goto out_fail_req;
 
		buffer_init(s->req);
		s->req->size = global.tune.bufsize;
		s->req->prod = &s->si[0];
		s->req->cons = &s->si[1];
		s->req->rto  = proxy.timeout.client;
		s->req->wto  = proxy.timeout.server;
		s->req->cto  = proxy.timeout.connect;
		s->req->rex  = TICK_ETERNITY;
		s->req->wex  = TICK_ETERNITY;
 
		s->si[0].ib  = s->req;
		s->si[1].ob  = s->req;
 
		s->req->flags |= BF_READ_ATTACHED | BF_AUTO_CONNECT | BF_AUTO_CLOSE;

   第八步,从pool2_buffer内存池中为该session分配一个响应的缓冲区,初始化,并链接到客户端stream interface的输出缓冲区(s->si[0].ob)和服务端stream interface的输入缓冲区(s->si[1].ib)。还有,设置该缓冲区的生产者(s->rep->prod)是服务端stream interface,消费者(s->rep->cons)是客户端stream interface。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
		// step 8, allocate response buffer,
		// link to client's output buffer and server's input buffer
		if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
			goto out_fail_rep; /* no memory */
 
		buffer_init(s->rep);
		s->rep->size = global.tune.bufsize;
		s->rep->prod = &s->si[1];
		s->rep->cons = &s->si[0];
		s->rep->rto  = proxy.timeout.server;
		s->rep->wto  = proxy.timeout.client;
		s->rep->cto  = TICK_ETERNITY;
		s->rep->rex  = TICK_ETERNITY;
		s->rep->wex  = TICK_ETERNITY;
 
		s->si[0].ob  = s->rep;
		s->si[1].ib  = s->rep;

   经过以上的处理,HAProxy在客户端和服务端之间初步建立了一条请求-响应的链路,如下图所示。从客户端到HAProxy的请求和从HAProxy到服务端的请求链接到同一段缓冲区,从服务端到HAProxy的响应和从HAProxy与客户端的响应也链接到同一段缓冲区,避免了内存拷贝。

   client_haproxy_server

   第九步,设置fdtab。

1
2
3
4
5
6
7
8
9
10
11
12
		// step 9, insert the client socket fd into fd table
		fd_insert(cfd);
		fdtab[cfd].owner        = &s->si[0];
		fdtab[cfd].state        = FD_STREADY;
		fdtab[cfd].cb[DIR_RD].f = l->proto->read;
		fdtab[cfd].cb[DIR_RD].b = s->req;
		fdtab[cfd].cb[DIR_WR].f = l->proto->write;
		fdtab[cfd].cb[DIR_WR].b = s->rep;
		fdinfo[cfd].peeraddr    = (struct sockaddr *)&s->cli_addr;
		fdinfo[cfd].peerlen     = sizeof(s->cli_addr);
 
		EV_FD_SET(cfd, DIR_RD);

   最后,唤醒该task。

1
2
		// step 10, call wakeup function
		task_wakeup(t, TASK_WOKEN_INIT);

   还有更改计数器等收尾工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
		l->nbconn++;
		if (l->nbconn >= l->maxconn) {
			EV_FD_CLR(l->fd, DIR_RD);
			l->state = LI_FULL;
		}
 
		actconn++;
		totalconn++;
	}
	return 0;
 
 out_fail_rep:
	pool_free2(pool2_buffer, s->req);
 out_fail_req:
	task_free(t);
 out_free_session:
	LIST_DEL(&s->list);
	pool_free2(pool2_session, s);
 out_close:
	close(cfd);
	return 0;
}

   至此,本文算是从上到下把event_accept()函数的处理粗略地过了一遍,重点是理顺流程,忽略一些次要的地方,不过,遗留下内存池、任务调度、事件响应等机制,以及session、task、stream interface、buffer、fdtab等重要数据结构没有深入分析,这些都计划在以后的文章中完成。

建议继续学习:

  1. 使用HAProxy对MySQL进行负载均衡和状态监控    (阅读:5689)
  2. HAProxy几个重要的结构体    (阅读:1740)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
<< 前一篇:kmemcache源码浅析
后一篇:FUSE源码剖析 >>
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1