IT技术博客大学习 共学习 共进步

Chaos网络库(三)- 主循环及异步消息的实现

MySQLOPS 数据库与运维自动化技术分享 2012-08-14 13:58:52 浏览 2,324 次

基本原理 -

在chaos开篇介绍(http://www.cppthinker.com/chaos/57/chaos_1)中已经提到,task service作为chaos库的核心,主要承担着三个重则:

1. 网络I/O

2. 超时事件

3. 异步消息处理

简单来讲,可以认为一个task service中包含一个epollfd,一个定时事件管理器,一个等待被处理的异步消息队列

而task service会有一个主循环来周而复始地执行这些任务(单线程 or 多线程,后续篇章再做分析),如图:

注意: task service总是优先处理I/O事件,因为通常I/O事件都会牵涉到与用户的交互,所以我们希望尽可能快地响应用户

基本实现 -

对于I/O事件 和 超时事件 这里不想熬述太多,I/O事件的处理无非就是利用底层的网络复用模型(例如epoll)返回响应的fd然后回调,而超时事件的管理网上也有诸多方法,这里task service采用的是std::priority_queue(堆结构),每次tick都会从堆中拿出最近事件来检查是否超时

我们直接来关注异步消息的处理是如何实现的:

异步消息处理的关键在于:我们如何制定一个通用的“异步消息体”,让我们的task service看起来这些消息体都是等同的,都是可执行的元素,但现实总是那么的残酷,通常我们总需要执行不同的任务,调用不同的函数,传递不同的参数类型和参数个数

很多动态语言可以通过closure(闭包)来实现这一步,其实c++也可以,首先让我们先来整理一下我们的需求:

我们需要一个closure来打包我们的异步请求(函数和参数)

1. 它必须包含一个任何声明类型的函数执行体(函数指针)

2. 它可以装入任何类型的参数

3. 它可以装入任意个数的参数

对于需求1,我们有模板

对于需求2,我们有模板

对于需求3,我们有函数重载

是的,函数重载,但你一定会问,函数重载只能指定有限个参数,没错,我们一般只要提供最多10个左右的参数的函数重载就行了,毕竟谁会写出那么不结构化的代码,想向一个函数传入十几个以上的参数,如果有,那么ta一定会被后续的维护人员牵肠挂肚。

我们直接来看chaos中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename FUNC, typename ARG0>  
class async_method_bind_func_1_t: public async_method_base_t  
{  
public:  
    async_method_bind_func_1_t()  
        : m_func1(NULL)  
    {     
    }     
  
    async_method_bind_func_1_t(FUNC func_, const ARG0& arg0_)  
        : m_func1(func_),  
          m_arg0(arg0_)  
    {     
    }     
  
    virtual void exec()  
    {     
        (*m_func1)(m_arg0);  
    }     
  
private:  
    FUNC            m_func1;  
    ARG0            m_arg0;   
};


以上就是我们实现的一个闭包,包含了一个模板函数的指针,包含了一个模板参数(要支持多个参数只需编写多个重载接口即可),当调用exec函数时,该闭包就会被执行

OK,这个问题是否已经被解决了?不,还没有,c++中存在着两种函数,一种是和对象无关的(全局/静态),一种是和对象有关的(成员函数),我们上面只是解决了和对象无关的函数,接下来我们来看如何解决对象成员函数的打包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename T, typename FUNC, typename ARG0>  
class async_method_bind_obj_1_t: public async_method_base_t  
{  
public:  
    async_method_bind_obj_1_t()  
    {     
    }     
  
    async_method_bind_obj_1_t(T instance_, FUNC func1_, const ARG0& arg0_)  
        : m_instance(instance_),  
          m_func1(func1_),  
          m_arg0(arg0_)  
    {     
    }     
  
    virtual void exec()  
    {     
        (m_instance->*m_func1)(m_arg0);  
    }     
  
private:  
    T           m_instance;  
    FUNC        m_func1;  
    ARG0        m_arg0;  
};

我们只需多保存一个对象实例的地址,就能调用到其构造函数

这两种实现方法都是继承自同一个基类,那么对于task service来说,就有了统一的异步消息体的抽象,外界只要通过接口生成不同的async_method_base_t实例,然后投入到task service的队列中,task service就能完成对这些异步消息的执行

接口使用 -

task service的使用也很简单,对于我们上述的三种事件的处理,task service分别提供了三种不同的接口来供你注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class task_service_t : private noncopyable_t  
{  
public:  
...  
...  
void register_io_event(  
                        fd_t                    fd_,  
                        int                     event_type_flag_,  
                        callback_on_event_t     callback_       = NULL,  
                        void*                   cb_arg_         = NULL,  
                        bool                    is_persist_     = false  
                      );  
void register_timer(  
                     uint32_t                        interval_,  
                     const time_event_callback_t&    callback_,  
                     bool                            persist_ = false,  
                     time_t                          start_time_ = 0   
                    );  
int post(  
            const async_method_t& async_method_,  
            void* ext_data_ = NULL,  
            task_prior_e prior_ = TASK_PRIOR_NORMAL,  
            bool is_allow_exec_local_ = true  
        );  
...  
...  
};

这三个方法分别代表 注册一个I/O监听事件,注册一个超时事件,投递一个异步消息

我们接下来看如何绑定一个异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//! 绑定一个全局/静态函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_func(  
                     &test_static_func, tmp_str  
                             )  
                 );            
  
//! 绑定一个类中的静态函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_func(  
                     &foo_t::test_static_func, tmp_str  
                             )  
                 );  
  
//! 绑定一个对象的成员函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_memfunc(  
                     &hw, &hw_t::call1, 123456  
                                )  
                 );

PS:chaos中的async_method实现相比boost::function以及c++ 11中的std::bind都要少很多功能,我不使用它们而自己实现的原因在于

1. 不想引入庞大的boost库

2. 不希望chaos依赖于高版本的c++标准库

3. 我希望实现一个轻量级的,足以封装异步消息的类,而不用像boost::function那样做得面面俱到(话说回来boost::function确实很逆天:))

chaos库的task service我就先讲到这,之后我们再写一篇关于task service进阶的一些使用和注意细节

task service和async method的完整源代码大家可到

https://github.com/lyjdamzwf/chaos/tree/master/chaos/task_service

https://github.com/lyjdamzwf/chaos/tree/master/chaos/async_method

下载

个人技术博客: www.cppthinker.com,  专注于服务器开发

建议继续学习

  1. 各消息队列软件产品大比拼 (阅读 6,084)
  2. 浅析手机消息推送设计 (阅读 4,303)
  3. Chaos网络事件库开篇介绍(一) (阅读 3,903)
  4. Feed消息队列架构分析 (阅读 3,723)
  5. storm入门教程 第四章 消息的可靠处理 (阅读 3,103)
  6. Android的Handler机制原理 (阅读 2,904)
  7. ECS 中的消息发布订阅机制 (阅读 2,405)
  8. Chaos网络库(二)- Buffer的设计 (阅读 2,142)
  9. 嵌入主线程消息循环的任务调度器 (阅读 6)