技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 算法 --> 初探Thrift客户端异步模式

初探Thrift客户端异步模式

浏览:1297次  出处信息

   背景

   在某项目中,我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。

   分析

   thrift有提供一套异步模式供我们使用,首先我们像往常一样编写thrift协议文件。

namespace cpp uctest

service Test{
    string pingpong(1: string data);
}

   不同的是,需要加入cpp:cob_type来生成代码。生成的代码文件外表与之前的基本相同,但在Test.h和Test.cpp中内涵就丰富了,增加了异步客户端和异步服务器使用的类。

   异步客户端代码有TestCobClient以及它继承的虚拟类。

class TestCobClient : virtual public TestCobClIf {
 public:
  TestCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
    channel_(channel),
    itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    piprot_(protocolFactory->getProtocol(itrans_)),
    poprot_(protocolFactory->getProtocol(otrans_)) {
    iprot_ = piprot_.get();
    oprot_ = poprot_.get();
  }
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
    return channel_;
  }
  virtual void completed__(bool /* success */) {}
  void pingpong(std::tr1::function<void(TestCobClient* client)> cob, const std::string& data);
  void send_pingpong(const std::string& data);
  void recv_pingpong(std::string& _return);
 protected:
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};

   从源文件上看,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据。send_pingpong和recv_pingpong分别向缓冲区(TMemoryBuffer)写入和读取数据。而pingpong则通过调用TAsyncChannel的sendAndRecvMessage接口注册回调函数。

   TAsyncChannel作为接口类定义了三个接口函数。

/**
   * Send a message over the channel.
   */
  virtual void sendMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;

  /**
   * Receive a message from the channel.
   */
  virtual void recvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;

  /**
   * Send a message over the channel and receive a response.
   */
  virtual void sendAndRecvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf);

   TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于libevent和http协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。

void TEvhttpClientChannel::sendAndRecvMessage(
    const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf) {
  cob_ = cob;
  recvBuf_ = recvBuf;

  struct evhttp_request* req = evhttp_request_new(response, this);

  uint8_t* obuf;
  uint32_t sz;
  sendBuf->getBuffer(&obuf, &sz);
  rv = evbuffer_add(req->output_buffer, obuf, sz);

  rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());

}

   通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。

   实验

   有文章认为:使用Thrift异步客户端需要配合使用对应异步服务器才能工作。如果这个观点成立,我们改造目前程序代码的成本就会很高,而且可能会丧失使用Thrift的便捷性。

   通过上述代码的阅读,发现唯一的局限性是服务器必须使用Http的传输层,此外只需要协议层保持一致,并不需要一定使用异步服务器。

   下面我们通过简单代码基于上文“uctest”的协议实现了一个异步客户端和同步服务器。

   服务器代码:

class TestHandler : virtual public TestIf {
 public:
  TestHandler() { }

  void pingpong(std::string& _return, const std::string& data) {
    if(data=="ping")
      printf("[%d] recv ping\n", (int)time(NULL));
  _return = "pong";
  printf("[%d] send pong\n", (int)time(NULL));
  }
};

int main(int argc, char **argv) {
  int port = 9091;

  shared_ptr<TestHandler> handler(new TestHandler());
  shared_ptr<TProcessor> processor(new TestProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new THttpServerTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}

   客户端代码:

void my_ping_pong(TestCobClient* client)
{
    std::string pong;
    client->recv_pingpong(pong);
    printf("[%d] recv:%s\n", (int)time(NULL), pong.c_str());
}

int main(int argc, char **argv) {
  try{
      event_base* evbase = event_base_new();
      boost::shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel("127.0.0.1", "/", "127.0.0.1", 9091, evbase));
      TestCobClient client(channel, new TBinaryProtocolFactory());
      function<void(TestCobClient* client)> cob = bind(&my_ping_pong,_1);

      client.pingpong(cob, "ping");
      printf("[%d] ping\n", (int)time(NULL));

      for(int i=0;i<5;i++)
      {
        printf("[%d] running...\n", (int)time(NULL));
        sleep(1);
      }

      event_base_dispatch(evbase);
      event_base_free(evbase);
    }

    catch(...)
    {
      printf("exception");
      return 1;
    }

    return 0;
};

   运行结果如下:

   [tangzheng@dev10 server]$ ./demo.serv

   [1388639886] recv ping

   [1388639886] send pong

   [tangzheng@dev10 client]$ ./demo.client

   [1388639881] ping

   [1388639881] running…

   [1388639882] running…

   [1388639883] running…

   [1388639884] running…

   [1388639885] running…

   [1388639886] recv:pong

   达到异步客户端预期的效果。

   结果

   初步掌握了thrift异步客户端的用法,我们即可在需要的时候使用,或者优化当前的程序。 由于这种提供的异步模式必须基于HTTP传输层,使用有一定的局限性。之后将会继续研究是否可以在TAsyncChannel的基础上,开发支持其他传输层的接口。

建议继续学习:

  1. Linux大棚版Thrift入门教程    (阅读:23686)
  2. 关于IO的同步,异步,阻塞,非阻塞    (阅读:14593)
  3. fsockopen 异步处理    (阅读:9117)
  4. 配合jquery实现异步加载页面元素    (阅读:5401)
  5. HBase Thrift 接口使用注意事项    (阅读:5480)
  6. 使用django+celery+RabbitMQ实现异步执行    (阅读:5108)
  7. Thrift简析    (阅读:4878)
  8. Apache Avro 与 Thrift 比较    (阅读:4636)
  9. 异步编程与响应式框架    (阅读:3989)
  10. 多核与异步并行    (阅读:3994)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1