另外要说明的是,本系列文章对HAProxy的分析都基于目前的稳定版本HAProxy 1.4,而目前的开发版本HAProxy 1.5和1.4相比,重构力度比较大,不少函数名称和结构体都发生了变化,但是关键流程还是基本一致的,请各位读者留意。
1. session
1 2 3 4 5 6 7 8 9 10 11 12 13 | struct session { struct list list; /* position in global sessions list */ struct task *task; /* the task associated with this session */ int conn_retries; /* number of connect retries left */ int flags; /* some flags describing the session */ struct buffer *req; /* request buffer */ struct buffer *rep; /* response buffer */ struct stream_interface si[2]; /* client and server stream interfaces */ struct sockaddr_storage cli_addr; /* the client address */ struct sockaddr_in srv_addr; /* the address to connect to */ struct server *srv; /* the server the session will be running or has been running on */ struct server *prev_srv; /* the server the was running on, after a redispatch, otherwise NULL */ }; |
list也是一个结构体,包含n和p两个指向struct list类型的指针,详见mini-clist.h。它在这里的作用是把各个session结构体串起来,形成一个双向链表,如下图所示。这是包括HAProxy和Linux内核在内的许多项目广泛使用的一种数据结构。我认为用在这里倒不是很必要,不过仍然是一种值得学习的技巧。
2. task
1 2 3 4 5 6 7 8 | struct task { struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */ struct eb32_node rq; /* ebtree node used to hold the task in the run queue */ int state; /* task state : bit field of TASK_* */ int expire; /* next expiration date for this task, in ticks */ struct task * (*process)(struct task *t); /* the function which processes the task */ void *context; /* the task's context */ }; |
1 2 3 4 | static inline int task_in_rq(struct task *t) { return t->rq.node.leaf_p != NULL; } |
1 2 3 4 | static inline int task_in_wq(struct task *t) { return t->wq.node.leaf_p != NULL; } |
1 2 3 4 5 | static inline struct task *__task_unlink_rq(struct task *t) { eb32_delete(&t->rq); return t; } |
1 2 3 4 5 6 7 | static inline struct task *__task_unlink_wq(struct task *t) { eb32_delete(&t->wq); if (last_timer == &t->wq) last_timer = NULL; return t; } |
1 2 3 4 5 6 7 8 | extern struct task *__task_wakeup(struct task *t) { t->rq.key = ++rqueue_ticks; // clear state flags at the same time t->state &= ~TASK_WOKEN_ANY; eb32_insert(&rqueue, &t->rq); return t; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | extern void __task_queue(struct task *task) { if (likely(task_in_wq(task))) __task_unlink_wq(task); /* the task is not in the queue now */ task->wq.key = task->expire; if (likely(last_timer && last_timer->node.bit < 0 && last_timer->key == task->wq.key && last_timer->node.node_p)) { eb_insert_dup(&last_timer->node, &task->wq.node); if (task->wq.node.bit < last_timer->node.bit) last_timer = &task->wq; return; } eb32_insert(&timers, &task->wq); /* Make sure we don't assign the last_timer to a node-less entry */ if (task->wq.node.node_p && (!last_timer || (task->wq.node.bit < last_timer->node.bit))) last_timer = &task->wq; return; } |
3. stream interface
stream interface结构体的主要成员如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | struct stream_interface { unsigned int state; /* SI_ST* */ unsigned int prev_state; /* SI_ST*, copy of previous state */ void *owner; /* generally a (struct task*) */ int fd; /* file descriptor for a stream driver when known */ unsigned int flags; unsigned int exp; /* wake up time for connect, queue, turn-around, ... */ void (*update)(struct stream_interface *); /* I/O update function */ void (*shutr)(struct stream_interface *); /* shutr function */ void (*shutw)(struct stream_interface *); /* shutw function */ void (*chk_rcv)(struct stream_interface *); /* chk_rcv function */ void (*chk_snd)(struct stream_interface *); /* chk_snd function */ int (*connect)(struct stream_interface *, struct proxy *, struct server *, struct sockaddr *, struct sockaddr *); /* connect function if any */ void (*iohandler)(struct stream_interface *); /* internal I/O handler when embedded */ struct buffer *ib, *ob; /* input and output buffers */ unsigned int err_type; /* first error detected, one of SI_ET_* */ void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ void *private; /* may be used by any function above */ unsigned int st0, st1; /* may be used by any function above */ }; |
update指向的函数(stream_sock_data_finish)用于更新stream interface的fd的读写状态和相关标志位。
shutr和shutw指向的函数(stream_sock_shutr和stream_sock_shutw)分别用于关闭stream interface上的读和写。
当si是客户端stream interface时,connect为空,因为客户端连接显然已经建立。
当si是服务端stream interface时,connect指向HAProxy与服务端建立连接的那个函数(tcpv4_connect_server),而它会在backend.c的connect_server执行时被调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | struct task *process_session(struct task *t) { ... /* Call the second stream interface's I/O handler if it's embedded. * Note that this one may wake the task up again. */ if (s->req->cons->iohandler) { s->req->cons->iohandler(s->req->cons); if (task_in_rq(t)) { /* If we woke up, we don't want to requeue the * task to the wait queue, but rather requeue * it into the runqueue ASAP. */ t->expire = TICK_ETERNITY; return t; } } ... } |
4. buffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | struct buffer { unsigned int flags; /* BF_* */ int rex; /* expiration date for a read, in ticks */ int wex; /* expiration date for a write or connect, in ticks */ int rto; /* read timeout, in ticks */ int wto; /* write timeout, in ticks */ int cto; /* connect timeout, in ticks */ unsigned int l; /* data length */ char *r, *w, *lr; /* read ptr, write ptr, last read */ unsigned int size; /* buffer size in bytes */ unsigned int send_max; /* number of bytes the sender can consume om this buffer, <= l */ unsigned int to_forward; /* number of bytes to forward after send_max without a wake-up */ unsigned int analysers; /* bit field indicating what to do on the buffer */ int analyse_exp; /* expiration date for current analysers (if set) */ void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */ unsigned char xfer_large; /* number of consecutive large xfers */ unsigned char xfer_small; /* number of consecutive small xfers */ unsigned long long total; /* total data read */ struct stream_interface *prod; /* producer attached to this buffer */ struct stream_interface *cons; /* consumer attached to this buffer */ struct pipe *pipe; /* non-NULL only when data present */ char data[0]; /* <size> bytes */ }; |
可以看到,HAProxy的buffer结构体也由许多成员组成,常规buffer该有的一个都不少,例如size、data、l、r、w、lr等等,除此之外,还有HAProxy特有的生产者、消费者stream interface指针(prod和cons),还有众多表示读写过期时间、超时时间的成员,还有指示具体请求响应解析过程的标志位(analysers),还有回调函数hijacker(不过看起来没有使用),还有表示该buffer最多能向其消费者发送多少字节的send_max,还有在HTTP chunked格式转发时才有意义的to_forward,还有连续满负荷传输数据和连续低负荷传输数据的计数器(xfer_large和xfer_small),以触发判断是否使用splice系统调用。面对把这么多功能揽于一身的buffer,看你晕不晕!
5. pipe
1 2 3 4 5 6 | struct pipe { int data; /* number of bytes present in the pipe */ int prod; /* FD the producer must write to ; -1 if none */ int cons; /* FD the consumer must read from ; -1 if none */ struct pipe *next; }; |
因为splice系统调用要求输入和输出描述符中至少有一个是管道描述符,所以,HAProxy准备了一对管道描述符(prod和cons)。当使用splice读数据时,HAProxy从源socket描述符(对于请求,就是连接客户端与HAProxy的socket描述符,对于响应,则是连接服务端与HAProxy的socket描述符;下面的目的socket描述符刚好相反)splice到管道描述符prod;当使用splice写数据时,HAProxy从管道描述符cons splice到目的socket描述符。详见stream_sock.c的stream_sock_splice_in函数和stream_sock_write_loop函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) { ... ret = splice(fd, NULL, b->pipe->prod, NULL, max, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); ... } static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) { ... ret = splice(b->pipe->cons, NULL, si->fd, NULL, b->pipe->data, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); ... } |
6. 小结
五个我认为比较重要的结构体就介绍到这里。当然,HAProxy还有许多结构体,例如proxy、server、listener等等,不过,这些结构体,要么比较容易看懂,要么网上已经有比较齐全的资料,要么可以陆续在后面的文章中单独说明。而session、task、stream interface、buffer和pipe这五个结构体,连同第一篇介绍的ebtree,向我们展现了HAProxy作为一个高性能代理服务器的底层数据组织和一些重要的处理细节。
- 使用HAProxy对MySQL进行负载均衡和状态监控 (阅读:5718)
- C语言结构体里的成员数组和指针 (阅读:5082)
- 结构体初始化的方法 (阅读:2371)
- c、cpp中使用匿名结构体、类定义数组 (阅读:2212)
- mysql innodb 文件相关的三个重要结构体 (阅读:2166)
- 根据成员地址获取结构体变量 (阅读:1750)
- HAProxy的event_accept函数源码分析 (阅读:1723)
- 作者:若羽 来源: UC技术博客
- 标签: HAProxy 结构体
- 发布时间:2013-07-29 23:14:57
[67] Java开发岗位面试题归类汇总
[64] Go Reflect 性能
[63] IOS安全–浅谈关于IOS加固的几种方法
[62] 【社会化设计】自我(self)部分――欢迎区
[61] android 开发入门
[61] 如何拿下简短的域名
[57] Oracle MTS模式下 进程地址与会话信
[49] 图书馆的世界纪录
[49] WEB系统需要关注的一些点
[44] Twitter/微博客的学习摘要