RabbitMQ

RabbitMQ

一、消息队列

1.1、简介

消息队列是实现应用程序和应用程序之间通信的中间件产品。

消息队列底层实现的两大主流方式

  • 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要。
  • 目前主流的消息队列通信协议标准包括:
    • AMQP (Advanced Message Queuing Protocol):通用协议,IBM 公司研发。
    • JMS (Java Message Service):专门为 Java 语言服务,SUN 公司研发,一组由 Java 接口组成的 Java 标准。

1.2、AMQP 和 JMS 对比

1.3、各主流 MQ 产品对比

二、RabbitMQ 简介

2.1、介绍

官网地址:https://www.rabbitmq.com/
RabbitMQ 是一款基于 AMQP、由 Erlang 语言开发的消息队列产品,2007 年 Rabbit 技术公司发布了它的 1.0 版本。

2.2、RabbitMQ 体系结构

三、 RabbitMQ 工作模式

3.1、Work Queues


多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
Work Queues 工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

3.2、Publish/Subscribe


组件之间关系:

  • 生产者把消息发送到交换机。
  • 队列直接和交换机绑定。

工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列。

理解概念:

  • Publish:发布,这里就是把消息发送到交换机上。
  • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系。

3.3、Routing


通过『路由绑定』的方式,把交换机和队列关联起来。
交换机和队列通过路由键进行绑定。
生产者发送消息时不仅要指定交换机,还要指定路由键。
交换机接收到消息会发送到路由键绑定的队列。
在编码上与 Publish/Subscribe 发布与订阅模式的区别:

  • 交换机的类型为:Direct。
  • 队列绑定交换机的时候需要指定 routing key。

如果一个交换机通过相同的 routing key 绑定了多个队列,就会有广播效果。

3.4、Topics


Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符。
Routingkey 一般都是由一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert
通配符规则:

  • #:匹配零个或多个词。
  • *:匹配一个词。

3.5、RPC


远程过程调用,本质上是同步调用,和我们使用 OpenFeign 调用远程接口一样。
所以这不是典型的消息队列工作方式,就不展开说明了。

3.6、Publisher Confirms

发送端消息确认。

3.7、工作模式总结

直接发送到队列:底层使用了默认交换机。
经过交换机发送到队列:

  • Fanout:没有 Routing key 直接绑定队列。
  • Direct:通过 Routing key 绑定队列,消息发送到绑定的队列上。
    • 一个交换机绑定一个队列:定点发送。
    • 一个交换机绑定多个队列:广播发送。
  • Topic:针对 Routing key 使用通配符。

四、整合 SpringBoot

生产者:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage () {  
        rabbitTemplate.convertAndSend (  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

消费者:

import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyMessageListener {
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";  
    public static final String QUEUE_NAME  = "queue.order";  
  
    @RabbitListener (bindings = @QueueBinding (
            value = @Queue (value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange (value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage (String dateString,
                               Message message,
                               Channel channel) {
        log.info (dateString);
    }
  
}

五、消息可靠性

5.1、消息没有发送到消息队列

  • 1、在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送。
  • 2、为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机。

5.1.1、生产端确认

1、yaml
注意:publisher-confirm-typepublisher-returns 是两个必须要增加的配置,如果没有则功能不生效。

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认

2、创建配置类
这里需要声明回调函数来接收 RabbitMQ 服务器返回的确认信息:

| 方法名            | 方法功能                 | 所属接口        | 接口所属类     |
| ----------------- | ------------------------ | --------------- | -------------- |
| confirm ()         | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
| returnedMessage () | 确认消息是否发送到队列   | ReturnsCallback | RabbitTemplate |

然后,就是对 RabbitTemplate 的功能进行增强,因为回调函数所在对象必须设置到 RabbitTemplate 对象中才能生效。
原本 RabbitTemplate 对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用 RabbitTemplate 对象下面两个方法:

| 设置组件调用的方法   | 所需对象类型            |
| -------------------- | ----------------------- |
| setConfirmCallback () | ConfirmCallback 接口类型 |
| setReturnCallback ()  | ReturnCallback 接口类型  |

3、配置类代码
yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 开启发送端确认
    publisher-confirm-type: correlated
    # 开启发送端消息抵达队列的确认
    publisher-returns: true
    template:
      # 只要抵达队列,以异步发送优先回调我们这个returnConfirm
      mandatory: true

代码

@Configuration
public class RabbitConfig {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void initRabbitTemplate() {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功到达交换机!correlationData={}", correlationData);
            } else {
                log.error("消息未到达交换机!correlationData={}, cause={}", correlationData, cause);
            }
        });
        
        // 设置返回回调
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息未到达队列,被返回!exchange={}, routingKey={}, message={}, replyCode={}, replyText={}",
                returned.getExchange(),
                returned.getRoutingKey(),
                new String(returned.getMessage().getBody()),
                returned.getReplyCode(),
                returned.getReplyText());
        });
    }
}

