简单案例

发送消息

@Test
void testSimplateQueue() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "Hello, Spring amqp!";
    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
}

消费消息

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
    log.info("收到消息: {}", message);
}

Work Queues

  • 多个消费者绑定到一个队列,可以加快消息的处理速度

  • 同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,失效能者多劳

  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只消费一个消息

交换机

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

Fanout交换机

  • FanoutExchange的会将消息路由到每个绑定的队列

发送消息给交换机
    @Test
    void testSendFanout() {
        // 交换机
        String exchangeName = "hmall.fanout";
        // 消息
        String message = "Hello, ereryone";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "",message);
    }
监听消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
    // 绿色
//        log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
    // systemOut绿色
    System.out.println("\u001B[32m消费者1,收到[fanout.queue1]消息: " + message + "\u001B[0m");

}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
    // 蓝色
    // systemOut蓝色
    System.out.println("\u001B[34m消费者2,收到[fanout.queue2]消息: " + message + "\u001B[0m");

}

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

描述下Driect交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Driect交换机通过RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

  • TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以"." 点分割。

  • Queue与Exchange指定BindingKey时可以使用通配符:

    • #:代表0个或多个单词

    • *:代指一个单词

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机收到的消息RoutingKey可以是多个单词,以"." 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个单词

  • *:代表1个词

第一种声明交换机的方式

  • Queue

  • FanoutExchange、DirectExchange、TopicExchange

  • Binding

@Configuration
public class FanoutConfiguration {

    /**
     * 定义一个Fanout类型的交换机
     *
     * @return FanoutExchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("hmall.fanout.java");
    }

    /**
     * 定义一个队列
     *
     * @return Queue
     */
    @Bean
    public Queue fanoutQueue1() {
        // 持久化
        return new Queue("fanout.java.queue1");
    }

    /**
     * 定义一个队列
     *
     * @return Queue
     */
    @Bean
    public Queue fanoutQueue2() {
        // 持久化
        return new Queue("fanout.java.queue2");
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bindingFanoutQueue1(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue fanoutQueue1) {
        // 将fanoutExchange交换机与fanoutQueue1队列绑定
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bindingFanoutQueue2(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue fanoutQueue2) {
        // 将fanoutExchange交换机与fanoutQueue2队列绑定
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}

第二种声明交换机的方式

  • @Queue

  • @Exchange

  • @QueueBinding

@Component
@Slf4j
public class DirectListener {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.queue1", durable = "true"),
                    exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
                    key = {"blue", "red"}
            )
    })
    public void listenDirectQueue1(String message) {
        // 绿色
        //        log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
        // systemOut绿色
        System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");

    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.queue2", durable = "true"),
                    exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
                    key = {"red", "yellow"}
            )
    })
    public void listenDirectQueue2(String message) {
        // 蓝色
        // systemOut蓝色
        System.out.println("\u001B[34m消费者2,收到[Direct.queue2]消息: " + message + "\u001B[0m");

    }


}

消息转换器

  • 建议采用JSON序列化替代默认的JDK序列化

  • 在publisher和consumer中都配置MeesageConverter


@Bean
public MessageConverter jacksonMessageConverter() {
	// 使用Jackson消息转换器
	return new Jackson2JsonMessageConverter();
}

连接失败时重试机制(生产者)

rabbitmq:
  # 连接失败时的重试配置
  connection-timeout: 1s # 连接超时时间1秒
  template:
    retry:
      enabled: true # 启用重试
      initial-interval: 1000ms # 初始间隔时间
      multiplier: 1 # 失败后下次等待时长倍数,下次等待时长 = initial-interval * multiplier
      max-attempts: 3 # 最大重试次数
  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。

  • 如果对业务性能有要求,建议禁用重试机制,如果一定要使用,请合理配置等待时长和重试机制次数,当然也可以考虑使用异步线程来执行发送消息代码。

生产者确认机制

RabbitMQ提供了Publisher Confirm 和 Publisher Return两种确认机制,开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果如以下几种情况。:

  • 消息投递到了MQ,但是路由失败,此时会通过PublisherReturn返回路由异常原因,返回ACK,告知投递成功。

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功。

  • 其它情况都会返回NACK,告知投递失败。

