IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

kmemcache源码浅析

godorz... 2013-07-08 22:53:02 累计浏览 3,054 次
本机暂存

简介

   kmemcache是memcache的linux内核移植版, 这两天断断续续的看了其网络方面的实现.

   简单来说, kmemcache不落窠臼, 摈弃了epoll通知机制. 它借助skb的回调函数, 实现packet级别的调度. 在网路模型上, kmemcache分为一个dispatcher和多个workers(均为workqueue线程). dispatcher服务于TCP和unix domain sockets, 它将新建的连接丢给某个worker. 除此之外, workers还处理UDP请求.

   下面详细分析源码.

mc_connector

   kmemcache分为umemcached和kmemcache.ko两部分. umemcached为用户态daemon, 主要作用是解析启动参数, 将启动的settings信息传给kmemcache.ko. kmemcache.ko是内核模块,完成除解析启动参数之外的其他所有功能.

   umemcached解析启动参数的代码非常简单, 不必多说.

   这里简单分析下umemcached和kmemcache.ko通过netlink机制实现数据交互的代码(mc_connector.[hc]).

初始化阶段

   kmemcache.ko模块在初始化时创建协议号为NETLINK_MEMCACHE的netlink socket, 注册其回调函数为mc_nl_data().

   umemcached创建相同协议的netlink socket.

数据交互阶段

   kmemcache.ko向umemcached发起请求的流程:

     1. mc_get_unique_val          — 分配请求的序列号, 并填写命令号

     2. mc_add_callback(xx, xx, 1) — 注册该请求的函数函数

     3. mc_send_msg_*              — 发送数据. (同步请求)

     4. mc_del_callback(xx, 1)     — 删除cn_entry

     5. mc_put_unique_val          — 回收序列号

   注意kmemcache.ko在调用mc_send_msg_*()发送数据时, 将等待在cn_entry.comp完成量上.

   umemcached响应的流程与一般的网络服务代码无异, 通过epoll监听socket, 请求到达后, umemcached根据命令号调用sendmsg()响应. 这将回调mc_nl_callback(), 该函数在cn_queue.list中查找对应的cn_entry, 然后将cn_entry.work提交到cn_queue.workqueue. 当该work最终被调度时, 将回调mc_nc_work(). mc_nc_work()进一步回调通过mc_add_callback()注册的回调函数. 在这之后, mc_cn_work()调用complete(cn_entry.comp). 此时, 在mc_send_msg_*()函数内部等待cn_entry.comp的kmemcache.ko内核线程将被唤醒.

   从上面的描述可以看出, kmemcache.ko与umemcached之间的数据交互是 “请求 - 应答” 式的. 且kmemcache.ko发送请求后将同步等待回复(可以指定等待时间).