生产者

@Service
public class MessageService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage() {
        // 1. 创建消息唯一标识
        String messageId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(messageId);
        
        // 2. 可以在 correlationData 中存储额外信息
        correlationData.setReturnedMessage(
            MessageBuilder.withBody("Hello RabbitMQ".getBytes())
                .setMessageId(messageId)
                .build()
        );
        
        // 3. 发送消息时携带 correlationData
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "routing-key",
            "消息内容",
            correlationData
        );
    }
}

5.1.2、备份交换机

注意:备份交换机一定要选择 fanout 类型,因为原交换机转入备份交换机时并不会指定路由键。
创建备份交换机:


创建备份交换机要绑定的队列:



针对备份队列创建消费端监听器:

public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";

    @RabbitListener (bindings = @QueueBinding (
            value = @Queue (value = QUEUE_NAME_BACKUP, durable = "true"),
            exchange = @Exchange (value = EXCHANGE_DIRECT_BACKUP),
            key = {""}
    ))
    public void processMessageBackup (String dateString,
                                     Message message,
                                     Channel channel) {
        log.info ("BackUp:" + dateString);
    }

5.2、消息队列服务器宕机导致内存中消息丢失

临时性的交换机和队列也能够接收消息。
RabbitMQ 服务器重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。
在后台管理界面创建交换机和队列时,默认就是持久化的模式。
此时消息也是持久化的,不需要额外设置。

5.3、消费端宕机或抛异常导致消息没有成功被消费

消费端消费消息成功,给服务器返回 ACK 信息,然后消息队列删除该消息。
消费端消费消息失败,给服务器端返回 NACK 信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)。

5.3.1、默认情况

ACK 是 acknowledge 的缩写,表示已确认。
默认情况下,消费端取回消息后,默认会自动返回 ACK 确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ 得到 ACK 确认信息就会删除消息。
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了。
所以还是要修改成手动确认。

5.3.2、配置

增加针对监听器的设置:

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认

接收消息方法内部逻辑:

  • 业务处理成功:手动返回 ACK 信息,表示消息成功消费
  • 业务处理失败:手动返回 NACK 信息,表示消息消费失败。此时有两种后续操作供选择:
    • 把消息重新放回消息队列,RabbitMQ 会重新投递这条消息,那么消费端将重新消费这条消息 —— 从而让业务代码再执行一遍
    • 不把消息放回消息队列,返回 reject 信息表示拒绝,那么这条消息的处理就到此为止

5.3.3、相关 API

basicAck () 给 Broker 返回 ACK 确认信息,表示消息已经在消费端成功消费,这样 Broker 就可以把消息删除了。
参数列表:

  • long deliveryTag Broker 给每一条进入队列的消息都设定一个唯一标识。
  • boolean multiple 取值为 true:为小于、等于 deliveryTag 的消息批量返回 ACK 信息取值为 false:仅为指定的 deliveryTag 返回 ACK 信息。

