IT技术博客大学习 共学习 共进步

Memcache源代码分析之网络处理

IM鑫爷 2011-09-19 23:59:19 浏览 4,545 次

    memcache是什么说略过,大家都懂。这篇文章主要分析memcache的网络处理,memcache的网络处理是基于libevent的库写的,下面依据源代码来一步一步讲解。

    1、初始化event、链接池、内存块等

     [c]

     /* initialize main thread libevent instance */

     main_base = event_init(); //memcache.c line:4583

    /* initialize other stuff */

     stats_init();

     assoc_init();

     conn_init();

     slabs_init(settings.maxbytes, settings.factor, preallocate);

     [/c]

     2、 初始化线程,调用函数thread_init(settings.num_threads, main_base),内部实现如下:

    a)创建num_threads个线程处理对像(包括读写管道、客户端队列等,thread.c line:608

     [c]

    for (i = 0; i < nthreads; i++) {

     int fds[2];

     if (pipe(fds)) {//创建读写管道

     perror(“Can’t create notify pipe”);

     exit(1);

     }

    threads[i].notify_receive_fd = fds[0];

     threads[i].notify_send_fd = fds[1];

    //为读取管道绑定事件(当有可读数据时,以事件方式通知thread_libevent_process函数处理),并且初始化客户端队列

     setup_thread(&threads[i]);

     }

     [/c]

     b)创建线程,thread.c line:622

     [c]

     for (i = 0; i < nthreads; i++) {

     create_worker(worker_libevent, &threads[i]);//对创建成功的线程进行计数,同时开始进入事件循环

     }

     [/c]

     3、创建套接字,memcache.c line:4643

     [c]

     if (settings.port && server_socket(settings.port, tcp_transport,

     portnumber_file)) {

     vperror(“failed to listen on TCP port %d”, settings.port);

     exit(EX_OSERR);

     }

     [/c]

     4、在server_socket调用conn_new创建一个链接对像,内部实现如下:

    a)更新状态为conn_listening,memcache.c line:400

     [c]

     c->state = init_state;

     [/c]

    b)绑定事件,当套接字有可读数据时以事件方式通知event_handler处理(memcache.c line:420)

     [c]

     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

     event_base_set(base, &c->event);

     [/c]

     c)进入主事件循环,memcache.c line:4675

     [c]

     event_base_loop(main_base, 0);

     [/c]

     5、当有用户请求时即进行event_handler的conn_listening分支,接受请求后调用dispatch_conn_new函数(memcache.c line:3450),内部实现过程如下:

    a)从链接队列中取出一个空闲的对象,(thread.c line:496)

     [c]

     CQ_ITEM *item = cqi_new();

     [/c]

    b)以轮询的方式访问线程一个线程,(thread.c line:297)

     [c]

     iint tid = (last_thread + 1) % settings.num_threads;

     LIBEVENT_THREAD *thread = threads + tid;

     last_thread = tid;

     [/c]

     c)将状态置为conn_new_cmd,(thread.c line:304)

     [c]

     item->init_state = init_state

     [/c]

     d)然后将这个对象加入到线程的客户端链表中(thread.c line:309)

     [c]

     cq_push(thread->new_conn_queue, item);

     [/c]

     e)然后通知线程处理事件绑定等工作(原理是向线程的写入管道中发送一个字节,告诉线程有客户端来请求了),(thread.c line:312)

     [c]

     if (write(thread->notify_send_fd, “”, 1) != 1) {

     perror(“Writing to thread notify pipe”);

     }

     [/c]

     6、当某一线程的管道有可读数据时,调用thread_libevent_process进行处理,内部实现过程如下:

    a)读取一个字节,即取出一个客户端列表,(thread.c line:259)

     [c]

     if (read(fd, buf, 1) != 1)

     if (settings.verbose > 0)

     fprintf(stderr, “Can’t read from libevent pipe\\n”);

    item = cq_pop(me->new_conn_queue);//从客户端队列中取出一个对象

     [/c]

     b)调用conn_new创建一个链接对像,并将初始状态设为conn_new_cmd,同时为套接定绑定事件(thread.c line:266)

     [c]

     conn *c = conn_new(item->sfd, item->init_state, item->event_flags,

     item->read_buffer_size, item->transport, me->base);

     [/c]

     c)在主线程循环中进入conn_new_cmd分支,(memcache.c line:3494);

     [c]

     case conn_new_cmd:

     /* Only process nreqs at a time to avoid starving other

     connections */

    -nreqs;

     if (nreqs >= 0) {

     reset_cmd_handler(c);

     [/c]

     d)判断该套接字是否还有未处理数据,有未处理数据就更新状态为conn_parse_cmd,没有就更新为conn_waiting,(memcache.c line:2075)

     [c]

     if (c->rbytes > 0) {

     conn_set_state(c, conn_parse_cmd);

     } else {

     conn_set_state(c, conn_waiting);

     }

     [/c]

     e)当有未处理数据时进入conn_parse_cmd分支,尝试去处理(具体请看第7大步),如果内容不满足一条完整的数据就更新为conn_waiting(memcache.c line:3486)

     [c]

     case conn_parse_cmd :

     if (try_read_command(c) == 0) {

     /* wee need more data! */

     conn_set_state(c, conn_waiting);

     }

     [/c]

     f)当套接字有数据时,更新状态为conn_read,(memcache.c line:3461)

     [c]

     case conn_waiting:

     if (!update_event(c, EV_READ | EV_PERSIST)) {

     if (settings.verbose > 0)

     fprintf(stderr, “Couldn’t update event\\n”);

     conn_set_state(c, conn_closing);

     break;

     }

    conn_set_state(c, conn_read);

     [/c]

     g)进入conn_read分支以udp或者tcp的方式开始读取数据,当读取数据成功后

     更新状态为conn_parse_cmd(即下一次循环跳到第e步开始执行)(memcache.c line:3467)

     [c]

     case conn_read:

     res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

    switch (res) {

     case READ_NO_DATA_RECEIVED:

     conn_set_state(c, conn_waiting);

     break;

     case READ_DATA_RECEIVED:

     conn_set_state(c, conn_parse_cmd);

     break;

     case READ_ERROR:

     conn_set_state(c, conn_closing);

     break;

     case READ_MEMORY_ERROR: /* Failed to allocate more memory */

     /* State already set by try_read_network */

     break;

     }

     break;

     [/c]

    h)在try_read_network内部,首先是将没有处理掉的数据前置(memcache.c line:3247),然后循环读取客户端发来的所有数据(memcache.c line:3253),这里要详细讲一下客户端数据的存储,对于每个请求的客户端,都会分配数据缓冲区(rbuf)、当前数据处理偏移项(rcurr)、缓冲区长度(rsize)、已占用字节数(rbytes),当有消息时,先看看缓存区能不能存放下,如果不能,那么将数据缓冲区将以2倍的长度开始增长,然后将接受的数据存下;

    7、处理消息

     a)当有消息时会调用try_read_command来解析命令行,一般情况下是使用ascii传输数据,那么在函数内部会直接跳到memcache.c的3145行,然后判断数据包的完整性,在数据包完整的情况下,会直接调用process_command函数(memcache.c line:3181)

    [c]

     process_command(c, c->rcurr);

     [/c]

    b)在process_command内部先是调用tokenize_command函数来计算参数总数量,参数值长度,参数值(memcache.c line:2967)

     [c]

     ntokens = tokenize_command(command, tokens, MAX_TOKENS);

     [/c]

    c)根据参数总数量,参数值长度和参数值来解析命令,然后将数据更新到HashTable,包括:get,set,update等(memcache.c line:2968)

     [c]

     if (ntokens >= 3 &&

     ((strcmp(tokens[COMMAND_TOKEN].value, “get”) == 0) ||

     (strcmp(tokens[COMMAND_TOKEN].value, “bget”) == 0))) {

    process_get_command(c, tokens, ntokens, false);

     [/c]

    整个关于tcp处理这块分析完毕,比较简单,有错误之处请回复告之。

建议继续学习

  1. 关于memcache分布式一致性hash (阅读 11,661)
  2. 看源代码那些事 (阅读 10,462)
  3. 警惕 Chrome 的查看源代码 (View Page Source) 功能 (阅读 7,046)
  4. Memcache分布式部署方案 (阅读 6,668)
  5. 我自己研究开源项目源代码的两个重要习惯 (阅读 5,842)
  6. 关于session和memcache的若干问题 (阅读 5,186)
  7. Memcache mutex设计模式 (阅读 4,905)
  8. Memcache源代码分析之数据存储 (阅读 4,863)
  9. 解决memcache连接奇慢问题一例 (阅读 4,747)
  10. Memcache协议的学习 (阅读 4,705)