rabbitmq java client api详解
AMQP
AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。
基础概念快速入门
每个rabbitmq-server叫做一个Broker,等着tcp连接进入。
在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)
Queue是进程内的逻辑队列,有多个,有名字。
Binding联系Exchane与Queue。
Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。
java client api
连接
1 2 3 4 | ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); |
以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。
最后这个channel就可以用来收和发消息了。
消息者线程池
1 2 | ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); |
消费者时使用,上述自动开了一20个线程的池来搞。
地址数组
1 2 3 | Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1) , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); |
上述代码如果连hostname1失败了就去hostname2。
factory.newConnection()会触发这个检测。
声明exchange与queue
1 2 3 | channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, routingKey); |
channel.exchangeDeclare 参数有 交换机名字 类型 是否持久化 不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数
channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数
channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字 其他参数
发出消息
1 2 | byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); |
channel.basicPublish 参数有 要发出的交换机名字 路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回) 是否立即(其实rabbitmq不支持) 其他属性 消息主体
线程安全
Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。
最简单的办法消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } }); |
一个Channel一个Consumer。
channel.basicAck 回发ACK 参数 tag 是否多个。
零碎
channel.basicQos 指定服务质量设置 参数 最大的投送字节数 最大的投送消息数量 设置是否要应用到整个channel(而不是一个消费者)。
factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。
cf.setRequestedHeartbeat(5) 设置心跳时间。
exchange type可用的值:direct topic headers fanout。
exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。
channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。
当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。
建议继续学习:
- 使用django+celery+RabbitMQ实现异步执行 (阅读:5004)
- RabbitMQ与Redis队列对比 (阅读:2939)
- 分布式消息系统尝试(rabbitmq, celery, redis) (阅读:2584)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:五四陈科学院 来源: 五四陈科学院
- 标签: AMQP RabbitMQ
- 发布时间:2014-09-17 12:19:04
- [70] Go Reflect 性能
- [68] 如何拿下简短的域名
- [65] Oracle MTS模式下 进程地址与会话信
- [63] 图书馆的世界纪录
- [62] IOS安全–浅谈关于IOS加固的几种方法
- [61] 【社会化设计】自我(self)部分――欢迎区
- [59] android 开发入门
- [54] 视觉调整-设计师 vs. 逻辑
- [49] 界面设计速成
- [48] 读书笔记-壹百度:百度十年千倍的29条法则