一、消息队列
1.1、简介
消息队列是实现应用程序和应用程序之间通信的中间件产品。
消息队列底层实现的两大主流方式
- 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要。
- 目前主流的消息队列通信协议标准包括:
AMQP (Advanced Message Queuing Protocol)
:通用协议,IBM 公司研发。JMS (Java Message Service)
:专门为 Java 语言服务,SUN 公司研发,一组由 Java 接口组成的 Java 标准。
1.2、AMQP 和 JMS 对比
1.3、各主流 MQ 产品对比
二、RabbitMQ 简介
2.1、介绍
官网地址:https://www.rabbitmq.com/
RabbitMQ 是一款基于 AMQP、由 Erlang 语言开发的消息队列产品,2007 年 Rabbit 技术公司发布了它的 1.0 版本。
2.2、RabbitMQ 体系结构
三、 RabbitMQ 工作模式
3.1、Work Queues
多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
Work Queues 工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。
3.2、Publish/Subscribe
组件之间关系:
- 生产者把消息发送到交换机。
- 队列直接和交换机绑定。
工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列。
理解概念:
- Publish:发布,这里就是把消息发送到交换机上。
- Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系。
3.3、Routing
通过『路由绑定』的方式,把交换机和队列关联起来。
交换机和队列通过路由键进行绑定。
生产者发送消息时不仅要指定交换机,还要指定路由键。
交换机接收到消息会发送到路由键绑定的队列。
在编码上与 Publish/Subscribe 发布与订阅模式的区别:
- 交换机的类型为:Direct。
- 队列绑定交换机的时候需要指定 routing key。
如果一个交换机通过相同的 routing key 绑定了多个队列,就会有广播效果。
3.4、Topics
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符。
Routingkey 一般都是由一个或多个单词组成,多个单词之间以 .
分割,例如:item.insert
。
通配符规则:
#
:匹配零个或多个词。*
:匹配一个词。
3.5、RPC
远程过程调用,本质上是同步调用,和我们使用 OpenFeign 调用远程接口一样。
所以这不是典型的消息队列工作方式,就不展开说明了。
3.6、Publisher Confirms
发送端消息确认。
3.7、工作模式总结
直接发送到队列:底层使用了默认交换机。
经过交换机发送到队列:
- Fanout:没有 Routing key 直接绑定队列。
- Direct:通过 Routing key 绑定队列,消息发送到绑定的队列上。
- 一个交换机绑定一个队列:定点发送。
- 一个交换机绑定多个队列:广播发送。
- Topic:针对 Routing key 使用通配符。
四、整合 SpringBoot
生产者:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage () {
rabbitTemplate.convertAndSend (
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu");
}
}
消费者:
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
@RabbitListener (bindings = @QueueBinding (
value = @Queue (value = QUEUE_NAME, durable = "true"),
exchange = @Exchange (value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage (String dateString,
Message message,
Channel channel) {
log.info (dateString);
}
}
五、消息可靠性
5.1、消息没有发送到消息队列
- 1、在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送。
- 2、为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机。
5.1.1、生产端确认
1、yaml
注意:publisher-confirm-type
和 publisher-returns
是两个必须要增加的配置,如果没有则功能不生效。
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
2、创建配置类
这里需要声明回调函数来接收 RabbitMQ 服务器返回的确认信息:
| 方法名 | 方法功能 | 所属接口 | 接口所属类 |
| ----------------- | ------------------------ | --------------- | -------------- |
| confirm () | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
| returnedMessage () | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
然后,就是对 RabbitTemplate 的功能进行增强,因为回调函数所在对象必须设置到 RabbitTemplate 对象中才能生效。
原本 RabbitTemplate 对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用 RabbitTemplate 对象下面两个方法:
| 设置组件调用的方法 | 所需对象类型 |
| -------------------- | ----------------------- |
| setConfirmCallback () | ConfirmCallback 接口类型 |
| setReturnCallback () | ReturnCallback 接口类型 |
3、配置类代码
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启发送端确认
publisher-confirm-type: correlated
# 开启发送端消息抵达队列的确认
publisher-returns: true
template:
# 只要抵达队列,以异步发送优先回调我们这个returnConfirm
mandatory: true
代码
@Configuration
public class RabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功到达交换机!correlationData={}", correlationData);
} else {
log.error("消息未到达交换机!correlationData={}, cause={}", correlationData, cause);
}
});
// 设置返回回调
rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息未到达队列,被返回!exchange={}, routingKey={}, message={}, replyCode={}, replyText={}",
returned.getExchange(),
returned.getRoutingKey(),
new String(returned.getMessage().getBody()),
returned.getReplyCode(),
returned.getReplyText());
});
}
}
生产者
@Service
public class MessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
// 1. 创建消息唯一标识
String messageId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(messageId);
// 2. 可以在 correlationData 中存储额外信息
correlationData.setReturnedMessage(
MessageBuilder.withBody("Hello RabbitMQ".getBytes())
.setMessageId(messageId)
.build()
);
// 3. 发送消息时携带 correlationData
rabbitTemplate.convertAndSend(
"exchange-name",
"routing-key",
"消息内容",
correlationData
);
}
}
5.1.2、备份交换机
注意:备份交换机一定要选择 fanout 类型,因为原交换机转入备份交换机时并不会指定路由键。
创建备份交换机:
创建备份交换机要绑定的队列:
针对备份队列创建消费端监听器:
public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
public static final String QUEUE_NAME_BACKUP = "queue.order.backup";
@RabbitListener (bindings = @QueueBinding (
value = @Queue (value = QUEUE_NAME_BACKUP, durable = "true"),
exchange = @Exchange (value = EXCHANGE_DIRECT_BACKUP),
key = {""}
))
public void processMessageBackup (String dateString,
Message message,
Channel channel) {
log.info ("BackUp:" + dateString);
}
5.2、消息队列服务器宕机导致内存中消息丢失
临时性的交换机和队列也能够接收消息。
RabbitMQ 服务器重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。
在后台管理界面创建交换机和队列时,默认就是持久化的模式。
此时消息也是持久化的,不需要额外设置。
5.3、消费端宕机或抛异常导致消息没有成功被消费
消费端消费消息成功,给服务器返回 ACK 信息,然后消息队列删除该消息。
消费端消费消息失败,给服务器端返回 NACK 信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)。
5.3.1、默认情况
ACK 是 acknowledge 的缩写,表示已确认。
默认情况下,消费端取回消息后,默认会自动返回 ACK 确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ 得到 ACK 确认信息就会删除消息。
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了。
所以还是要修改成手动确认。
5.3.2、配置
增加针对监听器的设置:
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
接收消息方法内部逻辑:
- 业务处理成功:手动返回 ACK 信息,表示消息成功消费
- 业务处理失败:手动返回 NACK 信息,表示消息消费失败。此时有两种后续操作供选择:
- 把消息重新放回消息队列,RabbitMQ 会重新投递这条消息,那么消费端将重新消费这条消息 —— 从而让业务代码再执行一遍
- 不把消息放回消息队列,返回 reject 信息表示拒绝,那么这条消息的处理就到此为止
5.3.3、相关 API
basicAck ()
给 Broker 返回 ACK 确认信息,表示消息已经在消费端成功消费,这样 Broker 就可以把消息删除了。
参数列表:
long deliveryTag
Broker 给每一条进入队列的消息都设定一个唯一标识。boolean multiple
取值为 true:为小于、等于 deliveryTag 的消息批量返回 ACK 信息取值为 false:仅为指定的 deliveryTag 返回 ACK 信息。
basicNack ()
给 Broker 返回 NACK 信息,表示消息在消费端消费失败,此时 Broker 的后续操作取决于参数 requeue 的值。
参数列表:
long deliveryTag
Broker 给每一条进入队列的消息都设定一个唯一标识。boolean multiple
取值为 true:为小于、等于 deliveryTag 的消息批量返回 ACK 信息取值为 false:仅为指定的 deliveryTag 返回 ACK 信息。boolean requeue
取值为 true:Broker 将消息重新放回队列,接下来会重新投递给消费端取值为 false:Broker 将消息标记为已消费,不会放回队列。
basicReject ()
根据指定的 deliveryTag,对该消息表示拒绝。
参数列表:
long deliveryTag
Broker 给每一条进入队列的消息都设定一个唯一标识。boolean requeue
取值为 true:Broker 将消息重新放回队列,接下来会重新投递给消费端取值为 false:Broker 将消息标记为已消费,不会放回队列。
5.3.4、完整代码示例
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
// 修饰监听方法
@RabbitListener (
// 设置绑定关系
bindings = @QueueBinding (
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
value = @Queue (value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
exchange = @Exchange (value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息
key = {ROUTING_KEY}
))
public void processMessage (String dataString, Message message, Channel channel) throws IOException {
// 1、获取当前消息的 deliveryTag 值备用
long deliveryTag = message.getMessageProperties ().getDeliveryTag ();
try {
// 2、正常业务操作
log.info ("消费端接收到消息内容:" + dataString);
// System.out.println (10 / 0);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck (deliveryTag, false);
} catch (Exception e) {
// 4、获取信息,看当前消息是否曾经被投递过
Boolean redelivered = message.getMessageProperties ().getRedelivered ();
if (!redelivered) {
// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
channel.basicNack (deliveryTag, false, true);
} else {
// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
channel.basicReject (deliveryTag, false);
}
}
}
}
5.3.5、要点总结
- 1、把消息确认模式改为 手动确认。
- 2、调用 Channel 对象的方法返回信息。
ACK
:Acknowledgement,表示消息处理成功。NACK
:Negative Acknowledgement,表示消息处理失败。Reject
:拒绝,同样表示消息处理失败。
- 3、后续操作
requeue
为 true:重新放回队列,重新投递,再次尝试。requeue
为 false:不放回队列,不重新投递。
- 4、deliveryTag 消息的唯一标识,查找具体某一条消息的依据。
5.3.6、流程
六、交付标签机制
deliveryTag 是一个 64 位整数。
消费端把消息处理结果 ACK、NACK、Reject 等返回给 Broker 之后,Broker 需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么 Broker 就必须知道它现在要操作的消息具体是哪一条。而 deliveryTag 作为消息的唯一标识就很好的满足了这个需求。
如果交换机是 Fanout 模式,同一个消息广播到了不同队列,deliveryTag 也不会重复,deliveryTag 是 broker 范围唯一的。
七、消费端限流
设定 prefetch
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息
八、消息超时
给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除。
- 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除。
- 我们可以从两个层面来给消息设定过期时间:。
- 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
- 消息本身:给具体的某个消息设定过期时间。
- 如果两个层面都做了设置,那么哪个时间短,哪个生效。
8.1、队列层面设置
设置绑定关系:
测试:
不启动消费端程序。
向设置了过期时间的队列中发送 100 条消息。
等 10 秒后,看是否全部被过期删除。
8.2、消息层面设置
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
@Test
public void testSendMessageTTL () {
// 1、创建消息后置处理器对象
MessagePostProcessor messagePostProcessor = (Message message) -> {
// 设定 TTL 时间,以毫秒为单位
message.getMessageProperties ().setExpiration ("5000");
return message;
};
// 2、发送消息
rabbitTemplate.convertAndSend (
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu", messagePostProcessor);
}
发送到普通队列上:
九、死信和死信队列
概念:当一个消息无法被消费,它就变成了死信。
死信产生的原因大致有下面三种:
拒绝
:消费者拒接消息,basicNack ()/basicReject ()
,并且不把消息重新放入原目标队列,requeue=false。溢出
:队列中消息数量到达限制。比如队列最大只能存储 10 条消息,且现在已经存储了 10 条,此时如果再发送一条消息进来,- 根据先进先出原则,队列中最早的消息会变成死信。超时
:消息到达超时时间未被消费。
死信的处理方式大致有下面三种:
丢弃
:对不重要的消息直接丢弃,不做处理。入库
:把死信写入数据库,日后处理。监听
:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)。
9.1、测试相关
9.1.1、创建死信交换机和死信队列
常规设定即可,没有特殊设置:
- 死信交换机:exchange.dead.letter.video
- 死信队列:queue.dead.letter.video
- 死信路由键:routing.key.dead.letter.video
9.1.2、创建正常交换机和正常队列
注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机
正常交换机:exchange.normal.video
正常队列:queue.normal.video
正常路由键:routing.key.normal.video
全部设置完成后参照如下细节:
9.2、消费端拒收消息
9.2.1、发送消息的代码
@Test
public void testSendMessageButReject () {
rabbitTemplate
.convertAndSend (
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况 1:消息被拒绝");
}
9.2.2、接收消息的代码
1、监听正常队列
@RabbitListener (queues = {QUEUE_NORMAL})
public void processMessageNormal (Message message, Channel channel) throws IOException {
// 监听正常队列,但是拒绝消息
log.info ("★[normal] 消息接收到,但我拒绝。");
channel.basicReject (message.getMessageProperties ().getDeliveryTag (), false);
}
2、监听死信队列
@RabbitListener (queues = {QUEUE_DEAD_LETTER})
public void processMessageDead (String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info ("★[dead letter] dataString =" + dataString);
log.info ("★[dead letter] 我是死信监听方法,我接收到了死信消息");
channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
}
9.3、消息数量超过队列容纳极限
1、发送消息的代码
@Test
public void testSendMultiMessage () {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend (
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况 2:消息数量超过队列的最大容量" + i);
}
}
2、接收消息的代码
@RabbitListener (queues = {QUEUE_NORMAL})
public void processMessageNormal (Message message, Channel channel) throws IOException {
// 监听正常队列
log.info ("★[normal] 消息接收到。");
channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
}
3、执行效果
正常队列的参数如下图所示:
生产者发送 20 条消息之后,消费端死信队列接收到前 10 条消息:
9.4、消息超时未消费
1、发送消息的代码
正常发送一条消息即可,所以使用第一个例子的代码。
@Test
public void testSendMessageTimeout () {
rabbitTemplate
.convertAndSend (
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况 3:消息超时");
}
2、执行效果
因为没有消费端监听程序,所以消息未超时前滞留在队列中:
消息超时后,进入死信队列:
十、延迟队列
推迟一段时间后执行指定操作。
10.1、基于死信的延迟队列
10.2、基于插件的延迟队列
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
十一、事务消息
总结:
- 在生产者端使用事务消息和消费端没有关系。
- 在生产者端使用事务消息仅仅是控制事务内的消息是否发送。
- 提交事务就把事务内所有消息都发送到交换机。
- 回滚事务则事务内任何消息都不会被发送。
- 事务控制对消费端无效。
测试回滚事务的情况:
@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");
// 2、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}
十二、惰性队列
创建队列时,在 Durability 这里有两个选项可以选择。
- Durable:持久化队列,消息会持久化到硬盘上。
- Transient:临时队列,不做持久化操作,broker 重启后消息会丢失。
未设置惰性模式时队列的持久化机制:
惰性队列的工作方式:
12.1、创建惰性队列
官网说明:
队列可以创建为默认或惰性模式,模式指定方式是:
- 使用队列策略(建议)
- 设置
queue.declare
参数
如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
1、基于策略方式设定:
# 登录Docker容器
docker exec -it rabbitmq /bin/bash
# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl
命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到 Path 环境变量。set_policy
是子命令,表示设置策略。Lazy
是当前要设置的策略名称,是我们自己自定义的,不是系统定义的。"^lazy-queue$"
是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置。'{"queue-mode":"lazy"}'
是一个 JSON 格式的参数设置指定了队列的模式为 "lazy"。–-apply-to
参数指定该策略将应用于队列(queues)级别。- 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列。
如果需要修改队列模式可以执行如下命令(不必删除队列再重建):
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
2、在声明队列时使用参数设定
- 参数名称:x-queue-mode
- 可用参数值:
- default
- lazy
- 不设置就是取值为 default
Java 代码原生 API 设置方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
Java 代码注解设置方式:
@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
@Argument(name = "x-queue-mode", value = "lazy")
})
12.2、测试代码
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyLazyMessageProcessor {
public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";
public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy";
public static final String QUEUE_LAZY_NAME = "queue.atguigu.lazy";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {
@Argument(name = "x-queue-mode", value = "lazy")
}),
exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),
key = {ROUTING_LAZY_KEY}
))
public void processMessageLazy(String data, Message message, Channel channel) {
log.info("消费端接收到消息:" + data);
}
}
基于消费端 @RabbitListener 注解中的配置,自动创建了队列:
12.3、惰性队列:应用场景
使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。由于各种原因,排队可能会变得很长:
- 消费者离线/崩溃/停机进行维护。
- 突然出现消息进入高峰,生产者的速度超过了消费者。
- 消费者比正常情况慢。
十三、优先级队列
机制说明:
- 默认情况:基于队列先进先出的特性,通常来说,先入队的先投递。
- 设置优先级之后:优先级高的消息更大几率先投递。
- 关键参数:
x-max-priority
。
消息的优先级设置:
RabbitMQ 允许我们使用一个正整数给消息设定优先级。
消息的优先级数值取值范围:1~255。
RabbitMQ 官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、内存等资源越多)。
操作:
- 队列在声明时可以指定参数:x-max-priority。
- 默认值:0 此时消息即使设置优先级也无效。
- 指定一个正整数值:消息的优先级数值不能超过这个。
13.1、创建相关资源
创建交换机:
创建队列:
队列绑定交换机:
13.2、测试代码
生产者:
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
message.getMessageProperties().setPriority(1);
return message;
});
}
}
消费者:
@Slf4j
@Component
public class MyMessageProcessor {
public static final String QUEUE_PRIORITY = "queue.test.priority";
@RabbitListener(queues = {QUEUE_PRIORITY})
public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
log.info(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
评论