技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 系统架构 --> Chaos网络库(三)- 主循环及异步消息的实现

Chaos网络库(三)- 主循环及异步消息的实现

浏览:1455次  出处信息

基本原理 -

在chaos开篇介绍(http://www.cppthinker.com/chaos/57/chaos_1)中已经提到,task service作为chaos库的核心,主要承担着三个重则:

1. 网络I/O

2. 超时事件

3. 异步消息处理

简单来讲,可以认为一个task service中包含一个epollfd,一个定时事件管理器,一个等待被处理的异步消息队列

而task service会有一个主循环来周而复始地执行这些任务(单线程 or 多线程,后续篇章再做分析),如图:

注意: task service总是优先处理I/O事件,因为通常I/O事件都会牵涉到与用户的交互,所以我们希望尽可能快地响应用户

 

基本实现 -

对于I/O事件 和 超时事件 这里不想熬述太多,I/O事件的处理无非就是利用底层的网络复用模型(例如epoll)返回响应的fd然后回调,而超时事件的管理网上也有诸多方法,这里task service采用的是std::priority_queue(堆结构),每次tick都会从堆中拿出最近事件来检查是否超时

我们直接来关注异步消息的处理是如何实现的:

异步消息处理的关键在于:我们如何制定一个通用的“异步消息体”,让我们的task service看起来这些消息体都是等同的,都是可执行的元素,但现实总是那么的残酷,通常我们总需要执行不同的任务,调用不同的函数,传递不同的参数类型和参数个数

很多动态语言可以通过closure(闭包)来实现这一步,其实c++也可以,首先让我们先来整理一下我们的需求:

我们需要一个closure来打包我们的异步请求(函数和参数)

1. 它必须包含一个任何声明类型的函数执行体(函数指针)

2. 它可以装入任何类型的参数

3. 它可以装入任意个数的参数

对于需求1,我们有模板

对于需求2,我们有模板

对于需求3,我们有函数重载

是的,函数重载,但你一定会问,函数重载只能指定有限个参数,没错,我们一般只要提供最多10个左右的参数的函数重载就行了,毕竟谁会写出那么不结构化的代码,想向一个函数传入十几个以上的参数,如果有,那么ta一定会被后续的维护人员牵肠挂肚。

我们直接来看chaos中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename FUNC, typename ARG0>  
class async_method_bind_func_1_t: public async_method_base_t  
{  
public:  
    async_method_bind_func_1_t()  
        : m_func1(NULL)  
    {     
    }     
  
    async_method_bind_func_1_t(FUNC func_, const ARG0& arg0_)  
        : m_func1(func_),  
          m_arg0(arg0_)  
    {     
    }     
  
    virtual void exec()  
    {     
        (*m_func1)(m_arg0);  
    }     
  
private:  
    FUNC            m_func1;  
    ARG0            m_arg0;   
};


以上就是我们实现的一个闭包,包含了一个模板函数的指针,包含了一个模板参数(要支持多个参数只需编写多个重载接口即可),当调用exec函数时,该闭包就会被执行

OK,这个问题是否已经被解决了?不,还没有,c++中存在着两种函数,一种是和对象无关的(全局/静态),一种是和对象有关的(成员函数),我们上面只是解决了和对象无关的函数,接下来我们来看如何解决对象成员函数的打包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename T, typename FUNC, typename ARG0>  
class async_method_bind_obj_1_t: public async_method_base_t  
{  
public:  
    async_method_bind_obj_1_t()  
    {     
    }     
  
    async_method_bind_obj_1_t(T instance_, FUNC func1_, const ARG0& arg0_)  
        : m_instance(instance_),  
          m_func1(func1_),  
          m_arg0(arg0_)  
    {     
    }     
  
    virtual void exec()  
    {     
        (m_instance->*m_func1)(m_arg0);  
    }     
  
private:  
    T           m_instance;  
    FUNC        m_func1;  
    ARG0        m_arg0;  
};

我们只需多保存一个对象实例的地址,就能调用到其构造函数

这两种实现方法都是继承自同一个基类,那么对于task service来说,就有了统一的异步消息体的抽象,外界只要通过接口生成不同的async_method_base_t实例,然后投入到task service的队列中,task service就能完成对这些异步消息的执行

 

接口使用 -

task service的使用也很简单,对于我们上述的三种事件的处理,task service分别提供了三种不同的接口来供你注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class task_service_t : private noncopyable_t  
{  
public:  
...  
...  
void register_io_event(  
                        fd_t                    fd_,  
                        int                     event_type_flag_,  
                        callback_on_event_t     callback_       = NULL,  
                        void*                   cb_arg_         = NULL,  
                        bool                    is_persist_     = false  
                      );  
void register_timer(  
                     uint32_t                        interval_,  
                     const time_event_callback_t&    callback_,  
                     bool                            persist_ = false,  
                     time_t                          start_time_ = 0   
                    );  
int post(  
            const async_method_t& async_method_,  
            void* ext_data_ = NULL,  
            task_prior_e prior_ = TASK_PRIOR_NORMAL,  
            bool is_allow_exec_local_ = true  
        );  
...  
...  
};

这三个方法分别代表 注册一个I/O监听事件,注册一个超时事件,投递一个异步消息

我们接下来看如何绑定一个异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//! 绑定一个全局/静态函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_func(  
                     &test_static_func, tmp_str  
                             )  
                 );            
  
//! 绑定一个类中的静态函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_func(  
                     &foo_t::test_static_func, tmp_str  
                             )  
                 );  
  
//! 绑定一个对象的成员函数,并投递到task service中执行  
task_service.post(  
    async_method_t::bind_memfunc(  
                     &hw, &hw_t::call1, 123456  
                                )  
                 );

 

PS:chaos中的async_method实现相比boost::function以及c++ 11中的std::bind都要少很多功能,我不使用它们而自己实现的原因在于

1. 不想引入庞大的boost库

2. 不希望chaos依赖于高版本的c++标准库

3. 我希望实现一个轻量级的,足以封装异步消息的类,而不用像boost::function那样做得面面俱到(话说回来boost::function确实很逆天:))

 

chaos库的task service我就先讲到这,之后我们再写一篇关于task service进阶的一些使用和注意细节

task service和async method的完整源代码大家可到

https://github.com/lyjdamzwf/chaos/tree/master/chaos/task_service

https://github.com/lyjdamzwf/chaos/tree/master/chaos/async_method

下载

 

个人技术博客: www.cppthinker.com,  专注于服务器开发

建议继续学习:

  1. 各消息队列软件产品大比拼    (阅读:5155)
  2. 浅析手机消息推送设计    (阅读:3235)
  3. Chaos网络事件库开篇介绍(一)    (阅读:2761)
  4. Feed消息队列架构分析    (阅读:2615)
  5. storm入门教程 第四章 消息的可靠处理    (阅读:2060)
  6. Android的Handler机制原理    (阅读:1868)
  7. Chaos网络库(二)- Buffer的设计    (阅读:1450)
  8. ECS 中的消息发布订阅机制    (阅读:1037)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1