RabbitMQ消息队列

MQ概述

MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信

image-20210315234119123

  • 分布式系统通信方式:
    • 直接远程调用
    • 借助第三方完成间接通信

MQ的优势:

  • 应用解耦
  • 异步提速
  • 削峰填谷

MQ的劣势:

  • 系统可用性降低

    AB系统没问题 还需要保证MQ没问题

    引入的外部依赖越多,系统稳定性越差

  • 系统复杂度提高

    如何保证消息没有重复消费 没有丢失 保证顺序性

  • 一致性问题

    给BCD三个系统 如果其中一个处理失败如何处理

使用MQ的前提:

  1. 生产者不需要从消费者获得反馈
  2. 容许短暂的不一致性
  3. 使用收益超过加入成本

RabbitMQ简介

常用的MQ产品,各有侧重

image-20210315235521543

RabbitMQ是一个由erlang开发的AMQP(Advance Message Queue Protocol)的开源实现

AMQP:

高级消息队列协议,是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计.基于此协议的客户端与消息中间件可传递消息,并不 受客户端/中间件不同产品,不同开发语言的限制.2006年 AMQP规范发布.类比Http

2007年 Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布.RabbitMQ采用Erlang语言开发,Erlang语言由Ericson设计,专门为开发高并发分布式系统的一种语言,在电信领域使用广泛

RabbitMQ基础架构:

image-20210316000417576

核心概念:

  • Message:

    消息,消息是不具名的,它由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性相成,这些属性包括routing-key(路由键),priority(相对于其他消息的优先权),delivery-mode(指出该消息可能需要持久性存储)等

  • Publisher:

    消息的生产者,也是一个向交换器发布消息的客户端应用程序

  • Exchange:

    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

    根据分发规则,匹配查询表中的routing key;

    Exchange有四种类型: direct(默认),fanout,topic,headers

    不同类型的转发消息的策略有所区别

  • Queue

    消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点.一个消息可投入一个或多个队列.消息一直在队列里面,等待消费者连接到这个队列将其取走

  • Binding

    绑定,用于消息 队列交换器之间的关联.一个绑定就是基于路由键将交换器和消息队列连起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表. Exchange和Queue的绑定可以是多对多的关系

  • Connection

    网络链接,比如一个TCP链接

  • Broker

    接收和分发消息的应用,RabbitMQ Server就是Message Broker

  • Virtual host

    出于多租户和安全因素设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念.当多个不同用户使用同一个RabbitMQ server提供的服务的时候,可以划分出多个vhost,每个用户在自己的vhost里创建exchange/queue

  • Channel

    信道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成,因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接.

RabbitMQ 6种工作模式

  • 简单模式
  • work queues
  • Publish/Subscribe 发布与订阅模式
  • Routing路由模式
  • Topics主题模式
  • RPC远程调用模式(不太算MQ)

image-20210316001142649

JMS

  • JMS即Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件的API

    类比JDBC

  • 很多消息中间件实现了JMS 但是RabbitMQ没有

Docker下安装RabbitMQ

1
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

其中:

  • 4369,25672(Erlang发现&集群端口)
  • 5672,5671 (AMQP端口) Java代码连接需要5672
  • 15672(web管理后台端口)
  • 1883,8883(MQTT协议端口)

RabbitMQ入门

1. 搭建示例工程

1.1 创建工程

1.2 添加依赖

往heima-rabbitmq的pom.xml文件中添加如下依赖:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>

2. 编写生产者

编写消息生产者com.itheima.producer.Producer_HelloWorld;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.itheima.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

//2. 设置参数
factory.setHost("106.15.72.229");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。

*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world", true, false, false, null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据

*/

String body = "hello rabbitmq~~~";

//6. 发送消息
channel.basicPublish("", "hello_world", null, body.getBytes());


//7.释放资源
channel.close();
connection.close();

}
}

在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:

image-20201214231440832