相关细节

   61 struct cn_id {

    62     __u32   idx;

    63     __u32   val;

    64 };

    65

    66 struct cn_msg {

    67     struct cn_id id;

    68

    69     __u16   len;

    70     __u8    data[0];

    71 };

    72

    82 typedef void* (cn_callback_fn)(struct cn_msg *, struct netlink_skb_parms *);

    83

    84 struct cn_callback {

    85     struct sk_buff *skb;

    86

    87     cn_callback_fn *f;

    88

    89     void *out;

    90 };

    91

    92 struct cn_entry {

    93 #define ENTRY_NEW   (0×1 << 0)

    94 #define ENTRY_RUNNING   (0×1 << 1)

    95 #define ENTRY_FINISHED  (0×1 << 2)

    96     u32 flags:4;

    97     u32 unused:28;

    98     struct cn_id id;

    99     struct list_head list_entry;

   100

   101     struct cn_callback callback;

   102     struct work_struct work;

   103     struct completion comp;

   104 };

   cn_msg表示umemcached和kmemcache.ko间交互的数据包, cn_id为包头, cn_id.idx可以理解为命令号, 用以标识包体类型, cn_id.val可以理解为序列号, 用于防止窜包, 请求数据包和对应的回复数据包包头相同; 包体为len + data.

   19 struct cn_queue {

   21     struct workqueue_struct *workqueue;

   24     struct list_head list;

   25     spinlock_t lock;

   26 };

   cn_queue保存一个workqueue, 并以list成员维护cn_entry链表. 每个cn_entry结点实际上对应kmemcache.ko模块向umemcached发起的一个请求. kmemcache.ko发起请求前, 会分配一个cn_entry, 然后通过mc_add_callback()将cn_entry插入到cn_queue.list链表尾部. mc_add_callback()将该请求对应的回复数据包的回调函数保存在cn_entry.callback中, 并通过INIT_WORK初始化cn_entry.work, 注册cn_entry.work.func的回调函数为mc_cn_work(或mc_cn_work_del).

   当umemcached响应请求, 往netlink socket连接响应数据时, mc_nl_callback()函数将被回调.

   268 static void mc_nl_callback(struct sk_buff *_skb)

   269 {

   270     struct sk_buff *skb;

   271     struct nlmsghdr *nlh;

   272     struct cn_msg *msg;

   273     struct cn_entry *entry;

   274     struct cn_queue *queue = cn.queue;

   275

   276     skb = skb_get(_skb);

   280     nlh = nlmsg_hdr(skb);

   287

   288     msg = NLMSG_DATA(nlh);

   289     spin_lock_bh(&queue->lock);

   290     list_for_each_entry(entry, &queue->list, list_entry) {

   291         if (entry->id.idx == msg->id.idx &&

   292             entry->id.val == msg->id.val) {

   293             entry->callback.skb = skb;

   295             queue_work(queue->workqueue, &entry->work);

   306     }

   307     spin_unlock_bh(&queue->lock);

   315 }

   可以看到, mc_nl_callback()根据umemcached回复的数据包包头, 在cn_queue.list链表中查找对应的cn_entry. 查找成功后, 将cn_entry.work提交到cn_queue.workqueue.

   上文已经分析过mc_add_callback()注册了cn_entry.work.func为mc_cn_work, 所以当cn_entry.work任务被调度时, mc_cn_work()将被回调. 该函数最终将回调cn_entry.callback.f(). 而我们知道, 该回调函数是在调用mc_add_callback()时通过参数传入的.

例子

   举个例子, 当kmemcache.ko需要退出时, 它调用shutdown_cmd(), 通过mc_add_callback()注册了回调函数shutdown_callback(), 然后通过mc_send_msg_timeout()向umemcached发出命令号为CN_IDX_SHUTDOWN的请求. umemcached回复内容后退出. kmemcache.ko收到回复, 表明umemcached马上就要退出, 通过一系列回调, 最终shutdown_callback()被调用. 然后kmemcache.ko调用try_shutdown()尝试将自己卸载. 此后, umemcached从sendmsg()调用中返回, 程序退出.

   umemcached向kmemcache.ko传递启动设置信息的流程与退出流程原理相同, 不再赘述.

mc_dispatcher

   dispatcher是listen sockets的管理器, 其数据结构为:

   21 /* dispatcher master */

   22 struct dispatcher_master {

   23 #define ACCEPT_NEW  1

   24 #define SOCK_CLOSE  2

   25     unsigned long flags;

   26

   27     struct list_head list;     /* tcp/unix socket list */

   28     spinlock_t lock;

   29

   30     struct workqueue_struct *wq;

   31 };

   其中, list成员为serve_socket组成的链表, 一个serve_socket对象对应的listen socket. 其结构为:

   35 /* dispatcher listen socket */

   36 struct serve_sock {

   37     net_transport_t transport;

   38     unsigned long state;       /* conn state */

   39     struct socket *sock;       /* listen socket */

   40     struct list_head list;     /* link to master’s listen list */

   41     struct work_struct work;

   42 };

   mc_dispatcher.c文件头部定义了一个dispatcher_master结构的全局对象:

   33 static struct dispatcher_master dsper;

   其初始化函数为dispatcher_init():

   701 /**

   702  * init dispatcher.

   703  * create the shared dispatcher kthread and start listen socket

   704  *

   705  * Returns 0 on success, error code other wise.

   706  */

   707 int dispatcher_init(void)

   708 {

   711     INIT_LIST_HEAD(&dsper.list);

   712     spin_lock_init(&dsper.lock);

   713

   714     dsper.wq = create_singlethread_workqueue("kmcmasterd");

   720

   721     server_init();

   725     set_bit(ACCEPT_NEW, &dsper.flags);

   730 }

   dispatcher_init()首先初始化dsper.list和dsper.lock, 然后调用create_singlethread_workqueue创建一个名为kmcmasterd的workqueue, 保存在dsper.wq中. 然后调用server_init()创建listen sockets. server_init()函数根据启动参数中是否创建unix domain sockets的标志选择调用server_socket_unix()还是server_inet_init(), 为讨论方便, 这里假设kmemcache启动时未要求创建unix domain sockets, 直接看server_inet_init()的实现:

   509 static int server_inet_init(void)

   510 {

   512     char *path, *data = sock_info->data;

   513     int selen = sizeof(sock_entry_t);

   514     sock_entry_t *se = (sock_entry_t *)data;

   515     struct file *filp = NULL;

   532

   533     for (; data + selen + se->addrlen <= path;) {

   535     server_socket_inet(se, filp);

   538         data += selen + se->addrlen;

   539         se = (sock_entry_t *)data;

   540     }

   548 }

   可以看到, server_inet_init()为启动时要求的每个端口调用server_socket_inet():

   337 static int server_socket_inet(sock_entry_t *se, struct file *filp)

   338 {

   339     int ret = 0;

   340     int flags = 1, level, name;

   341     struct serve_sock *ss;

   342     struct linger ling = {0, 0};

   343

   344     ss = __alloc_serve_sock(se->trans);

   350

   351     ret = sock_create_kern(se->family, se->type, se->protocol, &ss->sock);

   358

   359     if (!IS_UDP(se->trans)) {

   360         ss->sock->sk->sk_allocation = GFP_ATOMIC;

   361         set_sock_callbacks(ss->sock, ss);

   362     }

   363

   415     ret = kernel_bind(ss->sock, (struct sockaddr *)se->addr, se->addrlen);

   420

   421     if (!IS_UDP(se->trans)) {

   422         ret = kernel_listen(ss->sock, settings.backlog);

   427     }

   436

   437     if (IS_UDP(se->trans)) {

   438         static int last_cpu = -1;

   439         int cpu, res = 0;

   440

   441         if (settings.num_threads_per_udp == 1) {

   442             last_cpu = (last_cpu + 1) % num_online_cpus();

   443             ret = mc_dispatch_conn_udp(ss->sock, conn_read,

   444                            UDP_READ_BUF_SIZE, last_cpu);

   445             if (!ret) res++;

   446         } else {

   447             for_each_online_cpu(cpu) {

   448                 ret = mc_dispatch_conn_udp(ss->sock, conn_read,

   449                                UDP_READ_BUF_SIZE,

   450                                cpu);

   451                 if (!ret) res++;

   452             }

   453         }

   454

   463     } else {

   464         spin_lock(&dsper.lock);

   465         list_add_tail(&ss->list, &dsper.list);

   466         spin_unlock(&dsper.lock);

   467     }

   468

   480 }

   函数server_socket_inet()首先调用__alloc_serve_sock()创建并初始化一个serve_sock对象, 然后调用sock_create_kern()创建一个socket, 然后对该socket进行一系列的setsockopt, bind, listen等初始化操作. 在这之后, server_socket_inet()根据这个socket是否为UDP协议, 分为两类操作:

   1. 如果为UDP协议, 调用mc_dispatch_conn_udp(), 后文细说

   2. 如果不是UDP协议, 那么将ss链接到dsper.list链表

   然后server_socket_inet()函数退出.

   接下来看看TCP listen sockets, 其实在第2点之前, server_socket_inet()函数针对非UDP socket, 将通过set_sock_callbacks()注册该socket的几个回调函数:

   240 static void set_sock_callbacks(struct socket *sock, struct serve_sock *ss)

   241 {

   242     struct sock *sk = sock->sk;

   245

   246     sk->sk_user_data    = ss;

   247     sk->sk_data_ready   = mc_disp_data_ready;

   252 }

   当某个listen socket上有新连接到达时, 将回调sk_user_data_ready, 也就是mc_disp_data_ready()函数:

   216 /* data available on socket, or listen socket received a connect */

   217 static void mc_disp_data_ready(struct sock *sk, int unused)

   218 {

   219     struct serve_sock *ss =

   220         (struct serve_sock *)sk->sk_user_data;

   221

   224     if (sk->sk_state == TCP_LISTEN)

   225         _queue(ss);

   226 }

   看下_queue(ss)的实现:

   202 static void inline _queue(struct serve_sock *ss)

   203 {

   209     queue_work(dsper.wq, &ss->work);

   210 }

   可以看到, _queue()非常简单, 它将ss->work提交到dsper.wq. ss->work的回调函数在server_socker_inet()调用__alloc_serve_sock()创建时设置为mc_listen_work().

   在这之后, 当ss->work任务被调度时, mc_listen_work()将被回调:

   190 static void mc_listen_work(struct work_struct *work)

   191 {

   192     struct serve_sock *ss =

   193         container_of(work, struct serve_sock, work);

   194

   195     /* accept many */;

   196     for (; !test_bit(SOCK_CLOSE, &dsper.flags);) {

   197         if (mc_accept_one(ss))

   198             break;

   199     }

   200 }

   mc_listen_work()尽可能的通过mc_accept_one()接收新连接.

   141 static int mc_accept_one(struct serve_sock *ss)

   142 {

   144     struct socket *nsock;

   145     struct socket *sock = ss->sock;

   146

   147     sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,

   148                    sock->sk->sk_protocol, &nsock);

   151

   152     nsock->type = sock->type;

   153     nsock->ops = sock->ops;

   154     sock->ops->accept(sock, nsock, O_NONBLOCK);

   157

   158     nsock->sk->sk_allocation = GFP_ATOMIC;

   159     set_anon_sock_callbacks(nsock);

   165

   174     mc_dispatch_conn_new(nsock, conn_new_cmd,

   175          DATA_BUF_SIZE, ss->transport);

   188 }

   mc_accept_one()通过sock_create_lite()和sock->ops->accept()得到新连接, 之后调用mc_dispatch_conn_new(). 是不是觉得这个函数有点眼熟呢?