basicNack () 给 Broker 返回 NACK 信息,表示消息在消费端消费失败,此时 Broker 的后续操作取决于参数 requeue 的值。
参数列表:

  • long deliveryTag Broker 给每一条进入队列的消息都设定一个唯一标识。
  • boolean multiple 取值为 true:为小于、等于 deliveryTag 的消息批量返回 ACK 信息取值为 false:仅为指定的 deliveryTag 返回 ACK 信息。
  • boolean requeue 取值为 true:Broker 将消息重新放回队列,接下来会重新投递给消费端取值为 false:Broker 将消息标记为已消费,不会放回队列。

basicReject () 根据指定的 deliveryTag,对该消息表示拒绝。
参数列表:

  • long deliveryTag Broker 给每一条进入队列的消息都设定一个唯一标识。
  • boolean requeue 取值为 true:Broker 将消息重新放回队列,接下来会重新投递给消费端取值为 false:Broker 将消息标记为已消费,不会放回队列。

5.3.4、完整代码示例

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class MyMessageListener {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME  = "queue.order";

    // 修饰监听方法
    @RabbitListener (
            // 设置绑定关系
            bindings = @QueueBinding (
                // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                value = @Queue (value = QUEUE_NAME, durable = "true", autoDelete = "false"),
                // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                exchange = @Exchange (value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
                // 配置路由键信息
                key = {ROUTING_KEY}
    ))
    public void processMessage (String dataString, Message message, Channel channel) throws IOException {
        // 1、获取当前消息的 deliveryTag 值备用
        long deliveryTag = message.getMessageProperties ().getDeliveryTag ();
        try {
            // 2、正常业务操作
            log.info ("消费端接收到消息内容:" + dataString);
            // System.out.println (10 / 0);
            // 3、给 RabbitMQ 服务器返回 ACK 确认信息
            channel.basicAck (deliveryTag, false);
        } catch (Exception e) {
            // 4、获取信息,看当前消息是否曾经被投递过
            Boolean redelivered = message.getMessageProperties ().getRedelivered ();
            if (!redelivered) {
                // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
                channel.basicNack (deliveryTag, false, true);
            } else {
                // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
                channel.basicReject (deliveryTag, false);
            }

        }
    }

}

5.3.5、要点总结

  • 1、把消息确认模式改为 手动确认。
  • 2、调用 Channel 对象的方法返回信息。
    • ACK:Acknowledgement,表示消息处理成功。
    • NACK:Negative Acknowledgement,表示消息处理失败。
    • Reject:拒绝,同样表示消息处理失败。
  • 3、后续操作
    • requeue 为 true:重新放回队列,重新投递,再次尝试。
    • requeue 为 false:不放回队列,不重新投递。
  • 4、deliveryTag 消息的唯一标识,查找具体某一条消息的依据。

5.3.6、流程

六、交付标签机制


deliveryTag 是一个 64 位整数。
消费端把消息处理结果 ACK、NACK、Reject 等返回给 Broker 之后,Broker 需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么 Broker 就必须知道它现在要操作的消息具体是哪一条。而 deliveryTag 作为消息的唯一标识就很好的满足了这个需求。
如果交换机是 Fanout 模式,同一个消息广播到了不同队列,deliveryTag 也不会重复,deliveryTag 是 broker 范围唯一的。

七、消费端限流

设定 prefetch

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息

八、消息超时

给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除。

  • 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除。
  • 我们可以从两个层面来给消息设定过期时间:。
    • 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
    • 消息本身:给具体的某个消息设定过期时间。
  • 如果两个层面都做了设置,那么哪个时间短,哪个生效。

8.1、队列层面设置


设置绑定关系:

测试:
不启动消费端程序。
向设置了过期时间的队列中发送 100 条消息。
等 10 秒后,看是否全部被过期删除。

8.2、消息层面设置

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

