技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> Memcache源代码分析之网络处理

Memcache源代码分析之网络处理

浏览:3687次  出处信息

    memcache是什么说略过,大家都懂。这篇文章主要分析memcache的网络处理,memcache的网络处理是基于libevent的库写的,下面依据源代码来一步一步讲解。

    1、初始化event、链接池、内存块等

     [c]

     /* initialize main thread libevent instance */

     main_base = event_init(); //memcache.c line:4583

    /* initialize other stuff */

     stats_init();

     assoc_init();

     conn_init();

     slabs_init(settings.maxbytes, settings.factor, preallocate);

     [/c]

     2、 初始化线程,调用函数thread_init(settings.num_threads, main_base),内部实现如下:

    a)创建num_threads个线程处理对像(包括读写管道、客户端队列等,thread.c line:608

     [c]

    for (i = 0; i < nthreads; i++) {

     int fds[2];

     if (pipe(fds)) {//创建读写管道

     perror(“Can’t create notify pipe”);

     exit(1);

     }

    threads[i].notify_receive_fd = fds[0];

     threads[i].notify_send_fd = fds[1];

    //为读取管道绑定事件(当有可读数据时,以事件方式通知thread_libevent_process函数处理),并且初始化客户端队列

     setup_thread(&threads[i]);

     }

     [/c]

     b)创建线程,thread.c line:622

     [c]

     for (i = 0; i < nthreads; i++) {

     create_worker(worker_libevent, &threads[i]);//对创建成功的线程进行计数,同时开始进入事件循环

     }

     [/c]

     3、创建套接字,memcache.c line:4643

     [c]

     if (settings.port && server_socket(settings.port, tcp_transport,

     portnumber_file)) {

     vperror(“failed to listen on TCP port %d”, settings.port);

     exit(EX_OSERR);

     }

     [/c]

     4、在server_socket调用conn_new创建一个链接对像,内部实现如下:

    a)更新状态为conn_listening,memcache.c line:400

     [c]

     c->state = init_state;

     [/c]

    b)绑定事件,当套接字有可读数据时以事件方式通知event_handler处理(memcache.c line:420)

     [c]

     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

     event_base_set(base, &c->event);

     [/c]

     c)进入主事件循环,memcache.c line:4675

     [c]

     event_base_loop(main_base, 0);

     [/c]

     5、当有用户请求时即进行event_handler的conn_listening分支,接受请求后调用dispatch_conn_new函数(memcache.c line:3450),内部实现过程如下:

    a)从链接队列中取出一个空闲的对象,(thread.c line:496)

     [c]

     CQ_ITEM *item = cqi_new();

     [/c]

    b)以轮询的方式访问线程一个线程,(thread.c line:297)

     [c]

     iint tid = (last_thread + 1) % settings.num_threads;

     LIBEVENT_THREAD *thread = threads + tid;

     last_thread = tid;

     [/c]

     c)将状态置为conn_new_cmd,(thread.c line:304)

     [c]

     item->init_state = init_state

     [/c]

     d)然后将这个对象加入到线程的客户端链表中(thread.c line:309)

     [c]

     cq_push(thread->new_conn_queue, item);

     [/c]

     e)然后通知线程处理事件绑定等工作(原理是向线程的写入管道中发送一个字节,告诉线程有客户端来请求了),(thread.c line:312)

     [c]

     if (write(thread->notify_send_fd, “”, 1) != 1) {

     perror(“Writing to thread notify pipe”);

     }

     [/c]

     6、当某一线程的管道有可读数据时,调用thread_libevent_process进行处理,内部实现过程如下:

    a)读取一个字节,即取出一个客户端列表,(thread.c line:259)

     [c]

     if (read(fd, buf, 1) != 1)

     if (settings.verbose > 0)

     fprintf(stderr, “Can’t read from libevent pipe\\n”);

    item = cq_pop(me->new_conn_queue);//从客户端队列中取出一个对象

     [/c]

     b)调用conn_new创建一个链接对像,并将初始状态设为conn_new_cmd,同时为套接定绑定事件(thread.c line:266)

     [c]

     conn *c = conn_new(item->sfd, item->init_state, item->event_flags,

     item->read_buffer_size, item->transport, me->base);

     [/c]

     c)在主线程循环中进入conn_new_cmd分支,(memcache.c line:3494);

     [c]

     case conn_new_cmd:

     /* Only process nreqs at a time to avoid starving other

     connections */

    -nreqs;

     if (nreqs >= 0) {

     reset_cmd_handler(c);

     [/c]

     d)判断该套接字是否还有未处理数据,有未处理数据就更新状态为conn_parse_cmd,没有就更新为conn_waiting,(memcache.c line:2075)

     [c]

     if (c->rbytes > 0) {

     conn_set_state(c, conn_parse_cmd);

     } else {

     conn_set_state(c, conn_waiting);

     }

     [/c]

     e)当有未处理数据时进入conn_parse_cmd分支,尝试去处理(具体请看第7大步),如果内容不满足一条完整的数据就更新为conn_waiting(memcache.c line:3486)

     [c]

     case conn_parse_cmd :

     if (try_read_command(c) == 0) {

     /* wee need more data! */

     conn_set_state(c, conn_waiting);

     }

     [/c]

     f)当套接字有数据时,更新状态为conn_read,(memcache.c line:3461)

     [c]

     case conn_waiting:

     if (!update_event(c, EV_READ | EV_PERSIST)) {

     if (settings.verbose > 0)

     fprintf(stderr, “Couldn’t update event\\n”);

     conn_set_state(c, conn_closing);

     break;

     }

    conn_set_state(c, conn_read);

     [/c]

     g)进入conn_read分支以udp或者tcp的方式开始读取数据,当读取数据成功后

     更新状态为conn_parse_cmd(即下一次循环跳到第e步开始执行)(memcache.c line:3467)

     [c]

     case conn_read:

     res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

    switch (res) {

     case READ_NO_DATA_RECEIVED:

     conn_set_state(c, conn_waiting);

     break;

     case READ_DATA_RECEIVED:

     conn_set_state(c, conn_parse_cmd);

     break;

     case READ_ERROR:

     conn_set_state(c, conn_closing);

     break;

     case READ_MEMORY_ERROR: /* Failed to allocate more memory */

     /* State already set by try_read_network */

     break;

     }

     break;

     [/c]

    h)在try_read_network内部,首先是将没有处理掉的数据前置(memcache.c line:3247),然后循环读取客户端发来的所有数据(memcache.c line:3253),这里要详细讲一下客户端数据的存储,对于每个请求的客户端,都会分配数据缓冲区(rbuf)、当前数据处理偏移项(rcurr)、缓冲区长度(rsize)、已占用字节数(rbytes),当有消息时,先看看缓存区能不能存放下,如果不能,那么将数据缓冲区将以2倍的长度开始增长,然后将接受的数据存下;

    7、处理消息

     a)当有消息时会调用try_read_command来解析命令行,一般情况下是使用ascii传输数据,那么在函数内部会直接跳到memcache.c的3145行,然后判断数据包的完整性,在数据包完整的情况下,会直接调用process_command函数(memcache.c line:3181)

    [c]

     process_command(c, c->rcurr);

     [/c]

    b)在process_command内部先是调用tokenize_command函数来计算参数总数量,参数值长度,参数值(memcache.c line:2967)

     [c]

     ntokens = tokenize_command(command, tokens, MAX_TOKENS);

     [/c]

    c)根据参数总数量,参数值长度和参数值来解析命令,然后将数据更新到HashTable,包括:get,set,update等(memcache.c line:2968)

     [c]

     if (ntokens >= 3 &&

     ((strcmp(tokens[COMMAND_TOKEN].value, “get”) == 0) ||

     (strcmp(tokens[COMMAND_TOKEN].value, “bget”) == 0))) {

    process_get_command(c, tokens, ntokens, false);

     [/c]

    整个关于tcp处理这块分析完毕,比较简单,有错误之处请回复告之。

建议继续学习:

  1. 关于memcache分布式一致性hash    (阅读:10730)
  2. 看源代码那些事    (阅读:9483)
  3. 警惕 Chrome 的查看源代码 (View Page Source) 功能    (阅读:6005)
  4. Memcache分布式部署方案    (阅读:5547)
  5. 我自己研究开源项目源代码的两个重要习惯    (阅读:4945)
  6. 关于session和memcache的若干问题    (阅读:4316)
  7. Memcache源代码分析之数据存储    (阅读:3982)
  8. Memcache mutex设计模式    (阅读:3788)
  9. 关于Memcache长连接自动重连的问题    (阅读:3694)
  10. Memcache协议的学习    (阅读:3639)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1