技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> kmemcache源码浅析

kmemcache源码浅析

浏览:2343次  出处信息

简介

   kmemcache是memcache的linux内核移植版, 这两天断断续续的看了其网络方面的实现.

   简单来说, kmemcache不落窠臼, 摈弃了epoll通知机制. 它借助skb的回调函数, 实现packet级别的调度. 在网路模型上, kmemcache分为一个dispatcher和多个workers(均为workqueue线程). dispatcher服务于TCP和unix domain sockets, 它将新建的连接丢给某个worker. 除此之外, workers还处理UDP请求.

   下面详细分析源码.

mc_connector

   kmemcache分为umemcached和kmemcache.ko两部分. umemcached为用户态daemon, 主要作用是解析启动参数, 将启动的settings信息传给kmemcache.ko. kmemcache.ko是内核模块,完成除解析启动参数之外的其他所有功能.

   umemcached解析启动参数的代码非常简单, 不必多说.

   这里简单分析下umemcached和kmemcache.ko通过netlink机制实现数据交互的代码(mc_connector.[hc]).

初始化阶段

   kmemcache.ko模块在初始化时创建协议号为NETLINK_MEMCACHE的netlink socket, 注册其回调函数为mc_nl_data().

   umemcached创建相同协议的netlink socket.

数据交互阶段

   kmemcache.ko向umemcached发起请求的流程:

     1. mc_get_unique_val          — 分配请求的序列号, 并填写命令号

     2. mc_add_callback(xx, xx, 1) — 注册该请求的函数函数

     3. mc_send_msg_*              — 发送数据. (同步请求)

     4. mc_del_callback(xx, 1)     — 删除cn_entry

     5. mc_put_unique_val          — 回收序列号

   注意kmemcache.ko在调用mc_send_msg_*()发送数据时, 将等待在cn_entry.comp完成量上.

   umemcached响应的流程与一般的网络服务代码无异, 通过epoll监听socket, 请求到达后, umemcached根据命令号调用sendmsg()响应. 这将回调mc_nl_callback(), 该函数在cn_queue.list中查找对应的cn_entry, 然后将cn_entry.work提交到cn_queue.workqueue. 当该work最终被调度时, 将回调mc_nc_work(). mc_nc_work()进一步回调通过mc_add_callback()注册的回调函数. 在这之后, mc_cn_work()调用complete(cn_entry.comp). 此时, 在mc_send_msg_*()函数内部等待cn_entry.comp的kmemcache.ko内核线程将被唤醒.

   从上面的描述可以看出, kmemcache.ko与umemcached之间的数据交互是 “请求 - 应答” 式的. 且kmemcache.ko发送请求后将同步等待回复(可以指定等待时间).