mc_worker

   上文提到, 针对UDP socket, dispatcher将调用mc_dispatch_conn_udp(). 而针对accept出来的新连接, 将调用mc_dispatch_conn_new(). 实际上这两个函数的是对__dispatch_conn_new()的简单封装, 这就使得UDP sockets和TCP sockets的处理得到了统一:

   682 int mc_dispatch_conn_udp(struct socket *sock, conn_state_t state,

   683              int rbuflen, int cpu)

   684 {

   685     return __dispatch_conn_new(sock, state, rbuflen, udp_transport, cpu);

   686 }

   687

   688 int mc_dispatch_conn_new(struct socket *sock, conn_state_t state,

   689              int rbuflen, net_transport_t transport)

   690 {

   691     int ret;

   692

   693     ret = __dispatch_conn_new(sock, state, rbuflen, transport, get_cpu());

   694     put_cpu();

   695

   696     return ret;

   697 }

   接下来一窥__dispatch_conn_new()究竟:

   643 /**

   644  * Dispatches a new connection to another thread.

   645  *

   646  * Returns 0 on success, error code other wise

   647  */

   648 static inline int __dispatch_conn_new(struct socket *sock, conn_state_t state,

   649                       int rbuflen, net_transport_t transport, int cpu)

   650 {

   651     int ret = 0;

   652     struct conn_req *rq;

   653

   654     rq = new_conn_req();

   660

   661     rq->state = state;

   662     rq->transport = transport;

   663     rq->sock = sock;

   664     rq->rsize = rbuflen;

   665     INIT_WORK(&rq->work, mc_conn_new_work);

   666

   667     ret = queue_work_on(cpu, slaved, &rq->work);

   673

   674     return 0;

   680 }

   该函数也是非常简单, 为参数socket *sock创建并初始化一个conn_req *rq对象, 注册rq->work的回调函数为mc_conn_new_work, 然后通过queue_work_on()提交任务到名为slaved的workqueue. slaved是在kmemcache.ko模块在初始化时通过kmemcache_init() -> register_kmemcache_bh() -> kmemcache_bh_init() ->  __kmemcache_bh_init() -> worker_init() 调用链初始化的. * (这里所说的调用链未区分调用和回调)

   699 /**

   700  * create slaved’s workqueue & info storage.

   701  *

   702  * Returns 0 on success, error code other wise.

   703  */

   704 int workers_init(void)

   705 {

   733     slaved = create_workqueue("kmcslaved");

   748 }

   可以看到, slaved被创建所使用的是create_workqueue(), 简单理解为通过该函数为每个CPU创建了对应的worker线程. 而queue_work_on(cpu, slaved, &rq->work)的第一个参数CPU的含义, 便是指定rq->work任务提交给slaved workqueue的哪个CPU对应的worker线程上.

   回头看看mc_dispatch_conn_udp()和mc_dispatch_conn_new()的实现, 不难发现:

   - 对UDP socket, rq->work任务所提交的CPU由参数传入. mc_dispatch_conn_udp()由server_socket_inet()调用, 通过server_socket_inet()第437-453行得知, kmemcache.ko模块将根据settings.num_threads_per_udp是否为1, 也就是每个UDP socket是否只使用一个worker线程的配置, 决定将一个UDP socket提交到某个CPU(各UDP sockets以round robin形式选择一个CPU), 还是将该UDP socket提交到所有在线的CPUs.

   - 对TCP socket, rq->work任务所提交的CPU恰恰就是mc_dispatch_conn_new()被执行时所在的CPU. 而该函数的调用者mc_listen_work(), 其实是作为一个任务, 由sk_user_data_ready()(即mc_disp_data_ready())调用_queue()提交到dsper.wq的. dsper.wq由create_singlethread_workqueue()创建, 它对应一个线程, 该线程在多个CPU之间调度, 该线程调度在某个CPU上执行, mc_dispatch_conn_new()被将rq->work提交到slaved workqueue的哪个CPU对应的worker线程上.

   到这里, 无论是UDP sockets还是accept出来的TCP sockets, 它们都被抽象成一个conn_req *rq, rq->work的回调函数统一为mc_conn_new_work(). 然后rq->work被提交到了slaved的某个CPU worker线程. 而这里所谓的”某个CPU”的选择, 是kmemcache代码实现的(即作者jgli说的”基于packet的线程调度机制“), 它保证了同个请求前后的多次处理始终在同一个CPU上, 一方面提高cache命中率, 另一方面合理利用了多CPU资源. 从这点看, kmemcache有点像RPS, RFS补丁(更多), 当然kmemcache更加强大, 控制能力更强.

   接下来, 便是rq->work任务被调度后, mc_conn_new_work()得到回调:

   600 static void mc_conn_new_work(struct work_struct *work)

   601 {

   602     conn *c;

   603     struct conn_req *rq =

   604         container_of(work, struct conn_req, work);

   605

   606     c = mc_conn_new(rq);

   611     mc_queue_conn(c);

   623 }

   388 void mc_queue_conn(conn *c)

   389 {

   395     __queue_conn(c);

   396 }

   368 static inline void __queue_conn(conn *c)

   369 {

   380     queue_work(slaved, &c->work);

   386 }

   在这里, conn_req *rq进一步被抽象为conn *c, 然后由mc_queue_conn()将c->work提交到原来所在的CPU的slaved orkqueue上. 回调函数由mc_conn_new()注册为mc_conn_work().

   70 conn* mc_conn_new(struct conn_req *rq)

    71 {

    74      conn *c = _conn_new();

   119     c->sock = rq->sock;

   120     c->state = rq->state;

   121     c->transport = rq->transport;

   122     INIT_WORK(&c->work, mc_conn_work);

   123     atomic_set(&c->nref, 1);

   124     set_bit(EV_RDWR, &c->event);

   125     set_sock_callbacks(c->sock, c);

   145 }

   625 void mc_conn_work(struct work_struct *work)

   626 {

   634     mc_worker_machine(c);

   637     mc_requeue_conn(c);

   641 }

   mc_worker_machine()由conn *c当前的状态驱动, 如已读数据不满足解析状态则读入数据, 否则解析数据, 解析成功曾调用相应的处理函数, 有数据可写则写出数据, 等等等等. 因为开始和memcache逻辑息息相关了, 后面的代码我未做深究.

   因为workqueue是one shot的, 回调后若仍需后续处理, 自然该重新提交任务, 很明显这就是mc_requeue_conn()的功能. 值得说明的一点是, 在这之前, mc_requeue_conn()将通过sock->ops->poll()主动获取当前socket的读写状态并填入conn *c的event字段:

   398 void mc_requeue_conn(conn *c)

   399 {

   400     int poll;

   401

   402     if (test_bit(EV_DEAD, &c->event)) {

   403         PRINFO("mc_requeue_conn %p ignore EV_DEAD", c);

   404         return;

   405     }

   406

   407     poll = c->sock->ops->poll(c->sock->file, c->sock, NULL);

   408     if (test_bit(EV_RDWR, &c->event)) {

   409         if (poll & CONN_READ) {

   410             goto queue_conn;

   411         } else {

   412             PRINFO("mc_queue_conn %p ignore EV_READ", c);

   413         }

   414     } else {

   415         if (poll & CONN_WRITE) {

   416             goto queue_conn;

   417         } else {

   418             PRINFO("mc_queue_conn %p ignore EV_WRITE", c);

   419         }

   420     }

   421

   422     return;

   423

   424 queue_conn:

   425     __queue_conn(c);

   426

   427 }

   注意调用poll()时最后一个参数为NULL, 这说明仅仅要求获取当前socket事件, 而不需要内核为该socket创建wait队列并在socket状态将来改变时回调以唤醒等待进程. (个人感觉将poll()延后到由mc_work_machine()函数调用后更好, 当前的实现是poll()出事件, 然后提交任务, 因而在任务被调度时可能该socket上又有了新事件, 但mc_worker_machine()对此毫不知情.)

   总结一下, 不管是UDP请求还是TCP请求, 都通过__dispatch_conn_new()提交任务到slaved. 任务的回调是mc_conn_new_work(). 该函数进一步将请求抽象为conn *c, 并再次向原来的CPU对应的slaved workqueue提交任务, 回调为mc_conn_work(). mc_conn_work()由conn *c的状态驱动, 每次被回调后会判断任务是否已完成, 若未完成, 则重新提交任务.

