IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

rabbitmq java client api详解

五四陈科学院 2014-09-17 12:19:04 累计浏览 1,457 次
本机暂存

AMQP

AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。

基础概念快速入门

每个rabbitmq-server叫做一个Broker,等着tcp连接进入。

在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)

Queue是进程内的逻辑队列,有多个,有名字。

Binding联系Exchane与Queue。

Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。

java client api

连接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。

最后这个channel就可以用来收和发消息了。

消息者线程池

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

消费者时使用,上述自动开了一20个线程的池来搞。

地址数组

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

上述代码如果连hostname1失败了就去hostname2。

factory.newConnection()会触发这个检测。

声明exchange与queue

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

channel.exchangeDeclare 参数有 交换机名字  类型  是否持久化  不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数

channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数

channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字  其他参数

发出消息

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

channel.basicPublish 参数有 要发出的交换机名字  路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回)  是否立即(其实rabbitmq不支持) 其他属性 消息主体

线程安全

Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。

最简单的办法消费消息

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

一个Channel一个Consumer。

channel.basicAck 回发ACK 参数 tag 是否多个。

零碎

channel.basicQos 指定服务质量设置 参数 最大的投送字节数  最大的投送消息数量  设置是否要应用到整个channel(而不是一个消费者)。

factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。

cf.setRequestedHeartbeat(5) 设置心跳时间。

exchange type可用的值:direct topic headers fanout。

exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。

channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。

当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。

同分类推荐文章

  1. 等了十年的 Go 链式管道,终于来了:seq 让你像写 Scala 一样写 Go (2026-06-25 18:38:18)
  2. Go 实验特性详解 (2026-06-21 10:05:27)
  3. amd64 微架构级别对 Go 程序性能提升多少? (2026-06-21 09:38:49)

查看更多 后端 文章 →

建议继续学习

  1. SmartSprites - 命令行形式的CSS Sprites生成器 (累计阅读 123,894)
  2. Java开发岗位面试题归类汇总 (累计阅读 22,155)
  3. android 开发入门 (累计阅读 19,527)
  4. 我的PHP,Python和Ruby之路 (累计阅读 13,146)
  5. HashMap解决hash冲突的方法 (累计阅读 12,653)
  6. 一个大二学生有关如何成为一名软件工程师的疑问及答复 (累计阅读 9,178)
  7. Java程序员应该知道的10个eclipse调试技巧 (累计阅读 8,012)
  8. 如何让员工忠于公司? (累计阅读 7,939)
  9. Java技术路线 (累计阅读 7,725)
  10. 聊聊ThoughtWorks面试 (累计阅读 7,614)