Memcache源代码分析之网络处理
memcache是什么说略过,大家都懂。这篇文章主要分析memcache的网络处理,memcache的网络处理是基于libevent的库写的,下面依据源代码来一步一步讲解。
1、初始化event、链接池、内存块等
[c]
/* initialize main thread libevent instance */
main_base = event_init(); //memcache.c line:4583
/* initialize other stuff */
stats_init();
assoc_init();
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate);
[/c]
2、 初始化线程,调用函数thread_init(settings.num_threads, main_base),内部实现如下:
a)创建num_threads个线程处理对像(包括读写管道、客户端队列等,thread.c line:608
[c]
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {//创建读写管道
perror(“Can’t create notify pipe”);
exit(1);
}
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
//为读取管道绑定事件(当有可读数据时,以事件方式通知thread_libevent_process函数处理),并且初始化客户端队列
setup_thread(&threads[i]);
}
[/c]
b)创建线程,thread.c line:622
[c]
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);//对创建成功的线程进行计数,同时开始进入事件循环
}
[/c]
3、创建套接字,memcache.c line:4643
[c]
if (settings.port && server_socket(settings.port, tcp_transport,
portnumber_file)) {
vperror(“failed to listen on TCP port %d”, settings.port);
exit(EX_OSERR);
}
[/c]
4、在server_socket调用conn_new创建一个链接对像,内部实现如下:
a)更新状态为conn_listening,memcache.c line:400
[c]
c->state = init_state;
[/c]
b)绑定事件,当套接字有可读数据时以事件方式通知event_handler处理(memcache.c line:420)
[c]
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
[/c]
c)进入主事件循环,memcache.c line:4675
[c]
event_base_loop(main_base, 0);
[/c]
5、当有用户请求时即进行event_handler的conn_listening分支,接受请求后调用dispatch_conn_new函数(memcache.c line:3450),内部实现过程如下:
a)从链接队列中取出一个空闲的对象,(thread.c line:496)
[c]
CQ_ITEM *item = cqi_new();
[/c]
b)以轮询的方式访问线程一个线程,(thread.c line:297)
[c]
iint tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
[/c]
c)将状态置为conn_new_cmd,(thread.c line:304)
[c]
item->init_state = init_state
[/c]
d)然后将这个对象加入到线程的客户端链表中(thread.c line:309)
[c]
cq_push(thread->new_conn_queue, item);
[/c]
e)然后通知线程处理事件绑定等工作(原理是向线程的写入管道中发送一个字节,告诉线程有客户端来请求了),(thread.c line:312)
[c]
if (write(thread->notify_send_fd, “”, 1) != 1) {
perror(“Writing to thread notify pipe”);
}
[/c]
6、当某一线程的管道有可读数据时,调用thread_libevent_process进行处理,内部实现过程如下:
a)读取一个字节,即取出一个客户端列表,(thread.c line:259)
[c]
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, “Can’t read from libevent pipe\\n”);
item = cq_pop(me->new_conn_queue);//从客户端队列中取出一个对象
[/c]
b)调用conn_new创建一个链接对像,并将初始状态设为conn_new_cmd,同时为套接定绑定事件(thread.c line:266)
[c]
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
[/c]
c)在主线程循环中进入conn_new_cmd分支,(memcache.c line:3494);
[c]
case conn_new_cmd:
/* Only process nreqs at a time to avoid starving other
connections */
-nreqs;
if (nreqs >= 0) {
reset_cmd_handler(c);
[/c]
d)判断该套接字是否还有未处理数据,有未处理数据就更新状态为conn_parse_cmd,没有就更新为conn_waiting,(memcache.c line:2075)
[c]
if (c->rbytes > 0) {
conn_set_state(c, conn_parse_cmd);
} else {
conn_set_state(c, conn_waiting);
}
[/c]
e)当有未处理数据时进入conn_parse_cmd分支,尝试去处理(具体请看第7大步),如果内容不满足一条完整的数据就更新为conn_waiting(memcache.c line:3486)
[c]
case conn_parse_cmd :
if (try_read_command(c) == 0) {
/* wee need more data! */
conn_set_state(c, conn_waiting);
}
[/c]
f)当套接字有数据时,更新状态为conn_read,(memcache.c line:3461)
[c]
case conn_waiting:
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, “Couldn’t update event\\n”);
conn_set_state(c, conn_closing);
break;
}
conn_set_state(c, conn_read);
[/c]
g)进入conn_read分支以udp或者tcp的方式开始读取数据,当读取数据成功后
更新状态为conn_parse_cmd(即下一次循环跳到第e步开始执行)(memcache.c line:3467)
[c]
case conn_read:
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;
[/c]
h)在try_read_network内部,首先是将没有处理掉的数据前置(memcache.c line:3247),然后循环读取客户端发来的所有数据(memcache.c line:3253),这里要详细讲一下客户端数据的存储,对于每个请求的客户端,都会分配数据缓冲区(rbuf)、当前数据处理偏移项(rcurr)、缓冲区长度(rsize)、已占用字节数(rbytes),当有消息时,先看看缓存区能不能存放下,如果不能,那么将数据缓冲区将以2倍的长度开始增长,然后将接受的数据存下;
7、处理消息
a)当有消息时会调用try_read_command来解析命令行,一般情况下是使用ascii传输数据,那么在函数内部会直接跳到memcache.c的3145行,然后判断数据包的完整性,在数据包完整的情况下,会直接调用process_command函数(memcache.c line:3181)
[c]
process_command(c, c->rcurr);
[/c]
b)在process_command内部先是调用tokenize_command函数来计算参数总数量,参数值长度,参数值(memcache.c line:2967)
[c]
ntokens = tokenize_command(command, tokens, MAX_TOKENS);
[/c]
c)根据参数总数量,参数值长度和参数值来解析命令,然后将数据更新到HashTable,包括:get,set,update等(memcache.c line:2968)
[c]
if (ntokens >= 3 &&
((strcmp(tokens[COMMAND_TOKEN].value, “get”) == 0) ||
(strcmp(tokens[COMMAND_TOKEN].value, “bget”) == 0))) {
process_get_command(c, tokens, ntokens, false);
[/c]
整个关于tcp处理这块分析完毕,比较简单,有错误之处请回复告之。
建议继续学习:
- 关于memcache分布式一致性hash (阅读:10694)
- 看源代码那些事 (阅读:9455)
- 警惕 Chrome 的查看源代码 (View Page Source) 功能 (阅读:5843)
- Memcache分布式部署方案 (阅读:5448)
- 我自己研究开源项目源代码的两个重要习惯 (阅读:4861)
- 关于session和memcache的若干问题 (阅读:4283)
- Memcache源代码分析之数据存储 (阅读:3907)
- Memcache mutex设计模式 (阅读:3760)
- 关于Memcache长连接自动重连的问题 (阅读:3672)
- Memcache协议的学习 (阅读:3620)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:IM鑫爷 来源: IM鑫爷
- 标签: Memcache 源代码
- 发布时间:2011-09-19 23:59:19
- [54] IOS安全–浅谈关于IOS加固的几种方法
- [52] android 开发入门
- [52] 如何拿下简短的域名
- [51] 图书馆的世界纪录
- [49] Oracle MTS模式下 进程地址与会话信
- [49] Go Reflect 性能
- [47] 【社会化设计】自我(self)部分――欢迎区
- [46] 读书笔记-壹百度:百度十年千倍的29条法则
- [36] 程序员技术练级攻略
- [29] 视觉调整-设计师 vs. 逻辑