@Test  
public void testSendMessageTTL () {  
  
    // 1、创建消息后置处理器对象  
    MessagePostProcessor messagePostProcessor = (Message message) -> {  
  
        // 设定 TTL 时间,以毫秒为单位
        message.getMessageProperties ().setExpiration ("5000");  
  
        return message;
    };
  
    // 2、发送消息  
    rabbitTemplate.convertAndSend (    
            EXCHANGE_DIRECT,     
            ROUTING_KEY,     
            "Hello atguigu", messagePostProcessor);    
}

发送到普通队列上:

九、死信和死信队列

概念:当一个消息无法被消费,它就变成了死信。
死信产生的原因大致有下面三种:

  • 拒绝:消费者拒接消息,basicNack ()/basicReject (),并且不把消息重新放入原目标队列,requeue=false。
  • 溢出:队列中消息数量到达限制。比如队列最大只能存储 10 条消息,且现在已经存储了 10 条,此时如果再发送一条消息进来,- 根据先进先出原则,队列中最早的消息会变成死信。
  • 超时:消息到达超时时间未被消费。

死信的处理方式大致有下面三种:

  • 丢弃:对不重要的消息直接丢弃,不做处理。
  • 入库:把死信写入数据库,日后处理。
  • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)。

9.1、测试相关

9.1.1、创建死信交换机和死信队列

常规设定即可,没有特殊设置:

  • 死信交换机:exchange.dead.letter.video
  • 死信队列:queue.dead.letter.video
  • 死信路由键:routing.key.dead.letter.video

9.1.2、创建正常交换机和正常队列

注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机

正常交换机:exchange.normal.video
正常队列:queue.normal.video
正常路由键:routing.key.normal.video

全部设置完成后参照如下细节:

9.2、消费端拒收消息

9.2.1、发送消息的代码

@Test  
public void testSendMessageButReject () {  
    rabbitTemplate  
            .convertAndSend (  
                    EXCHANGE_NORMAL,  
                    ROUTING_KEY_NORMAL,  
                    "测试死信情况 1:消息被拒绝");  
}

9.2.2、接收消息的代码

1、监听正常队列

@RabbitListener (queues = {QUEUE_NORMAL})
public void processMessageNormal (Message message, Channel channel) throws IOException {
    // 监听正常队列,但是拒绝消息
    log.info ("★[normal] 消息接收到,但我拒绝。");
    channel.basicReject (message.getMessageProperties ().getDeliveryTag (), false);
}

2、监听死信队列

@RabbitListener (queues = {QUEUE_DEAD_LETTER})
public void processMessageDead (String dataString, Message message, Channel channel) throws IOException {  
    // 监听死信队列  
    log.info ("★[dead letter] dataString =" + dataString);
    log.info ("★[dead letter] 我是死信监听方法,我接收到了死信消息");
    channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
}

9.3、消息数量超过队列容纳极限

1、发送消息的代码

@Test  
public void testSendMultiMessage () {  
    for (int i = 0; i < 20; i++) {  
        rabbitTemplate.convertAndSend (  
                EXCHANGE_NORMAL,  
                ROUTING_KEY_NORMAL,  
                "测试死信情况 2:消息数量超过队列的最大容量" + i);  
    }  
}

2、接收消息的代码

@RabbitListener (queues = {QUEUE_NORMAL})
public void processMessageNormal (Message message, Channel channel) throws IOException {
    // 监听正常队列
    log.info ("★[normal] 消息接收到。");
    channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
}

3、执行效果
正常队列的参数如下图所示:

生产者发送 20 条消息之后,消费端死信队列接收到前 10 条消息:

9.4、消息超时未消费

1、发送消息的代码
正常发送一条消息即可,所以使用第一个例子的代码。

@Test
public void testSendMessageTimeout () {
    rabbitTemplate
            .convertAndSend (
                    EXCHANGE_NORMAL,
                    ROUTING_KEY_NORMAL,
                    "测试死信情况 3:消息超时");
}

2、执行效果