相关细节

   61 struct cn_id {

    62     __u32   idx;

    63     __u32   val;

    64 };

    65

    66 struct cn_msg {

    67     struct cn_id id;

    68

    69     __u16   len;

    70     __u8    data[0];

    71 };

    72

    82 typedef void* (cn_callback_fn)(struct cn_msg *, struct netlink_skb_parms *);

    83

    84 struct cn_callback {

    85     struct sk_buff *skb;

    86

    87     cn_callback_fn *f;

    88

    89     void *out;

    90 };

    91

    92 struct cn_entry {

    93 #define ENTRY_NEW   (0×1 << 0)

    94 #define ENTRY_RUNNING   (0×1 << 1)

    95 #define ENTRY_FINISHED  (0×1 << 2)

    96     u32 flags:4;

    97     u32 unused:28;

    98     struct cn_id id;

    99     struct list_head list_entry;

   100

   101     struct cn_callback callback;

   102     struct work_struct work;

   103     struct completion comp;

   104 };

   cn_msg表示umemcached和kmemcache.ko间交互的数据包, cn_id为包头, cn_id.idx可以理解为命令号, 用以标识包体类型, cn_id.val可以理解为序列号, 用于防止窜包, 请求数据包和对应的回复数据包包头相同; 包体为len + data.

   19 struct cn_queue {

   21     struct workqueue_struct *workqueue;

   24     struct list_head list;

   25     spinlock_t lock;

   26 };

   cn_queue保存一个workqueue, 并以list成员维护cn_entry链表. 每个cn_entry结点实际上对应kmemcache.ko模块向umemcached发起的一个请求. kmemcache.ko发起请求前, 会分配一个cn_entry, 然后通过mc_add_callback()将cn_entry插入到cn_queue.list链表尾部. mc_add_callback()将该请求对应的回复数据包的回调函数保存在cn_entry.callback中, 并通过INIT_WORK初始化cn_entry.work, 注册cn_entry.work.func的回调函数为mc_cn_work(或mc_cn_work_del).

   当umemcached响应请求, 往netlink socket连接响应数据时, mc_nl_callback()函数将被回调.

   268 static void mc_nl_callback(struct sk_buff *_skb)

   269 {

   270     struct sk_buff *skb;

   271     struct nlmsghdr *nlh;

   272     struct cn_msg *msg;

   273     struct cn_entry *entry;

   274     struct cn_queue *queue = cn.queue;

   275

   276     skb = skb_get(_skb);

   280     nlh = nlmsg_hdr(skb);

   287

   288     msg = NLMSG_DATA(nlh);

   289     spin_lock_bh(&queue->lock);

   290     list_for_each_entry(entry, &queue->list, list_entry) {

   291         if (entry->id.idx == msg->id.idx &&

   292             entry->id.val == msg->id.val) {

   293             entry->callback.skb = skb;

   295             queue_work(queue->workqueue, &entry->work);

   306     }

   307     spin_unlock_bh(&queue->lock);

   315 }

   可以看到, mc_nl_callback()根据umemcached回复的数据包包头, 在cn_queue.list链表中查找对应的cn_entry. 查找成功后, 将cn_entry.work提交到cn_queue.workqueue.

   上文已经分析过mc_add_callback()注册了cn_entry.work.func为mc_cn_work, 所以当cn_entry.work任务被调度时, mc_cn_work()将被回调. 该函数最终将回调cn_entry.callback.f(). 而我们知道, 该回调函数是在调用mc_add_callback()时通过参数传入的.

例子

   举个例子, 当kmemcache.ko需要退出时, 它调用shutdown_cmd(), 通过mc_add_callback()注册了回调函数shutdown_callback(), 然后通过mc_send_msg_timeout()向umemcached发出命令号为CN_IDX_SHUTDOWN的请求. umemcached回复内容后退出. kmemcache.ko收到回复, 表明umemcached马上就要退出, 通过一系列回调, 最终shutdown_callback()被调用. 然后kmemcache.ko调用try_shutdown()尝试将自己卸载. 此后, umemcached从sendmsg()调用中返回, 程序退出.

   umemcached向kmemcache.ko传递启动设置信息的流程与退出流程原理相同, 不再赘述.

mc_dispatcher

   dispatcher是listen sockets的管理器, 其数据结构为:

   21 /* dispatcher master */

   22 struct dispatcher_master {

   23 #define ACCEPT_NEW  1

   24 #define SOCK_CLOSE  2

   25     unsigned long flags;

   26

   27     struct list_head list;     /* tcp/unix socket list */

   28     spinlock_t lock;

   29

   30     struct workqueue_struct *wq;

   31 };

   其中, list成员为serve_socket组成的链表, 一个serve_socket对象对应的listen socket. 其结构为:

   35 /* dispatcher listen socket */

   36 struct serve_sock {

   37     net_transport_t transport;

   38     unsigned long state;       /* conn state */

   39     struct socket *sock;       /* listen socket */

   40     struct list_head list;     /* link to master’s listen list */

   41     struct work_struct work;

   42 };

   mc_dispatcher.c文件头部定义了一个dispatcher_master结构的全局对象:

   33 static struct dispatcher_master dsper;

   其初始化函数为dispatcher_init():

   701 /**

   702  * init dispatcher.

   703  * create the shared dispatcher kthread and start listen socket

   704  *

   705  * Returns 0 on success, error code other wise.

   706  */

   707 int dispatcher_init(void)

   708 {

   711     INIT_LIST_HEAD(&dsper.list);

   712     spin_lock_init(&dsper.lock);

   713

   714     dsper.wq = create_singlethread_workqueue("kmcmasterd");

   720

   721     server_init();

   725     set_bit(ACCEPT_NEW, &dsper.flags);

   730 }

   dispatcher_init()首先初始化dsper.list和dsper.lock, 然后调用create_singlethread_workqueue创建一个名为kmcmasterd的workqueue, 保存在dsper.wq中. 然后调用server_init()创建listen sockets. server_init()函数根据启动参数中是否创建unix domain sockets的标志选择调用server_socket_unix()还是server_inet_init(), 为讨论方便, 这里假设kmemcache启动时未要求创建unix domain sockets, 直接看server_inet_init()的实现:

   509 static int server_inet_init(void)

   510 {

   512     char *path, *data = sock_info->data;

   513     int selen = sizeof(sock_entry_t);

   514     sock_entry_t *se = (sock_entry_t *)data;

   515     struct file *filp = NULL;

   532

   533     for (; data + selen + se->addrlen <= path;) {

   535     server_socket_inet(se, filp);

   538         data += selen + se->addrlen;

   539         se = (sock_entry_t *)data;

   540     }

   548 }

   可以看到, server_inet_init()为启动时要求的每个端口调用server_socket_inet():

   337 static int server_socket_inet(sock_entry_t *se, struct file *filp)

   338 {

   339     int ret = 0;

   340     int flags = 1, level, name;

   341     struct serve_sock *ss;

   342     struct linger ling = {0, 0};

   343

   344     ss = __alloc_serve_sock(se->trans);

   350

   351     ret = sock_create_kern(se->family, se->type, se->protocol, &ss->sock);

   358

   359     if (!IS_UDP(se->trans)) {

   360         ss->sock->sk->sk_allocation = GFP_ATOMIC;

   361         set_sock_callbacks(ss->sock, ss);

   362     }

   363

   415     ret = kernel_bind(ss->sock, (struct sockaddr *)se->addr, se->addrlen);

   420

   421     if (!IS_UDP(se->trans)) {

   422         ret = kernel_listen(ss->sock, settings.backlog);

   427     }

   436

   437     if (IS_UDP(se->trans)) {

   438         static int last_cpu = -1;

   439         int cpu, res = 0;

   440

   441         if (settings.num_threads_per_udp == 1) {

   442             last_cpu = (last_cpu + 1) % num_online_cpus();

   443             ret = mc_dispatch_conn_udp(ss->sock, conn_read,

   444                            UDP_READ_BUF_SIZE, last_cpu);

   445             if (!ret) res++;

   446         } else {

   447             for_each_online_cpu(cpu) {

   448                 ret = mc_dispatch_conn_udp(ss->sock, conn_read,

   449                                UDP_READ_BUF_SIZE,

   450                                cpu);

   451                 if (!ret) res++;

   452             }

   453         }

   454

   463     } else {

   464         spin_lock(&dsper.lock);

   465         list_add_tail(&ss->list, &dsper.list);

   466         spin_unlock(&dsper.lock);

   467     }

   468

   480 }

   函数server_socket_inet()首先调用__alloc_serve_sock()创建并初始化一个serve_sock对象, 然后调用sock_create_kern()创建一个socket, 然后对该socket进行一系列的setsockopt, bind, listen等初始化操作. 在这之后, server_socket_inet()根据这个socket是否为UDP协议, 分为两类操作:

   1. 如果为UDP协议, 调用mc_dispatch_conn_udp(), 后文细说

   2. 如果不是UDP协议, 那么将ss链接到dsper.list链表

   然后server_socket_inet()函数退出.

   接下来看看TCP listen sockets, 其实在第2点之前, server_socket_inet()函数针对非UDP socket, 将通过set_sock_callbacks()注册该socket的几个回调函数:

   240 static void set_sock_callbacks(struct socket *sock, struct serve_sock *ss)

   241 {

   242     struct sock *sk = sock->sk;

   245

   246     sk->sk_user_data    = ss;

   247     sk->sk_data_ready   = mc_disp_data_ready;

   252 }

   当某个listen socket上有新连接到达时, 将回调sk_user_data_ready, 也就是mc_disp_data_ready()函数:

   216 /* data available on socket, or listen socket received a connect */

   217 static void mc_disp_data_ready(struct sock *sk, int unused)

   218 {

   219     struct serve_sock *ss =

   220         (struct serve_sock *)sk->sk_user_data;

   221

   224     if (sk->sk_state == TCP_LISTEN)

   225         _queue(ss);

   226 }

   看下_queue(ss)的实现:

   202 static void inline _queue(struct serve_sock *ss)

   203 {

   209     queue_work(dsper.wq, &ss->work);

   210 }

   可以看到, _queue()非常简单, 它将ss->work提交到dsper.wq. ss->work的回调函数在server_socker_inet()调用__alloc_serve_sock()创建时设置为mc_listen_work().

   在这之后, 当ss->work任务被调度时, mc_listen_work()将被回调:

   190 static void mc_listen_work(struct work_struct *work)

   191 {

   192     struct serve_sock *ss =

   193         container_of(work, struct serve_sock, work);

   194

   195     /* accept many */;

   196     for (; !test_bit(SOCK_CLOSE, &dsper.flags);) {

   197         if (mc_accept_one(ss))

   198             break;

   199     }

   200 }

   mc_listen_work()尽可能的通过mc_accept_one()接收新连接.

   141 static int mc_accept_one(struct serve_sock *ss)

   142 {

   144     struct socket *nsock;

   145     struct socket *sock = ss->sock;

   146

   147     sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,

   148                    sock->sk->sk_protocol, &nsock);

   151

   152     nsock->type = sock->type;

   153     nsock->ops = sock->ops;

   154     sock->ops->accept(sock, nsock, O_NONBLOCK);

   157

   158     nsock->sk->sk_allocation = GFP_ATOMIC;

   159     set_anon_sock_callbacks(nsock);

   165

   174     mc_dispatch_conn_new(nsock, conn_new_cmd,

   175          DATA_BUF_SIZE, ss->transport);

   188 }

   mc_accept_one()通过sock_create_lite()和sock->ops->accept()得到新连接, 之后调用mc_dispatch_conn_new(). 是不是觉得这个函数有点眼熟呢?

