RabbitMQ消息队列
MQ概述
MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信
MQ的优势:
MQ的劣势:
系统可用性降低
AB系统没问题 还需要保证MQ没问题
引入的外部依赖越多,系统稳定性越差
系统复杂度提高
如何保证消息没有重复消费 没有丢失 保证顺序性
一致性问题
给BCD三个系统 如果其中一个处理失败如何处理
使用MQ的前提:
- 生产者不需要从消费者获得反馈
- 容许短暂的不一致性
- 使用收益超过加入成本
RabbitMQ简介
常用的MQ产品,各有侧重
RabbitMQ是一个由erlang开发的AMQP(Advance Message Queue Protocol)的开源实现
AMQP:
高级消息队列协议,是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计.基于此协议的客户端与消息中间件可传递消息,并不 受客户端/中间件不同产品,不同开发语言的限制.2006年 AMQP规范发布.类比Http
2007年 Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布.RabbitMQ采用Erlang语言开发,Erlang语言由Ericson设计,专门为开发高并发分布式系统的一种语言,在电信领域使用广泛
RabbitMQ基础架构:
核心概念:
Message:
消息,消息是不具名的,它由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性相成,这些属性包括routing-key(路由键),priority(相对于其他消息的优先权),delivery-mode(指出该消息可能需要持久性存储)等
Publisher:
消息的生产者,也是一个向交换器发布消息的客户端应用程序
Exchange:
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
根据分发规则,匹配查询表中的routing key;
Exchange有四种类型: direct(默认)
,fanout
,topic
,headers
不同类型的转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点.一个消息可投入一个或多个队列.消息一直在队列里面,等待消费者连接到这个队列将其取走
Binding
绑定,用于消息 队列和交换器之间的关联.一个绑定就是基于路由键将交换器和消息队列连起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表. Exchange和Queue的绑定可以是多对多的关系
Connection
网络链接,比如一个TCP链接
Broker
接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host
出于多租户和安全因素设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念.当多个不同用户使用同一个RabbitMQ server提供的服务的时候,可以划分出多个vhost,每个用户在自己的vhost里创建exchange/queue
Channel
信道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成,因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接.
RabbitMQ 6种工作模式
- 简单模式
- work queues
- Publish/Subscribe 发布与订阅模式
- Routing路由模式
- Topics主题模式
- RPC远程调用模式(不太算MQ)
JMS
Docker下安装RabbitMQ
1
| docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
|
其中:
- 4369,25672(Erlang发现&集群端口)
- 5672,5671 (AMQP端口) Java代码连接需要5672
- 15672(web管理后台端口)
- 1883,8883(MQTT协议端口)
RabbitMQ入门
1. 搭建示例工程
1.1 创建工程
1.2 添加依赖
往heima-rabbitmq的pom.xml文件中添加如下依赖:
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
|
2. 编写生产者
编写消息生产者com.itheima.producer.Producer_HelloWorld;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.itheima.producer;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("106.15.72.229"); factory.setPort(5672); factory.setVirtualHost("/itcast"); factory.setUsername("heima"); factory.setPassword("heima"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null);
String body = "hello rabbitmq~~~";
channel.basicPublish("", "hello_world", null, body.getBytes());
channel.close(); connection.close();
} }
|
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
3. 编写消费者
编写消息的消费者com.itheima.consumer.Consumer_HelloWorld;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.15.72.229"); factory.setPort(5672); factory.setVirtualHost("/itcast"); factory.setUsername("heima"); factory.setPassword("heima"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world",true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; channel.basicConsume("hello_world",true,consumer);
} }
|
4. 小结
抽取创建connection的工具类com.itheima.util.ConnectionUtil;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.itheima.rabbitmq.util;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("106.15.72.229"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest");
return connectionFactory.newConnection(); } }
|
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
RabbitMQ运行机制
AMQP中的消息路由
- AMQP中的消息路由过程和Java开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange和Binding的角色.生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到那个队列
Exchange类型
Exchange分发消息时根据类型的不同,分发策略有区别,目前共有四种类型:
direct fanout topic headers
headers匹配AMQP消息的header而不是路由键,headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到,直接看其它三种
RabbitMQ整合
引入spring-boot-starter-amqp
配置application.yml
1 2 3 4 5 6 7 8 9 10 11 12
| rabbitmq: host: 192.168.229.130 port: 5672 virtual-host: / publisher-confirms: true publisher-returns: true template: mandatory: true listener: simple: acknowledge-mode: manual
|
编写配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Autowired RabbitTemplate rabbitTemplate;
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); };
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm...["+correlationData+"],"+"失败标志"+ack+"失败原因"+cause); } });
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) {
} }); */ }
|
测试
- AmqpAdmin管理组件
- RabbitTemplate:消息发送处理组件
消息确认机制
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback 确认模式
- publisher returnCallback 未投递到queue退回模式
- consumer ack机制
可靠抵达-ConfirmCallback
- spring.rabbitmq.pulisher-confirms=true
- 在创建connectionFactory的时候设置PublisherConfirms(true)选项,开启confirmcallback.
- CorrelationData: 用来表示当前消息唯一性
- 消息只要被broker接受到就会执行confirmCallback,如果是cluster模式,需要所有broker接受到才会调用confirmCallback
- 被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里,所以需要用到接下来的returnCallback.
代码
第一步 发送端配置
1
| publisher-confirms: true
|
第二步 放置确认回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm...["+correlationData+"],"+"是否成功"+ack+"失败原因"+cause); } }); }
|
只要消息抵达broker 就ack=true
可靠抵达-ReturnCallback
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.template.mandatory=true
- confirm模式里只能保证消息到达broker,不能保证消息准确投递到目标queue里,在有些业务场景下,我们需要保证消息一定要投递到目标queue里,这时候就需要用到return退回模式
- 这样如果未能投递到目标queue里将调用returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要那些数据
代码
开启发送端消息抵达队列确认
1 2 3
| template: mandatory: true
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) {
} });
|
消费端 确认
- 默认是自动确认的 只要有消息接收到 客户端会自动确认 服务端移除消息
- 如果收到很多消息 自动回复给服务器ack 只有一个消息确认处理成功 宕机了 发送的消息丢失 手动确认
1 2 3
| listener: simple: acknowledge-mode: manual
|
只要不确认的消息都不算成功处理 处于unacked 即使服务宕机 消息也不会消失
RabbitMQ延时队列
场景: 未付款订单 超过一定时间后 系统自动取消订单并释放占有物品.
常用解决方案:
spring的schedule定时任务轮询数据库
缺点:
消耗系统内存 增加了数据库的压力 存在较大的时间误差
解决:
rabbitmq的消息TTl和死信Exchange结合
消息的TTL(TimeToLive)
消息的TTL
就是消息的存活时间
RabbitMQ 可以对队列和消息分别设置TTL
对队列设置就是队列没有消费者连着的保留时间 也可以对每一个单独的消息做单独的设置 超过了这个时间 我们认为这个消息就死了 称之为死信
如果队列设置了 消息也设置了 那么会取小的 所以一个消息如果被路由到 不同队列 这个消息死亡的时间有可能不一样(不同的队列设置).
这里单讲单个消息的TTL 因为它才是实现延迟任务的关键 可以通过设置消息的expiration
字段或者x-message-ttl
属性来设置时间 两者是一样的效果
Dead Letter Exchanges(DLX)
- 一个消息在满足如下条件下 会进死信路由 记住这里是路由而不是队列 一个路由可以对应很多队列.(死信)
- 一个消息被Consumer拒收了 并且reject方法的参数里requeue是false 也就是说不会被再次放在队列里 被其他消费者使用
- 上面的消息TTL到了 消息过期了
- 队列的长度限制满了 排在前面的消息就会被丢弃或者扔到死信路由上
- Dead Letter Exchange其实就是一种普通的exchange 和创建其它exchange没有两样 只是在某一个设置Dead Letter Exchange的队列中有消息过期了 会自动触发消息的转发 发送到Dead Letter Exchange中去
- 我们既可以控制消息在一段时间后变成死信 又可以控制变成死信的消息被路由到某一个指定的交换机 结合二者 其实就可以实现一个延时队列
代码
创建出对应容器
1 2 3 4
| @RabbitListener(queues = "stock-event-exchange") public void handle(Message message){ }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Configuration public class MyMQConfig { @RabbitListener(queues = "order.release.order.queue") public void listen(OrderEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期订单消息,准备关闭订单:"+entity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
@Bean public Queue orderDelayQueue(){ Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange"); arguments.put("x-dead-routing-key","order.release.order"); arguments.put("x-message-ttl",60000); Queue queue = new Queue("order.delay.queue", true, false, false,arguments); return queue; }
@Bean public Queue orderReleaseOrderQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; }
@Bean public Exchange orderEventExchange(){ Exchange exchange = new TopicExchange("order-event-exchange", true, false); return exchange; }
@Bean public Binding orderCreateOrderBinding(){ Binding binding = new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); return binding; }
@Bean public Binding orderReleaseOrderBinding(){ Binding binding = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); return binding; } }
|
虚拟请求测试
1 2 3 4 5 6 7 8 9 10 11
| @ResponseBody @GetMapping("/test/createOrder") public String createOrdertest(){ OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); orderEntity.setModifyTime(new Date());
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity); return "ok"; }
|
资料参考: 尚硅谷