在main()函数里面,Memcached为主线程调用event_init()创建了一个libevent base对象,随后调用thread_init()来初始化线程,我们来看下这个函数的实现。
void thread_init(int nthreads, struct event_base *main_base) { int i; pthread_mutex_init(&cache_lock, NULL); pthread_mutex_init(&stats_lock, NULL); pthread_mutex_init(&init_lock, NULL); pthread_cond_init(&init_cond, NULL); pthread_mutex_init(&cqi_freelist_lock, NULL); cqi_freelist = NULL; threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can\'t allocate thread descriptors"); exit(1); } dispatcher_thread.base = main_base; dispatcher_thread.thread_id = pthread_self(); for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) { perror("Can\'t create notify pipe"); exit(1); } threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; setup_thread(&threads[i]); } /* Create threads after we\'ve done all the libevent setup. */ for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); while (init_count < nthreads) { pthread_cond_wait(&init_cond, &init_lock); } pthread_mutex_unlock(&init_lock); }
static void setup_thread(LIBEVENT_THREAD *me) { me->base = event_init(); if (! me->base) { fprintf(stderr, "Can\'t allocate event base\\n"); exit(1); } /* Listen for notifications from other threads */ event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can\'t monitor libevent notify pipe\\n"); exit(1); } me->new_conn_queue = malloc(sizeof(struct conn_queue)); if (me->new_conn_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } cq_init(me->new_conn_queue); }
首先调用event_init()为该线程初始化一个事件池,我们知道libevent的事件池(event base)不是线程安全的,所以每个线程需要有自己的事件池,所有的IO事件都由这个事件池来处理,在这个函数里面,首先将notify_receive_fd添加到事件监听循环中,设置回调函数thread_libevent_process(),当主线程向该worker线程通过pipe IPC发送消息时,便会触发该worker线程执行thread_libevent_process()函数。随后setup_thread()函数创建connection队列并将其初始化。
static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; if (read(fd, buf, 1) != 1) if (settings.verbose > 0) fprintf(stderr, "Can\'t read from libevent pipe\\n"); item = cq_pop(me->new_conn_queue); if (NULL != item) { conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { close(item->sfd); } else { c->thread = me; } cqi_free(item); } }
const char* const statenames[] = { "conn_listening", "conn_new_cmd", "conn_waiting", "conn_read", "conn_parse_cmd", "conn_write", "conn_nread", "conn_swallow", "conn_closing", "conn_mwrite" };
关于状态机是怎么运行的就不讨论了,代码篇幅过长,涉及到的细节太多,不展开说了,要说的一点是状态机在哪里进入的conn_listening状态,这是某个socketfd的起始状态,socket需要先监听然后accept,再之后才可以进行read,write等操作,listen必然会在程序初始化的过程中调用,我们可以看到在main()函数里面线程初始化结束以后,便开始对socket描述符进行初始化,获取系统存在几个可用的地址信息,然后创建socket描述符,再然后bind(),再之后调用listen(),这些操作都是在server_socket()函数中执行的,该函数初始化监听socket,并针对服务器做一些socket options的设置,最后最关键的一步是针对该socket调用了conn_new(),但事件池是main_base,也就是说accept()事件是由主线程接收到的,accept()是发生在主线程内,这样也就避免了多线程accept()的惊群,accept()函数的执行也是在状态机循环中执行的,drive_machine()函数中,accept()完成后,主线程便调用dispatch_conn_new()对该描述符进行调度,状态机的代码太长,不列出来了,看一看dispatch_conn_machine()的代码吧。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); if (write(thread->notify_send_fd, "", 1) != 1) { perror("Writing to thread notify pipe"); } }
- 分布式缓存系统 Memcached 入门 (阅读:14827)
- 30分钟3300%性能提升――python+memcached网页优化小记 (阅读:12275)
- Cacti 添加 Memcached 监控 (阅读:8238)
- Redis和Memcached的区别 (阅读:6964)
- 启用memcached压缩注意事项 (阅读:4169)
- Memcached内存管理机制浅析 (阅读:4106)
- memcached 源码阅读笔记 (阅读:3943)
- Memcached and MySQL (阅读:3354)
- Memcached二三事儿 (阅读:3282)
- 详细步骤:在64位Linux上安装Memcached (阅读:3171)
- 作者:levin 来源: basic coder
- 标签: Memcached 状态机 线程模型
- 发布时间:2012-03-31 23:49:47
- [51] WEB系统需要关注的一些点
- [48] Oracle MTS模式下 进程地址与会话信
- [48] Go Reflect 性能
- [46] IOS安全–浅谈关于IOS加固的几种方法
- [45] Twitter/微博客的学习摘要
- [45] android 开发入门
- [45] find命令的一点注意事项
- [44] 【社会化设计】自我(self)部分――欢迎区
- [44] 图书馆的世界纪录
- [43] 关于恐惧的自白