mc_worker

   上文提到, 针对UDP socket, dispatcher将调用mc_dispatch_conn_udp(). 而针对accept出来的新连接, 将调用mc_dispatch_conn_new(). 实际上这两个函数的是对__dispatch_conn_new()的简单封装, 这就使得UDP sockets和TCP sockets的处理得到了统一:

   682 int mc_dispatch_conn_udp(struct socket *sock, conn_state_t state,

   683              int rbuflen, int cpu)

   684 {

   685     return __dispatch_conn_new(sock, state, rbuflen, udp_transport, cpu);

   686 }

   687

   688 int mc_dispatch_conn_new(struct socket *sock, conn_state_t state,

   689              int rbuflen, net_transport_t transport)

   690 {

   691     int ret;

   692

   693     ret = __dispatch_conn_new(sock, state, rbuflen, transport, get_cpu());

   694     put_cpu();

   695

   696     return ret;

   697 }

   接下来一窥__dispatch_conn_new()究竟:

   643 /**

   644  * Dispatches a new connection to another thread.

   645  *

   646  * Returns 0 on success, error code other wise

   647  */

   648 static inline int __dispatch_conn_new(struct socket *sock, conn_state_t state,

   649                       int rbuflen, net_transport_t transport, int cpu)

   650 {

   651     int ret = 0;

   652     struct conn_req *rq;

   653

   654     rq = new_conn_req();

   660

   661     rq->state = state;

   662     rq->transport = transport;

   663     rq->sock = sock;

   664     rq->rsize = rbuflen;

   665     INIT_WORK(&rq->work, mc_conn_new_work);

   666

   667     ret = queue_work_on(cpu, slaved, &rq->work);

   673

   674     return 0;

   680 }

   该函数也是非常简单, 为参数socket *sock创建并初始化一个conn_req *rq对象, 注册rq->work的回调函数为mc_conn_new_work, 然后通过queue_work_on()提交任务到名为slaved的workqueue. slaved是在kmemcache.ko模块在初始化时通过kmemcache_init() -> register_kmemcache_bh() -> kmemcache_bh_init() ->  __kmemcache_bh_init() -> worker_init() 调用链初始化的. * (这里所说的调用链未区分调用和回调)

   699 /**

   700  * create slaved’s workqueue & info storage.

   701  *

   702  * Returns 0 on success, error code other wise.

   703  */

   704 int workers_init(void)

   705 {

   733     slaved = create_workqueue("kmcslaved");

   748 }

   可以看到, slaved被创建所使用的是create_workqueue(), 简单理解为通过该函数为每个CPU创建了对应的worker线程. 而queue_work_on(cpu, slaved, &rq->work)的第一个参数CPU的含义, 便是指定rq->work任务提交给slaved workqueue的哪个CPU对应的worker线程上.

   回头看看mc_dispatch_conn_udp()和mc_dispatch_conn_new()的实现, 不难发现:

   - 对UDP socket, rq->work任务所提交的CPU由参数传入. mc_dispatch_conn_udp()由server_socket_inet()调用, 通过server_socket_inet()第437-453行得知, kmemcache.ko模块将根据settings.num_threads_per_udp是否为1, 也就是每个UDP socket是否只使用一个worker线程的配置, 决定将一个UDP socket提交到某个CPU(各UDP sockets以round robin形式选择一个CPU), 还是将该UDP socket提交到所有在线的CPUs.

   - 对TCP socket, rq->work任务所提交的CPU恰恰就是mc_dispatch_conn_new()被执行时所在的CPU. 而该函数的调用者mc_listen_work(), 其实是作为一个任务, 由sk_user_data_ready()(即mc_disp_data_ready())调用_queue()提交到dsper.wq的. dsper.wq由create_singlethread_workqueue()创建, 它对应一个线程, 该线程在多个CPU之间调度, 该线程调度在某个CPU上执行, mc_dispatch_conn_new()被将rq->work提交到slaved workqueue的哪个CPU对应的worker线程上.

   到这里, 无论是UDP sockets还是accept出来的TCP sockets, 它们都被抽象成一个conn_req *rq, rq->work的回调函数统一为mc_conn_new_work(). 然后rq->work被提交到了slaved的某个CPU worker线程. 而这里所谓的”某个CPU”的选择, 是kmemcache代码实现的(即作者jgli说的”基于packet的线程调度机制“), 它保证了同个请求前后的多次处理始终在同一个CPU上, 一方面提高cache命中率, 另一方面合理利用了多CPU资源. 从这点看, kmemcache有点像RPS, RFS补丁(更多), 当然kmemcache更加强大, 控制能力更强.

   接下来, 便是rq->work任务被调度后, mc_conn_new_work()得到回调:

   600 static void mc_conn_new_work(struct work_struct *work)

   601 {

   602     conn *c;

   603     struct conn_req *rq =

   604         container_of(work, struct conn_req, work);

   605

   606     c = mc_conn_new(rq);

   611     mc_queue_conn(c);

   623 }

   388 void mc_queue_conn(conn *c)

   389 {

   395     __queue_conn(c);

   396 }

   368 static inline void __queue_conn(conn *c)

   369 {

   380     queue_work(slaved, &c->work);

   386 }

   在这里, conn_req *rq进一步被抽象为conn *c, 然后由mc_queue_conn()将c->work提交到原来所在的CPU的slaved orkqueue上. 回调函数由mc_conn_new()注册为mc_conn_work().

   70 conn* mc_conn_new(struct conn_req *rq)

    71 {

    74      conn *c = _conn_new();

   119     c->sock = rq->sock;

   120     c->state = rq->state;

   121     c->transport = rq->transport;

   122     INIT_WORK(&c->work, mc_conn_work);

   123     atomic_set(&c->nref, 1);

   124     set_bit(EV_RDWR, &c->event);

   125     set_sock_callbacks(c->sock, c);

   145 }

   625 void mc_conn_work(struct work_struct *work)

   626 {

   634     mc_worker_machine(c);

   637     mc_requeue_conn(c);

   641 }

   mc_worker_machine()由conn *c当前的状态驱动, 如已读数据不满足解析状态则读入数据, 否则解析数据, 解析成功曾调用相应的处理函数, 有数据可写则写出数据, 等等等等. 因为开始和memcache逻辑息息相关了, 后面的代码我未做深究.

   因为workqueue是one shot的, 回调后若仍需后续处理, 自然该重新提交任务, 很明显这就是mc_requeue_conn()的功能. 值得说明的一点是, 在这之前, mc_requeue_conn()将通过sock->ops->poll()主动获取当前socket的读写状态并填入conn *c的event字段:

   398 void mc_requeue_conn(conn *c)

   399 {

   400     int poll;

   401

   402     if (test_bit(EV_DEAD, &c->event)) {

   403         PRINFO("mc_requeue_conn %p ignore EV_DEAD", c);

   404         return;

   405     }

   406

   407     poll = c->sock->ops->poll(c->sock->file, c->sock, NULL);

   408     if (test_bit(EV_RDWR, &c->event)) {

   409         if (poll & CONN_READ) {

   410             goto queue_conn;

   411         } else {

   412             PRINFO("mc_queue_conn %p ignore EV_READ", c);

   413         }

   414     } else {

   415         if (poll & CONN_WRITE) {

   416             goto queue_conn;

   417         } else {

   418             PRINFO("mc_queue_conn %p ignore EV_WRITE", c);

   419         }

   420     }

   421

   422     return;

   423

   424 queue_conn:

   425     __queue_conn(c);

   426

   427 }

   注意调用poll()时最后一个参数为NULL, 这说明仅仅要求获取当前socket事件, 而不需要内核为该socket创建wait队列并在socket状态将来改变时回调以唤醒等待进程. (个人感觉将poll()延后到由mc_work_machine()函数调用后更好, 当前的实现是poll()出事件, 然后提交任务, 因而在任务被调度时可能该socket上又有了新事件, 但mc_worker_machine()对此毫不知情.)

   总结一下, 不管是UDP请求还是TCP请求, 都通过__dispatch_conn_new()提交任务到slaved. 任务的回调是mc_conn_new_work(). 该函数进一步将请求抽象为conn *c, 并再次向原来的CPU对应的slaved workqueue提交任务, 回调为mc_conn_work(). mc_conn_work()由conn *c的状态驱动, 每次被回调后会判断任务是否已完成, 若未完成, 则重新提交任务.