配置

rabbitmq:
  template:
    mandatory: true # 如果消息无法投递到交换机,RabbitMQ会将消息返回给生产者
  # 生产者确认机制
  # none 关闭confirm机制
  # simple 开启confirm机制,同步阻塞等待MQ的回执消息
  # correlated MQ异步回调方式返回回执消息
  publisher-confirm-type: correlated # 开启publisher confirm机制 并设置 confirm类型为correlated
  publisher-returns: true # 开启publisher returns机制

交换机ACK确认机制

@Test
void testPublisherConfirm2() throws InterruptedException {
    // 交换机
    String exchangeName = "hmall.direct";
    // routingKey
    String routingKey = "yellow";
    // 消息
    String message = "Hello!";
    // 创建CorrelationData对象
    CorrelationData cd = new CorrelationData();
    // 给Future添加ConfirmCollback回调
    cd.getFuture().thenAccept(confirm -> {
        if (confirm.isAck()) {
            log.info("消息发送成功,消息ID: {}", cd.getId());
        } else {
            log.error("消息发送失败,消息ID: {}", cd.getId());
        }
    }).exceptionally(ex -> {
        log.error("消息发送异常,消息ID: {}", cd.getId(), ex);
        return null;
    });
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, routingKey, message, cd);
    // 等待回调执行完毕再结束测试
    Thread.sleep(5000);
}

@Test
void testPublisherConfirm() throws InterruptedException {
    // 交换机
    String exchangeName = "hmall.direct";
    // routingKey
    String routingKey = "yellow";
    // 消息
    String message = "Hello!";
    // 创建CorrelationData对象
    CorrelationData cd = new CorrelationData();
    // 给Future添加ConfirmCollback回调
    cd.getFuture().whenComplete((confirm, ex) -> {
        if (ex != null) {
            log.error("消息发送异常,消息ID: {}", cd.getId(), ex);
        } else if (confirm.isAck()) {
            log.info("消息发送成功,消息ID: {}", cd.getId());
        } else {
            log.error("消息发送失败,消息ID: {}", cd.getId());
        }
    });
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, routingKey, message, cd);
    // 等待回调执行完毕再结束测试
    Thread.sleep(5000);
}

ReturnsCback(用于处理 Exchange 到 Queue 路由失败的情况)

@Configuration
@Slf4j
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cf) {
        // 强制消息未路由时退回
        cf.setPublisherReturns(true);
        RabbitTemplate template = new RabbitTemplate(cf);
        template.setMandatory(true);

        // ReturnsCallback 回调:处理路由失败的消息
        template.setReturnsCallback(returned -> {
            // 获取退回的原始消息体
            String body = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);
            // 打印日志:包含交换机、路由键、回复码/原因和消息内容
            log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
                    returned.getExchange(),       // 交换机名称
                    returned.getRoutingKey(),      // 路由键
                    returned.getReplyCode(),       // Broker 返回码
                    returned.getReplyText(),       // Broker 返回原因
                    body                           // 消息内容
            );
        });
        return template;
    }

}
@Configuration
@Slf4j
public class RabbitConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate Bean
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置消息发送确认回调
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            // 取出原始消息体并转成字符串
            String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
            // 记录日志:交换机、路由键、返回码、原因、消息内容
            log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
                    returnedMessage.getExchange(),        // 交换机名称
                    returnedMessage.getRoutingKey(),      // 路由键
                    returnedMessage.getReplyCode(),       // Broker 返回码
                    returnedMessage.getReplyText(),       // Broker 返回原因
                    body);                         // 原始消息内容
        });
    }
}
@Configuration
@Slf4j
public class MyRabbitCustomizer implements RabbitTemplateCustomizer {

    @Override
    public void customize(RabbitTemplate rabbitTemplate) {
        // 设置消息发送确认回调
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            // 取出原始消息体并转成字符串
            String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
            // 记录日志:交换机、路由键、返回码、原因、消息内容
            log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
                    returnedMessage.getExchange(),        // 交换机名称
                    returnedMessage.getRoutingKey(),      // 路由键
                    returnedMessage.getReplyCode(),       // Broker 返回码
                    returnedMessage.getReplyText(),       // Broker 返回原因
                    body);                         // 原始消息内容
        });
    }
}

