技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 其他 --> rabbitmq java client api详解

rabbitmq java client api详解

浏览:866次  出处信息

AMQP

AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。

基础概念快速入门

每个rabbitmq-server叫做一个Broker,等着tcp连接进入。

在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)

Queue是进程内的逻辑队列,有多个,有名字。

Binding联系Exchane与Queue。

Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。

java client api

连接

1
2
3
4
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。

最后这个channel就可以用来收和发消息了。

消息者线程池

1
2
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

消费者时使用,上述自动开了一20个线程的池来搞。

地址数组

1
2
3
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

上述代码如果连hostname1失败了就去hostname2。

factory.newConnection()会触发这个检测。

声明exchange与queue

1
2
3
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

channel.exchangeDeclare 参数有 交换机名字  类型  是否持久化  不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数

channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数

channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字  其他参数

发出消息

1
2
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

channel.basicPublish 参数有 要发出的交换机名字  路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回)  是否立即(其实rabbitmq不支持) 其他属性 消息主体

线程安全

Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。

最简单的办法消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

一个Channel一个Consumer。

channel.basicAck 回发ACK 参数 tag 是否多个。

零碎

channel.basicQos 指定服务质量设置 参数 最大的投送字节数  最大的投送消息数量  设置是否要应用到整个channel(而不是一个消费者)。

factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。

cf.setRequestedHeartbeat(5) 设置心跳时间。

exchange type可用的值:direct topic headers fanout。

exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。

channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。

当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。

建议继续学习:

  1. 使用django+celery+RabbitMQ实现异步执行    (阅读:5109)
  2. RabbitMQ与Redis队列对比    (阅读:3084)
  3. 分布式消息系统尝试(rabbitmq, celery, redis)    (阅读:2709)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1