epoll的不足

   在用户态网络服务上, epoll工作的足够好, 当然, 这是比起select而言. 如果尝试在内核态使用epoll, 不难发现它的不足.

   1. epoll_wait实质是轮询

   2. epoll未反馈socket最后所在的CPU

   下面根据源码简单阐述(基于个人理解, 若有不对, 欢迎指正).

   1446 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,

   1447            int maxevents, long timeout)

   1448 {

   1471 fetch_events:

   1472     spin_lock_irqsave(&ep->lock, flags);

   1473

   1474     if (!ep_events_available(ep)) {

   1482

   1483         for (;;) {

   1489             set_current_state(TASK_INTERRUPTIBLE);

   1490             if (ep_events_available(ep) || timed_out)

   1491                 break;

   1492             if (signal_pending(current)) {

   1493                 res = -EINTR;

   1494                 break;

   1495             }

   1496

   1497             spin_unlock_irqrestore(&ep->lock, flags);

   1498             if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))

   1499                 timed_out = 1;

   1500

   1501             spin_lock_irqsave(&ep->lock, flags);

   1502         }

   1504

   1505         set_current_state(TASK_RUNNING);

   1506     }

   1523 }

   可以看到, ep_poll()在for循环中不断轮询是否有socket可用, 若无socket可用, 则调用schedule_hrtimeout_range()主动让出CPU, 直到超时或有可以sockets.

   实际上, epoll多路复用的功能, 是依靠->f_op->poll()注册回调实现的. 以ep_insert()为例:

   1145 static int ep_insert(struct eventpoll *ep, struct epoll_event *event,

   1146              struct file *tfile, int fd)

   1147 {

   1152     struct ep_pqueue epq;

   1153

   1177     /* Initialize the poll table using the queue callback */

   1178     epq.epi = epi;

   1179     init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

   1180     epq.pt._key = event->events;

   1181

   1182     /*

   1183      * Attach the item to the poll hooks and get current event bits.

   1184      * We can safely use the file* here because its usage count has

   1185      * been increased by the caller of this function. Note that after

   1186      * this operation completes, the poll callback can start hitting

   1187      * the new item.

   1188      */

   1189     revents = tfile->f_op->poll(tfile, &epq.pt);

   1269 }

   ep_insert()通过tfile->f_op->poll()(对socket而言, 为sock_poll(); 更进一步, 对tcp socket来说, 便是tcp_poll())调用poll_wait()将回调函数ep_ptable_queue_proc()注册在wait queue上. 当socket状态改变时, 内核协议栈通过wait_event_*()对wait queue上的回调函数逐个回调. 对ep_ptable_queue_proc()而言, 它将fd封转为epitem添加到目的file的sock等待队列, 回调函数为ep_poll_callback().

   当socket收到数据后, 内核协议栈将回调sk_data_ready(默认为sock_def_readable), 最终会调用ep_poll_callback():

   896 static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)

   897 {

   956     /* If this file is already in the ready list we exit soon */

   957     if (!ep_is_linked(&epi->rdllink)) {

   958         list_add_tail(&epi->rdllink, &ep->rdllist);

   959         __pm_stay_awake(epi->ws);

   960     }

   979 }

   ep_poll_callback()将socket插入就绪队列. 而epoll_wait()轮询的正是就绪队列是否为空.

   从上面的讨论可以看到, epoll_wait本质为轮询, 且其分割了数据逻辑和处理逻辑: socket有事件后, 通过辗转回调插入就绪队列, 最后由epoll_wait收割回用户态进行处理. 另一方面, 用户态无法获取就绪的socket所在的CPU, 处理逻辑如果不在原来的CPU, 则CPU cache命中率势必会受到影响.