如何处理生产者的确认消息?

  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用

  • 如果一定要使用,无须开启Publicsher机制,因为一般路由失败是自己业务问题

  • 对于nack消息可以有限次数重试,依然失败则记录异常消息

消息持久化

@Test
void testSendMessage() {
    // 并行发送1百万条消息,分组交给每个线程处理
    int messageCount = 1000000;
    int threadCount = 6; // 线程数,可根据实际情况调整
    int batchSize = messageCount / threadCount; // 每个线程处理的消息数量
    CompletableFuture<?>[] futures = new CompletableFuture[threadCount];
    for (int t = 0; t < threadCount; t++) {
        final int start = t * batchSize; // 每个线程处理的起始索引
        final int end = (t == threadCount - 1) ? messageCount : (t + 1) * batchSize; // 计算结束索引,最后一个线程处理剩余的消息
        futures[t] = CompletableFuture.runAsync(() -> {
            String queueName = "simple.queue";
            // MessageDeliveryMode.NON_PERSISTENT 非持久化的
            // MessageDeliveryMode.PERSISTENT 持久化的
            Message msg = MessageBuilder.withBody(queueName.getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
            for (int i = start; i < end; i++) {
                String message = "Hello, Spring amqp! Message " + i;
                rabbitTemplate.convertAndSend(queueName, msg);
            }
        });
    }

    // 等待所有异步任务完成
    CompletableFuture.allOf(futures).join();
    log.info("所有异步任务已完成");
}

LazyQueue(3.12、3.13版本后默认开启,无须手动配置和发送消息时配置)

// 针对3.12版本之前的配置
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "direct.queue1", durable = "true",
                        arguments = @Argument(name = "x-queue-mode", value = "lazy")),
                exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
                key = {"blue", "red"}
        )
})
public void listenDirectQueue1(String message) {
    // 绿色
    //        log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
    // systemOut绿色
    System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");

}
@Bean
public Queue lazyQueue() {
    return QueueBuilder
        .durable("lazy.queue")
        .lazy()     // 等同于 x-queue-mode=lazy
        .build();
}

RabbitMQ如何保证消息的可靠性?

  • 首先通过配置可以让交换机、队列、以及发送消息都持久化,这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。

  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后称为队列的默认模式,LazyQueue会将所有消息都持久化。

  • 开启持久化和生产者确认模式,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。

消费者确认机制

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式

  • none:不处理,即消息投递给消费者后立刻ack,消息会立即从MQ删除,非常不安全,不建议使用

  • manual:手动模式。需要自己在业务中调用api,发送ack或reject,存在业务入侵,但是灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack

    • 当业务出现异常时,根据异常判断返回不同的结果

      • 如果是业务异常,自动返回nack

      • 如果消息处理或校验异常,自动返回reject

配置

rabbitmq:
  listener:
    simple:
      # none 表示不使用事务
      # manual 表示手动确认
      # auto 表示自动确认
      acknowledge-mode: auto

本地重试机制

rabbitmq:
  listener:
    simple:
      retry:
          # 消费者失败重试配置
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始间隔时间为1秒
          multiplier: 1 # 失败后下次等待时长倍数,下次等待时长 = initial-interval * multiplier
          max-attempts: 3 # 最大重试次数
          stateless: true # true为无状态;false有状态。如果业务中包含事务,这里改为false

失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MeesageRecoverer接口来处理。它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

重试达到次数后到指定交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled", havingValue = "true") // 仅在启用重试时加载此配置
public class ErrorConfiguration {

    @Bean
    public DirectExchange errorExchange() {
        // 创建一个名为 "error.direct.java" 的 DirectExchange
        return new DirectExchange("error.direct.java", true, false, null);
    }

    @Bean
    public Queue errorQueue() {
        // 创建一个名为 "error.queue" 的队列
        return new Queue("error.queue", true, false, false, null);
    }

    @Bean
    public Binding errorBinding(@Qualifier("errorExchange") DirectExchange errorExchange, @Qualifier("errorQueue") Queue errorQueue) {
        // 将队列绑定到 DirectExchange 上,并指定路由键为 "error"
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        // 创建一个自定义的 MessageRecoverer 实现
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct.java", "error");

    }

}