epoll的不足

   在用户态网络服务上, epoll工作的足够好, 当然, 这是比起select而言. 如果尝试在内核态使用epoll, 不难发现它的不足.

   1. epoll_wait实质是轮询

   2. epoll未反馈socket最后所在的CPU

   下面根据源码简单阐述(基于个人理解, 若有不对, 欢迎指正).

   1446 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,

   1447            int maxevents, long timeout)

   1448 {

   1471 fetch_events:

   1472     spin_lock_irqsave(&ep->lock, flags);

   1473

   1474     if (!ep_events_available(ep)) {

   1482

   1483         for (;;) {

   1489             set_current_state(TASK_INTERRUPTIBLE);

   1490             if (ep_events_available(ep) || timed_out)

   1491                 break;

   1492             if (signal_pending(current)) {

   1493                 res = -EINTR;

   1494                 break;

   1495             }

   1496

   1497             spin_unlock_irqrestore(&ep->lock, flags);

   1498             if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))

   1499                 timed_out = 1;

   1500

   1501             spin_lock_irqsave(&ep->lock, flags);

   1502         }

   1504

   1505         set_current_state(TASK_RUNNING);

   1506     }

   1523 }

   可以看到, ep_poll()在for循环中不断轮询是否有socket可用, 若无socket可用, 则调用schedule_hrtimeout_range()主动让出CPU, 直到超时或有可以sockets.

   实际上, epoll多路复用的功能, 是依靠->f_op->poll()注册回调实现的. 以ep_insert()为例:

   1145 static int ep_insert(struct eventpoll *ep, struct epoll_event *event,

   1146              struct file *tfile, int fd)

   1147 {

   1152     struct ep_pqueue epq;

   1153

   1177     /* Initialize the poll table using the queue callback */

   1178     epq.epi = epi;

   1179     init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

   1180     epq.pt._key = event->events;

   1181

   1182     /*

   1183      * Attach the item to the poll hooks and get current event bits.

   1184      * We can safely use the file* here because its usage count has

   1185      * been increased by the caller of this function. Note that after

   1186      * this operation completes, the poll callback can start hitting

   1187      * the new item.

   1188      */

   1189     revents = tfile->f_op->poll(tfile, &epq.pt);

   1269 }

   ep_insert()通过tfile->f_op->poll()(对socket而言, 为sock_poll(); 更进一步, 对tcp socket来说, 便是tcp_poll())调用poll_wait()将回调函数ep_ptable_queue_proc()注册在wait queue上. 当socket状态改变时, 内核协议栈通过wait_event_*()对wait queue上的回调函数逐个回调. 对ep_ptable_queue_proc()而言, 它将fd封转为epitem添加到目的file的sock等待队列, 回调函数为ep_poll_callback().

   当socket收到数据后, 内核协议栈将回调sk_data_ready(默认为sock_def_readable), 最终会调用ep_poll_callback():

   896 static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)

   897 {

   956     /* If this file is already in the ready list we exit soon */

   957     if (!ep_is_linked(&epi->rdllink)) {

   958         list_add_tail(&epi->rdllink, &ep->rdllist);

   959         __pm_stay_awake(epi->ws);

   960     }

   979 }

   ep_poll_callback()将socket插入就绪队列. 而epoll_wait()轮询的正是就绪队列是否为空.

   从上面的讨论可以看到, epoll_wait本质为轮询, 且其分割了数据逻辑和处理逻辑: socket有事件后, 通过辗转回调插入就绪队列, 最后由epoll_wait收割回用户态进行处理. 另一方面, 用户态无法获取就绪的socket所在的CPU, 处理逻辑如果不在原来的CPU, 则CPU cache命中率势必会受到影响.

高性能, 路漫漫

   我曾断断续续的写过一个内核态网络框架knp, 原理与kmemcache几乎相同, 当然在实现上天真很多. (当时太过强调兼容已有的网络框架, 导致不少时间被浪费在重造fifo, msg queue, shm allocator, …之上. 现在回想起来, 后悔不已.)

   和kmemcache作者jgli一样, 在读完The Secret To 10 Million Concurrent Connections -The Kernel Is The Problem, Not The Solution后, 感触颇深: 内核确实带来了太多的overhead. 于是我转头了解了netmap项目, 其作者是N年前提出DEVICE_POLLING的Luigi Rizzo. netmap代码不多, 只是有太多我尚未熟悉的领域, 于是浅尝辄止, 悻悻作罢. 以后有空再看看吧.

   至于我那个knp框架, 早已沉寂多日了.

QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1