因为没有消费端监听程序,所以消息未超时前滞留在队列中:

消息超时后,进入死信队列:

十、延迟队列

推迟一段时间后执行指定操作。

10.1、基于死信的延迟队列

10.2、基于插件的延迟队列

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

十一、事务消息


总结:

  • 在生产者端使用事务消息和消费端没有关系。
  • 在生产者端使用事务消息仅仅是控制事务内的消息是否发送。
  • 提交事务就把事务内所有消息都发送到交换机。
  • 回滚事务则事务内任何消息都不会被发送。
  • 事务控制对消费端无效。

测试回滚事务的情况:

@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {
    // 1、发送第一条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");

    // 2、发送第二条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}

十二、惰性队列

创建队列时,在 Durability 这里有两个选项可以选择。

  • Durable:持久化队列,消息会持久化到硬盘上。
  • Transient:临时队列,不做持久化操作,broker 重启后消息会丢失。

未设置惰性模式时队列的持久化机制:

惰性队列的工作方式:

12.1、创建惰性队列

官网说明:

队列可以创建为默认或惰性模式,模式指定方式是:

  • 使用队列策略(建议)
  • 设置 queue.declare 参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

1、基于策略方式设定:

# 登录Docker容器
docker exec -it rabbitmq /bin/bash

# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl 命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到 Path 环境变量。
  • set_policy 是子命令,表示设置策略。
  • Lazy 是当前要设置的策略名称,是我们自己自定义的,不是系统定义的。
  • "^lazy-queue$" 是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置。
  • '{"queue-mode":"lazy"}' 是一个 JSON 格式的参数设置指定了队列的模式为 "lazy"。
  • –-apply-to 参数指定该策略将应用于队列(queues)级别。
  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列。

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

2、在声明队列时使用参数设定

  • 参数名称:x-queue-mode
  • 可用参数值:
    • default
    • lazy
  • 不设置就是取值为 default

Java 代码原生 API 设置方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java 代码注解设置方式:

@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
    @Argument(name = "x-queue-mode", value = "lazy")
})

12.2、测试代码

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyLazyMessageProcessor {

    public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";
    public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy";
    public static final String QUEUE_LAZY_NAME = "queue.atguigu.lazy";

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {
            @Argument(name = "x-queue-mode", value = "lazy")
        }),
        exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),
        key = {ROUTING_LAZY_KEY}
    ))
    public void processMessageLazy(String data, Message message, Channel channel) {
        log.info("消费端接收到消息:" + data);
    }

}

基于消费端 @RabbitListener 注解中的配置,自动创建了队列:

12.3、惰性队列:应用场景

使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。由于各种原因,排队可能会变得很长:

  • 消费者离线/崩溃/停机进行维护。
  • 突然出现消息进入高峰,生产者的速度超过了消费者。
  • 消费者比正常情况慢。

十三、优先级队列

机制说明:

  • 默认情况:基于队列先进先出的特性,通常来说,先入队的先投递。
  • 设置优先级之后:优先级高的消息更大几率先投递。
  • 关键参数:x-max-priority

消息的优先级设置:
RabbitMQ 允许我们使用一个正整数给消息设定优先级。
消息的优先级数值取值范围:1~255。
RabbitMQ 官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、内存等资源越多)。
操作:

  • 队列在声明时可以指定参数:x-max-priority。
  • 默认值:0 此时消息即使设置优先级也无效。
  • 指定一个正整数值:消息的优先级数值不能超过这个。

13.1、创建相关资源

创建交换机:

创建队列:


队列绑定交换机:

13.2、测试代码

生产者:

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
    public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
            message.getMessageProperties().setPriority(1);
            return message;
        });
    }
}

消费者:

@Slf4j
@Component
public class MyMessageProcessor {

    public static final String QUEUE_PRIORITY = "queue.test.priority";

    @RabbitListener(queues = {QUEUE_PRIORITY})
    public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
        log.info(data);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

评论

暂无

添加新评论