业务幂等性

唯一消息ID

方案一、是给每个消息都设置一个唯一ID,利用ID区分是否重复消息:

  • 每一条消息都生成一个唯一的ID,与消息一起投递给消费者。

  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存至数据库。

  • 如果下次又收到同样的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

 // 开启
@Bean
public MessageConverter jacksonMessageConverter() {
	// 使用Jackson消息转换器
	Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
	// 设置消息ID
	jackson2JsonMessageConverter.setCreateMessageIds(true);
	return jackson2JsonMessageConverter;
}

死信交换机

当一个队列中的消息满足下列情况之一时,就回称为死信(dead letter):

  • 消费者使用basic.reject或basic.nack声音消息失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或者消息本身设置的过期时间)超时无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

发送带expiration的消息

  @Test
  void testSendTTLMessage() throws InterruptedException {
      // 消息
      Map<String, Object> message = new HashMap<>();
      message.put("name", "uluckyxh");
      // 交换机
      String exchange = "simple.direct";
      // binding key
      String routingKey = "hi";
      // 过期时间
      String expiration = "10000"; // 10秒
      // 使用 Jackson2JsonMessageConverter 自动将 POJO 转为 JSON 并发送
      // 通过 MessagePostProcessor 在发送前设置消息属性(如 TTL、持久化)
      rabbitTemplate.convertAndSend(
              exchange,
              routingKey,
              message,
              msg -> {
                  // 5.1 设置消息过期时间
                  msg.getMessageProperties().setExpiration(expiration);
                  // 5.2 设置消息持久化
                  msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                  return msg;
              }
      );
      log.info("发送消息到交换机[{}],路由键[{}],内容: {}", exchange, routingKey, message);
      Thread.sleep(5000);
  }

监听死信交换机


@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(Map<String,Object> message) {
    // 打印接收到的消息
    log.info("消费者,收到[dlx.queue]消息: {}", message);
//        throw new RuntimeException("模拟异常,触发死信队列");
}

延迟消息插件

Docker安装插件

DockerFile

# 选择带管理界面的官方镜像,版本需与插件一致          # ← 修改版本时记得同时换插件
FROM rabbitmq:3-management-alpine

# 将宿主机 plugins 目录下的 .ez 插件 COPY 到镜像内插件目录  # ← 只 COPY 你自己的插件
COPY plugins/rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/

# 离线启用延迟消息插件(管理 UI 已自带,无需再启)        # ← --offline 表示不启动节点也能启用
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

# 暴露端口(可选,Compose 里也会映射)                   # ← 5672 AMQP;15672 Web UI
EXPOSE 5672 15672

docker-compose.yml

version: "3.9"                                         # Compose 文件版本

services:
  rabbitmq:                                            # 服务名称
    build: .                                           # 使用当前目录的 Dockerfile 构建镜像
    container_name: rabbitmq                           # 指定容器名称
    hostname: rabbitmq-host                            # 容器内主机名
    ports:
      - "5672:5672"                                    # 宿主机 5672 -> 容器 5672 (AMQP)
      - "15672:15672"                                  # 宿主机 15672 -> 容器 15672 (Web UI)
    volumes:
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro  # 自定义配置文件(只读)
      - ./data:/var/lib/rabbitmq/mnesia  # 持久化数据目录
      - ./logs:/var/log/rabbitmq  # 日志目录
    environment:
      RABBITMQ_ERLANG_COOKIE: "secret_cookie"          # Erlang 集群通信用 Cookie(保持一致即可)
    restart: always                                    # 容器异常退出自动重启
    healthcheck:                                       # 健康检查设置
      test: ["CMD", "rabbitmqctl", "status"]           # 通过 rabbitmqctl 检查节点状态
      interval: 30s                                    # 每 30 s 执行一次
      timeout: 30s                                     # 单次检查超时 30 s
      start_period: 30s                                # 容器启动后 30 s 再开始检查
      retries: 3                                       # 连续失败 3 次视为不健康

一次启动多个插件的案例