高性能, 路漫漫

   我曾断断续续的写过一个内核态网络框架knp, 原理与kmemcache几乎相同, 当然在实现上天真很多. (当时太过强调兼容已有的网络框架, 导致不少时间被浪费在重造fifo, msg queue, shm allocator, …之上. 现在回想起来, 后悔不已.)

   和kmemcache作者jgli一样, 在读完The Secret To 10 Million Concurrent Connections -The Kernel Is The Problem, Not The Solution后, 感触颇深: 内核确实带来了太多的overhead. 于是我转头了解了netmap项目, 其作者是N年前提出DEVICE_POLLING的Luigi Rizzo. netmap代码不多, 只是有太多我尚未熟悉的领域, 于是浅尝辄止, 悻悻作罢. 以后有空再看看吧.

   至于我那个knp框架, 早已沉寂多日了.

同分类推荐文章

  1. Vibe新开源项目 - Vaala AI Gateway (2026-05-17 02:10:19)
  2. SmartPerfetto 架构文章 Q&amp;A:8 个深度技术问答 (2026-04-10 11:00:00)
  3. 让 AI 把我的 PHP 博客重写成 Go (2026-03-27 18:33:54)

查看更多 后端 文章 →

建议继续学习

  1. 基于SSD的数据库性能优化 (累计阅读 8,754)
  2. 使用memc-nginx和srcache-nginx构建高效透明的缓存机制 (累计阅读 7,067)
  3. Linux C语言编程学习材料 (累计阅读 7,031)
  4. Memcache分布式部署方案 (累计阅读 6,752)
  5. fatcache源码浅析 (累计阅读 5,995)
  6. 微博架构与平台安全演讲稿 (累计阅读 5,692)
  7. 关于session和memcache的若干问题 (累计阅读 5,269)
  8. 启用memcached压缩注意事项 (累计阅读 5,189)
  9. Memcache mutex设计模式 (累计阅读 5,006)
  10. Memcache源代码分析之数据存储 (累计阅读 4,935)