image-20201214231617494

3. 编写消费者

编写消息的消费者com.itheima.consumer.Consumer_HelloWorld;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {

//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("106.15.72.229");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。

*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);

/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象

*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法

1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据

*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);


//关闭资源?不要

}
}

image-20201214232206963

image-20201214232238896

4. 小结

抽取创建connection的工具类com.itheima.util.ConnectionUtil;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.itheima.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("106.15.72.229");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/");
//连接用户名;默认为guest
connectionFactory.setUsername("guest");
//连接密码;默认为guest
connectionFactory.setPassword("guest");

//创建连接
return connectionFactory.newConnection();
}
}

上述的入门案例中中其实使用的是如下的简单模式:

1555991074575

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

RabbitMQ运行机制

AMQP中的消息路由

  • AMQP中的消息路由过程和Java开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange和Binding的角色.生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到那个队列

image-20200907140940669

Exchange类型

  • Exchange分发消息时根据类型的不同,分发策略有区别,目前共有四种类型:

    direct fanout topic headers

    headers匹配AMQP消息的header而不是路由键,headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到,直接看其它三种

    image-20200810163148389

    image-20200810163322085

RabbitMQ整合

  1. 引入spring-boot-starter-amqp

  2. 配置application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    rabbitmq:
    host: 192.168.229.130
    port: 5672
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    # 只要抵达队列 以异步发送优先回调我们这个return confirm
    template:
    mandatory: true
    listener:
    simple:
    acknowledge-mode: manual
  3. 编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    @Autowired
RabbitTemplate rabbitTemplate;

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
};

/* 定制rabbittemplate postconstruct表示在MyRabbitConfig对象创建完成之后才调用
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 当前消息的唯一关联数据
* @param ack 消息是否成功
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...["+correlationData+"],"+"失败标志"+ack+"失败原因"+cause);
}
});

//设置消息抵达队列的确认回调 只要消息没有投递就触发 属于失败回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 投递失败的消息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 消息是发给哪个交换机
* @param s2 消息指定的路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {

}
});
*/
}

  1. 测试

    • AmqpAdmin管理组件
    • RabbitTemplate:消息发送处理组件

消息确认机制

image-20200907142507074

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制

  • publisher confirmCallback 确认模式
  • publisher returnCallback 未投递到queue退回模式
  • consumer ack机制

可靠抵达-ConfirmCallback

  • spring.rabbitmq.pulisher-confirms=true
    • 在创建connectionFactory的时候设置PublisherConfirms(true)选项,开启confirmcallback.
    • CorrelationData: 用来表示当前消息唯一性
    • 消息只要被broker接受到就会执行confirmCallback,如果是cluster模式,需要所有broker接受到才会调用confirmCallback
    • 被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里,所以需要用到接下来的returnCallback.

代码

第一步 发送端配置

1
publisher-confirms: true

第二步 放置确认回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//定制rabbittemplate postconstruct表示在MyRabbitConfig对象创建完成之后才调用
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 当前消息的唯一关联数据
* @param ack 消息是否成功
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...["+correlationData+"],"+"是否成功"+ack+"失败原因"+cause);
}
});
}

只要消息抵达broker 就ack=true

可靠抵达-ReturnCallback

  • spring.rabbitmq.publisher-returns=true
  • spring.rabbitmq.template.mandatory=true
    • confirm模式里只能保证消息到达broker,不能保证消息准确投递到目标queue里,在有些业务场景下,我们需要保证消息一定要投递到目标queue里,这时候就需要用到return退回模式
    • 这样如果未能投递到目标queue里将调用returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要那些数据

代码

开启发送端消息抵达队列确认

image-20200811130352702

1
2
3
# 只要抵达队列 以异步发送优先回调我们这个return confirm
template:
mandatory: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//设置消息抵达队列的确认回调 只要消息没有投递就触发  属于失败回调

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 投递失败的消息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 消息是发给哪个交换机
* @param s2 消息指定的路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {

}
});