# 以与插件版本对应的 RabbitMQ 镜像为基础
FROM rabbitmq:3.13.0-management-alpine      # ← 换版本时记得同步插件

# 1️⃣ 复制宿主机 plugins 目录下所有 .ez 文件到镜像
#    这样后续只要把新插件放进 plugins/ 就不用改 Dockerfile 了
COPY plugins/*.ez /opt/rabbitmq/plugins/

# 2️⃣ 在同一个 RUN 层里一次性启用所有插件
#    写成空格分隔即可,避免多层镜像 & 减少构建时间
RUN rabbitmq-plugins enable --offline \
      rabbitmq_delayed_message_exchange \
      rabbitmq_shovel rabbitmq_shovel_management \
      rabbitmq_peer_discovery_k8s \
      rabbitmq_federation rabbitmq_federation_management

# 如需暴露端口,可留在这里,或交给 docker-compose.yml 来映射
EXPOSE 5672 15672

启动命令

# 构建镜像并启动服务(首次会下载基础镜像,稍等片刻)
docker compose up -d --build

# 查看插件是否启用成功
docker exec -it rabbitmq rabbitmq-plugins list -E | grep delayed
# 期望输出: [E*] rabbitmq_delayed_message_exchange  3.13.0

声明交换机和队列

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.exchange", delayed = "true"),
        key = "delay"
))
public void listenDelayQueue(Map<String, Object> message) {
    // 打印接收到的消息
    log.info("消费者,收到[delay.queue]消息: {}", message);
}

发送delay消息

@Test
void testSendDelayMessage() throws InterruptedException {
    // 消息
    String message = "Hello, delay message!";
    // 交换机
    String exchange = "delay.exchange";
    // binding key
    String routingKey = "delay";
    // 使用 Jackson2JsonMessageConverter 自动将 POJO 转为 JSON 并发送
    // 通过 MessagePostProcessor 在发送前设置消息属性(如 TTL、持久化)
    rabbitTemplate.convertAndSend(
            exchange,
            routingKey,
            message,
            msg -> {
                // 设置消息类型
                msg.getMessageProperties().getHeaders().put("__TypeId__", "java.lang.String");
                // 设置消息过期时间为 10 秒
                msg.getMessageProperties().setDelayLong(10000L);
                // 设置消息持久化
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return msg;
            }
    );
    log.info("发送延迟消息到交换机[{}],路由键[{}],内容: {}", exchange, routingKey, message);
    Thread.sleep(5000);
}

RabbitMQ延迟消息插件(x-delayed-message)的弊端

核心问题:mandatory机制完全失效

什么是mandatory?

  • mandatory=true:如果消息无法路由到任何队列,Broker会立即返回basic.return给生产者

  • mandatory=false:Broker不检查路由结果,不会有任何返回

延迟插件的致命缺陷

问题根源: x-delayed-message插件把路由动作推迟到未来执行,但mandatory检查是立即进行的!

mandatory设置

会收到ReturnsCallback吗?

结果

false

❌ 不会

正常工作,推荐

true

100%会收到

每次都误报路由失败!

具体表现

// 延迟消息 + mandatory=true 的问题演示
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("delay.exchange", "routing.key", "消息", message -> {
    message.getMessageProperties().setDelay(5000); // 5秒延迟
    return message;
});

// 结果:
// 1. 立刻收到 ReturnsCallback,replyCode=312 (NO_ROUTE)
// 2. 5秒后消息正常投递给消费者
// 3. 开发者困惑:明明成功了为什么说路由失败?

官方明确说明的限制

Closely related to the above, the mandatory flag is not supported by this exchange

原因:

  1. 延迟投递时无法保证队列仍然存在

  2. 延迟投递时原始发布连接可能已断开,无法接收basic.return

解决方案对比

方案1:分离Template(复杂)

// 普通消息Template
@Bean("normalTemplate")
public RabbitTemplate normalTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMandatory(true); // 可以用mandatory
    return template;
}

// 延迟消息Template  
@Bean("delayTemplate")
public RabbitTemplate delayTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMandatory(false); // 必须关闭mandatory
    return template;
}

方案2:TTL+死信队列(推荐)

// 只需要一个Template,功能完整
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMandatory(true); // mandatory正常工作
    return template;
}

// 通过TTL过期 + 死信队列实现延迟

其他弊端

1. 插件依赖

  • 需要手动安装rabbitmq_delayed_message_exchange插件

  • 插件版本需要与RabbitMQ版本匹配

  • 增加运维复杂度

2. 性能考虑

  • 延迟消息存储在内存中,大量延迟消息会占用内存

  • 重启后延迟消息可能丢失(取决于配置)

3. 监控盲区

  • mandatory失效导致无法通过常规手段检测路由失败

  • 需要额外的监控机制

TTL+死信队列的性能弊端

CPU压力问题

核心问题: TTL检查机制会对CPU造成持续压力

场景举例:
- 业务高并发:每秒1000个订单
- 延迟时间长:每个订单30分钟后检查支付状态
- 结果:30分钟内累积1,800,000个延迟消息在队列中!

什么情况下会产生大量延迟消息?

  1. 延迟时间设置过长

    // 危险示例:2小时后执行
    message.getMessageProperties().setExpiration("7200000"); 
    
  2. 业务并发量大

    // 电商场景:每分钟数千个订单,每个都需要30分钟后检查支付
    // 30分钟内累积的延迟消息 = 1000 × 30 = 30,000条
    
  3. 多种延迟任务混合

    // 同时有:
    // - 15分钟后的支付检查
    // - 1小时后的库存释放  
    // - 24小时后的订单清理
    

CPU压力的根本原因

TTL检查机制: RabbitMQ需要不断检查每条消息是否过期

队列中的消息越多 → TTL检查频率越高 → CPU占用越大

具体表现:

  • 消息堆积时,CPU使用率明显上升

  • 队列越长,响应延迟越大

  • 极端情况下可能影响整个RabbitMQ性能

性能对比分析

延迟时间

并发量

30分钟内消息堆积

CPU压力

适用性

30秒

1000/分钟

500条

✅ 低

推荐

5分钟

1000/分钟

5,000条

⚠️ 中等

可接受

30分钟

1000/分钟

30,000条

❌ 高

不推荐

2小时

1000/分钟

120,000条

❌ 很高

危险

优化建议

1. 限制延迟时间范围

// 建议延迟时间不超过10分钟
if (delayTime > 600000) { // 10分钟
    throw new IllegalArgumentException("延迟时间过长,请使用定时任务替代");
}

2. 分层处理策略

// 短延迟:使用TTL+死信队列(< 10分钟)
if (delayTime <= 600000) {
    sendToTTLQueue(message, delayTime);
}
// 长延迟:使用定时任务(> 10分钟)  
else {
    scheduleTask(message, delayTime);
}

3. 监控队列长度

// 监控延迟队列消息堆积情况
@Component
public class DelayQueueMonitor {
    
    @Scheduled(fixedRate = 60000) // 每分钟检查
    public void monitorQueueLength() {
        int queueLength = getQueueMessageCount("delay.queue");
        if (queueLength > 10000) {
            log.warn("延迟队列消息堆积过多:{}", queueLength);
        }
    }
}

最终方案选择

短延迟场景(< 10分钟)

推荐:TTL+死信队列

  • ✅ mandatory机制正常工作

  • ✅ 原生RabbitMQ功能

  • ✅ CPU压力可接受

长延迟场景(> 10分钟)

推荐:定时任务框架

// 使用Quartz、XXL-JOB等
@Scheduled(fixedRate = 60000)
public void checkDelayedTasks() {
    // 从数据库查询到期任务执行
}

绝对避免的方案

x-delayed-message插件

  • ❌ mandatory机制失效

  • ❌ 需要复杂的Template分离

  • ❌ 增加插件依赖

  • ❌ 容易产生误解和调试困难

记忆口诀

  • 延迟插件mandatory必坑,TTL死信短时长!

  • 长延迟用定时任务,MQ别扛大压力!

实践建议总结

  1. 10分钟内延迟 → TTL+死信队列

  2. 10分钟以上延迟 → 定时任务 + 数据库

  3. 绝不使用 → x-delayed-message插件

  4. 时刻监控 → 队列长度和CPU使用率