初探Thrift客户端异步模式
背景
在某项目中,我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。
分析
thrift有提供一套异步模式供我们使用,首先我们像往常一样编写thrift协议文件。
namespace cpp uctest service Test{ string pingpong(1: string data); }
不同的是,需要加入cpp:cob_type来生成代码。生成的代码文件外表与之前的基本相同,但在Test.h和Test.cpp中内涵就丰富了,增加了异步客户端和异步服务器使用的类。
异步客户端代码有TestCobClient以及它继承的虚拟类。
class TestCobClient : virtual public TestCobClIf { public: TestCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) : channel_(channel), itrans_(new ::apache::thrift::transport::TMemoryBuffer()), otrans_(new ::apache::thrift::transport::TMemoryBuffer()), piprot_(protocolFactory->getProtocol(itrans_)), poprot_(protocolFactory->getProtocol(otrans_)) { iprot_ = piprot_.get(); oprot_ = poprot_.get(); } boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() { return channel_; } virtual void completed__(bool /* success */) {} void pingpong(std::tr1::function<void(TestCobClient* client)> cob, const std::string& data); void send_pingpong(const std::string& data); void recv_pingpong(std::string& _return); protected: boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_; boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_; boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_; boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; ::apache::thrift::protocol::TProtocol* iprot_; ::apache::thrift::protocol::TProtocol* oprot_; };
从源文件上看,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据。send_pingpong和recv_pingpong分别向缓冲区(TMemoryBuffer)写入和读取数据。而pingpong则通过调用TAsyncChannel的sendAndRecvMessage接口注册回调函数。
TAsyncChannel作为接口类定义了三个接口函数。
/** * Send a message over the channel. */ virtual void sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0; /** * Receive a message from the channel. */ virtual void recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0; /** * Send a message over the channel and receive a response. */ virtual void sendAndRecvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* sendBuf, apache::thrift::transport::TMemoryBuffer* recvBuf);
TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于libevent和http协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。
void TEvhttpClientChannel::sendAndRecvMessage( const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* sendBuf, apache::thrift::transport::TMemoryBuffer* recvBuf) { cob_ = cob; recvBuf_ = recvBuf; struct evhttp_request* req = evhttp_request_new(response, this); uint8_t* obuf; uint32_t sz; sendBuf->getBuffer(&obuf, &sz); rv = evbuffer_add(req->output_buffer, obuf, sz); rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str()); }
通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。
实验
有文章认为:使用Thrift异步客户端需要配合使用对应异步服务器才能工作。如果这个观点成立,我们改造目前程序代码的成本就会很高,而且可能会丧失使用Thrift的便捷性。
通过上述代码的阅读,发现唯一的局限性是服务器必须使用Http的传输层,此外只需要协议层保持一致,并不需要一定使用异步服务器。
下面我们通过简单代码基于上文“uctest”的协议实现了一个异步客户端和同步服务器。
服务器代码:
class TestHandler : virtual public TestIf { public: TestHandler() { } void pingpong(std::string& _return, const std::string& data) { if(data=="ping") printf("[%d] recv ping\n", (int)time(NULL)); _return = "pong"; printf("[%d] send pong\n", (int)time(NULL)); } }; int main(int argc, char **argv) { int port = 9091; shared_ptr<TestHandler> handler(new TestHandler()); shared_ptr<TProcessor> processor(new TestProcessor(handler)); shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); shared_ptr<TTransportFactory> transportFactory(new THttpServerTransportFactory()); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); server.serve(); return 0; }
客户端代码:
void my_ping_pong(TestCobClient* client) { std::string pong; client->recv_pingpong(pong); printf("[%d] recv:%s\n", (int)time(NULL), pong.c_str()); } int main(int argc, char **argv) { try{ event_base* evbase = event_base_new(); boost::shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel("127.0.0.1", "/", "127.0.0.1", 9091, evbase)); TestCobClient client(channel, new TBinaryProtocolFactory()); function<void(TestCobClient* client)> cob = bind(&my_ping_pong,_1); client.pingpong(cob, "ping"); printf("[%d] ping\n", (int)time(NULL)); for(int i=0;i<5;i++) { printf("[%d] running...\n", (int)time(NULL)); sleep(1); } event_base_dispatch(evbase); event_base_free(evbase); } catch(...) { printf("exception"); return 1; } return 0; };
运行结果如下:
[tangzheng@dev10 server]$ ./demo.serv
[1388639886] recv ping
[1388639886] send pong
[tangzheng@dev10 client]$ ./demo.client
[1388639881] ping
[1388639881] running…
[1388639882] running…
[1388639883] running…
[1388639884] running…
[1388639885] running…
[1388639886] recv:pong
达到异步客户端预期的效果。
结果
初步掌握了thrift异步客户端的用法,我们即可在需要的时候使用,或者优化当前的程序。 由于这种提供的异步模式必须基于HTTP传输层,使用有一定的局限性。之后将会继续研究是否可以在TAsyncChannel的基础上,开发支持其他传输层的接口。
建议继续学习:
- Linux大棚版Thrift入门教程 (阅读:23557)
- 关于IO的同步,异步,阻塞,非阻塞 (阅读:14455)
- fsockopen 异步处理 (阅读:9016)
- 配合jquery实现异步加载页面元素 (阅读:5367)
- HBase Thrift 接口使用注意事项 (阅读:5391)
- 使用django+celery+RabbitMQ实现异步执行 (阅读:5005)
- Thrift简析 (阅读:4840)
- Apache Avro 与 Thrift 比较 (阅读:4502)
- 异步编程与响应式框架 (阅读:3890)
- 多核与异步并行 (阅读:3862)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:tangzheng 来源: UC技术博客
- 标签: Thrift 异步
- 发布时间:2014-11-28 23:03:09
- [68] Go Reflect 性能
- [68] 如何拿下简短的域名
- [67] Oracle MTS模式下 进程地址与会话信
- [62] IOS安全–浅谈关于IOS加固的几种方法
- [61] 图书馆的世界纪录
- [60] 【社会化设计】自我(self)部分――欢迎区
- [58] android 开发入门
- [56] 视觉调整-设计师 vs. 逻辑
- [49] 给自己的字体课(一)——英文字体基础
- [48] 读书笔记-壹百度:百度十年千倍的29条法则