消费端 确认

  • 默认是自动确认的 只要有消息接收到 客户端会自动确认 服务端移除消息
  • 如果收到很多消息 自动回复给服务器ack 只有一个消息确认处理成功 宕机了 发送的消息丢失 手动确认
1
2
3
listener:
simple:
acknowledge-mode: manual

只要不确认的消息都不算成功处理 处于unacked 即使服务宕机 消息也不会消失

  • 如何签收?
1
channel.basicAck

image-20200811133538324

image-20200811134125966

image-20200811134653044

RabbitMQ延时队列

场景: 未付款订单 超过一定时间后 系统自动取消订单并释放占有物品.

常用解决方案:

spring的schedule定时任务轮询数据库

缺点:

消耗系统内存 增加了数据库的压力 存在较大的时间误差

解决:

rabbitmq的消息TTl和死信Exchange结合

消息的TTL(TimeToLive)

  • 消息的TTL 就是消息的存活时间

  • RabbitMQ 可以对队列消息分别设置TTL

    • 对队列设置就是队列没有消费者连着的保留时间 也可以对每一个单独的消息做单独的设置 超过了这个时间 我们认为这个消息就死了 称之为死信

    • 如果队列设置了 消息也设置了 那么会取小的 所以一个消息如果被路由到 不同队列 这个消息死亡的时间有可能不一样(不同的队列设置).

      这里单讲单个消息的TTL 因为它才是实现延迟任务的关键 可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间 两者是一样的效果

Dead Letter Exchanges(DLX)

  • 一个消息在满足如下条件下 会进死信路由 记住这里是路由而不是队列 一个路由可以对应很多队列.(死信)
    • 一个消息被Consumer拒收了 并且reject方法的参数里requeue是false 也就是说不会被再次放在队列里 被其他消费者使用
    • 上面的消息TTL到了 消息过期了
    • 队列的长度限制满了 排在前面的消息就会被丢弃或者扔到死信路由上
  • Dead Letter Exchange其实就是一种普通的exchange 和创建其它exchange没有两样 只是在某一个设置Dead Letter Exchange的队列中有消息过期了 会自动触发消息的转发 发送到Dead Letter Exchange中去
  • 我们既可以控制消息在一段时间后变成死信 又可以控制变成死信的消息被路由到某一个指定的交换机 结合二者 其实就可以实现一个延时队列

image-20200901214707912

image-20200901214952715

image-20200902111600090

image-20200902111846503

代码

创建出对应容器

  • 需要有监听消息才会创建
1
2
3
4
@RabbitListener(queues = "stock-event-exchange")
public void handle(Message message){

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Configuration
public class MyMQConfig {


//测试
@RabbitListener(queues = "order.release.order.queue")
public void listen(OrderEntity entity, Channel channel, Message message) throws IOException {
System.out.println("收到过期订单消息,准备关闭订单:"+entity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}


//创建一个交换机 两个队列 两个绑定关系
//@Bean 容器中的都会自动创建
@Bean
public Queue orderDelayQueue(){
Map<String,Object> arguments = new HashMap<>();
/**
*
*/
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
Queue queue = new Queue("order.delay.queue", true, false, false,arguments);
return queue;
}

@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}

@Bean
public Exchange orderEventExchange(){
Exchange exchange = new TopicExchange("order-event-exchange", true, false);
return exchange;
}

@Bean
public Binding orderCreateOrderBinding(){
Binding binding = new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
return binding;
}

@Bean
public Binding orderReleaseOrderBinding(){
Binding binding = new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
return binding;
}
}

虚拟请求测试

1
2
3
4
5
6
7
8
9
10
11
@ResponseBody
@GetMapping("/test/createOrder")
public String createOrdertest(){
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());

//给Mq发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
return "ok";
}

资料参考: 尚硅谷