简单案例
发送消息
@Test
void testSimplateQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Hello, Spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
消费消息
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("收到消息: {}", message);
}
Work Queues
多个消费者绑定到一个队列,可以加快消息的处理速度
同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,失效能者多劳
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只消费一个消息
交换机
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
Fanout交换机
FanoutExchange的会将消息路由到每个绑定的队列
发送消息给交换机
@Test
void testSendFanout() {
// 交换机
String exchangeName = "hmall.fanout";
// 消息
String message = "Hello, ereryone";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "",message);
}
监听消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
// 绿色
// log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
// systemOut绿色
System.out.println("\u001B[32m消费者1,收到[fanout.queue1]消息: " + message + "\u001B[0m");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
// 蓝色
// systemOut蓝色
System.out.println("\u001B[34m消费者2,收到[fanout.queue2]消息: " + message + "\u001B[0m");
}
Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
描述下Driect交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Driect交换机通过RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以
"."
点分割。Queue与Exchange指定BindingKey时可以使用通配符:
#:代表0个或多个单词
*:代指一个单词
描述下Direct交换机与Topic交换机的差异?
Topic交换机收到的消息RoutingKey可以是多个单词,以
"."
分割Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个单词
*:代表1个词
第一种声明交换机的方式
Queue
FanoutExchange、DirectExchange、TopicExchange
Binding
@Configuration
public class FanoutConfiguration {
/**
* 定义一个Fanout类型的交换机
*
* @return FanoutExchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout.java");
}
/**
* 定义一个队列
*
* @return Queue
*/
@Bean
public Queue fanoutQueue1() {
// 持久化
return new Queue("fanout.java.queue1");
}
/**
* 定义一个队列
*
* @return Queue
*/
@Bean
public Queue fanoutQueue2() {
// 持久化
return new Queue("fanout.java.queue2");
}
/**
* 交换机与队列绑定
*/
@Bean
public Binding bindingFanoutQueue1(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue fanoutQueue1) {
// 将fanoutExchange交换机与fanoutQueue1队列绑定
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 交换机与队列绑定
*/
@Bean
public Binding bindingFanoutQueue2(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue fanoutQueue2) {
// 将fanoutExchange交换机与fanoutQueue2队列绑定
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
第二种声明交换机的方式
@Queue
@Exchange
@QueueBinding
@Component
@Slf4j
public class DirectListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue1", durable = "true"),
exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
key = {"blue", "red"}
)
})
public void listenDirectQueue1(String message) {
// 绿色
// log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
// systemOut绿色
System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2", durable = "true"),
exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
key = {"red", "yellow"}
)
})
public void listenDirectQueue2(String message) {
// 蓝色
// systemOut蓝色
System.out.println("\u001B[34m消费者2,收到[Direct.queue2]消息: " + message + "\u001B[0m");
}
}
消息转换器
建议采用JSON序列化替代默认的JDK序列化
在publisher和consumer中都配置MeesageConverter
@Bean
public MessageConverter jacksonMessageConverter() {
// 使用Jackson消息转换器
return new Jackson2JsonMessageConverter();
}
连接失败时重试机制(生产者)
rabbitmq:
# 连接失败时的重试配置
connection-timeout: 1s # 连接超时时间1秒
template:
retry:
enabled: true # 启用重试
initial-interval: 1000ms # 初始间隔时间
multiplier: 1 # 失败后下次等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对业务性能有要求,建议禁用重试机制,如果一定要使用,请合理配置等待时长和重试机制次数,当然也可以考虑使用异步线程来执行发送消息代码。
生产者确认机制
RabbitMQ提供了Publisher Confirm 和 Publisher Return两种确认机制,开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果如以下几种情况。:
消息投递到了MQ,但是路由失败,此时会通过PublisherReturn返回路由异常原因,返回ACK,告知投递成功。
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功。
其它情况都会返回NACK,告知投递失败。
配置
rabbitmq:
template:
mandatory: true # 如果消息无法投递到交换机,RabbitMQ会将消息返回给生产者
# 生产者确认机制
# none 关闭confirm机制
# simple 开启confirm机制,同步阻塞等待MQ的回执消息
# correlated MQ异步回调方式返回回执消息
publisher-confirm-type: correlated # 开启publisher confirm机制 并设置 confirm类型为correlated
publisher-returns: true # 开启publisher returns机制
交换机ACK确认机制
@Test
void testPublisherConfirm2() throws InterruptedException {
// 交换机
String exchangeName = "hmall.direct";
// routingKey
String routingKey = "yellow";
// 消息
String message = "Hello!";
// 创建CorrelationData对象
CorrelationData cd = new CorrelationData();
// 给Future添加ConfirmCollback回调
cd.getFuture().thenAccept(confirm -> {
if (confirm.isAck()) {
log.info("消息发送成功,消息ID: {}", cd.getId());
} else {
log.error("消息发送失败,消息ID: {}", cd.getId());
}
}).exceptionally(ex -> {
log.error("消息发送异常,消息ID: {}", cd.getId(), ex);
return null;
});
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, cd);
// 等待回调执行完毕再结束测试
Thread.sleep(5000);
}
@Test
void testPublisherConfirm() throws InterruptedException {
// 交换机
String exchangeName = "hmall.direct";
// routingKey
String routingKey = "yellow";
// 消息
String message = "Hello!";
// 创建CorrelationData对象
CorrelationData cd = new CorrelationData();
// 给Future添加ConfirmCollback回调
cd.getFuture().whenComplete((confirm, ex) -> {
if (ex != null) {
log.error("消息发送异常,消息ID: {}", cd.getId(), ex);
} else if (confirm.isAck()) {
log.info("消息发送成功,消息ID: {}", cd.getId());
} else {
log.error("消息发送失败,消息ID: {}", cd.getId());
}
});
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, cd);
// 等待回调执行完毕再结束测试
Thread.sleep(5000);
}
ReturnsCback(用于处理 Exchange 到 Queue 路由失败的情况)
@Configuration
@Slf4j
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cf) {
// 强制消息未路由时退回
cf.setPublisherReturns(true);
RabbitTemplate template = new RabbitTemplate(cf);
template.setMandatory(true);
// ReturnsCallback 回调:处理路由失败的消息
template.setReturnsCallback(returned -> {
// 获取退回的原始消息体
String body = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);
// 打印日志:包含交换机、路由键、回复码/原因和消息内容
log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
returned.getExchange(), // 交换机名称
returned.getRoutingKey(), // 路由键
returned.getReplyCode(), // Broker 返回码
returned.getReplyText(), // Broker 返回原因
body // 消息内容
);
});
return template;
}
}
@Configuration
@Slf4j
public class RabbitConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate Bean
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置消息发送确认回调
rabbitTemplate.setReturnsCallback(returnedMessage -> {
// 取出原始消息体并转成字符串
String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
// 记录日志:交换机、路由键、返回码、原因、消息内容
log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
returnedMessage.getExchange(), // 交换机名称
returnedMessage.getRoutingKey(), // 路由键
returnedMessage.getReplyCode(), // Broker 返回码
returnedMessage.getReplyText(), // Broker 返回原因
body); // 原始消息内容
});
}
}
@Configuration
@Slf4j
public class MyRabbitCustomizer implements RabbitTemplateCustomizer {
@Override
public void customize(RabbitTemplate rabbitTemplate) {
// 设置消息发送确认回调
rabbitTemplate.setReturnsCallback(returnedMessage -> {
// 取出原始消息体并转成字符串
String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
// 记录日志:交换机、路由键、返回码、原因、消息内容
log.error("[消息退回] exchg={}, routingKey={}, code={}, text={}, body={}",
returnedMessage.getExchange(), // 交换机名称
returnedMessage.getRoutingKey(), // 路由键
returnedMessage.getReplyCode(), // Broker 返回码
returnedMessage.getReplyText(), // Broker 返回原因
body); // 原始消息内容
});
}
}
如何处理生产者的确认消息?
生产者确认需要额外的网络和系统资源开销,尽量不要使用
如果一定要使用,无须开启Publicsher机制,因为一般路由失败是自己业务问题
对于nack消息可以有限次数重试,依然失败则记录异常消息
消息持久化
@Test
void testSendMessage() {
// 并行发送1百万条消息,分组交给每个线程处理
int messageCount = 1000000;
int threadCount = 6; // 线程数,可根据实际情况调整
int batchSize = messageCount / threadCount; // 每个线程处理的消息数量
CompletableFuture<?>[] futures = new CompletableFuture[threadCount];
for (int t = 0; t < threadCount; t++) {
final int start = t * batchSize; // 每个线程处理的起始索引
final int end = (t == threadCount - 1) ? messageCount : (t + 1) * batchSize; // 计算结束索引,最后一个线程处理剩余的消息
futures[t] = CompletableFuture.runAsync(() -> {
String queueName = "simple.queue";
// MessageDeliveryMode.NON_PERSISTENT 非持久化的
// MessageDeliveryMode.PERSISTENT 持久化的
Message msg = MessageBuilder.withBody(queueName.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
for (int i = start; i < end; i++) {
String message = "Hello, Spring amqp! Message " + i;
rabbitTemplate.convertAndSend(queueName, msg);
}
});
}
// 等待所有异步任务完成
CompletableFuture.allOf(futures).join();
log.info("所有异步任务已完成");
}
LazyQueue(3.12、3.13版本后默认开启,无须手动配置和发送消息时配置)
// 针对3.12版本之前的配置
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue1", durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")),
exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
key = {"blue", "red"}
)
})
public void listenDirectQueue1(String message) {
// 绿色
// log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
// systemOut绿色
System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");
}
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() // 等同于 x-queue-mode=lazy
.build();
}
RabbitMQ如何保证消息的可靠性?
首先通过配置可以让交换机、队列、以及发送消息都持久化,这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后称为队列的默认模式,LazyQueue会将所有消息都持久化。
开启持久化和生产者确认模式,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。
消费者确认机制
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式
none:不处理,即消息投递给消费者后立刻ack,消息会立即从MQ删除,非常不安全,不建议使用
manual:手动模式。需要自己在业务中调用api,发送ack或reject,存在业务入侵,但是灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
当业务出现异常时,根据异常判断返回不同的结果
如果是业务异常,自动返回nack
如果消息处理或校验异常,自动返回reject
配置
rabbitmq:
listener:
simple:
# none 表示不使用事务
# manual 表示手动确认
# auto 表示自动确认
acknowledge-mode: auto
本地重试机制
rabbitmq:
listener:
simple:
retry:
# 消费者失败重试配置
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始间隔时间为1秒
multiplier: 1 # 失败后下次等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true为无状态;false有状态。如果业务中包含事务,这里改为false
失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MeesageRecoverer接口来处理。它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
消费者如何保证消息一定被消费?
开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
重试达到次数后到指定交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled", havingValue = "true") // 仅在启用重试时加载此配置
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {
// 创建一个名为 "error.direct.java" 的 DirectExchange
return new DirectExchange("error.direct.java", true, false, null);
}
@Bean
public Queue errorQueue() {
// 创建一个名为 "error.queue" 的队列
return new Queue("error.queue", true, false, false, null);
}
@Bean
public Binding errorBinding(@Qualifier("errorExchange") DirectExchange errorExchange, @Qualifier("errorQueue") Queue errorQueue) {
// 将队列绑定到 DirectExchange 上,并指定路由键为 "error"
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
// 创建一个自定义的 MessageRecoverer 实现
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct.java", "error");
}
}
业务幂等性
唯一消息ID
方案一、是给每个消息都设置一个唯一ID,利用ID区分是否重复消息:
每一条消息都生成一个唯一的ID,与消息一起投递给消费者。
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存至数据库。
如果下次又收到同样的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
// 开启
@Bean
public MessageConverter jacksonMessageConverter() {
// 使用Jackson消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 设置消息ID
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
死信交换机
当一个队列中的消息满足下列情况之一时,就回称为死信(dead letter):
消费者使用basic.reject或basic.nack声音消息失败,并且消息的requeue参数设置为false
消息是一个过期消息(达到了队列或者消息本身设置的过期时间)超时无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
发送带expiration的消息
@Test
void testSendTTLMessage() throws InterruptedException {
// 消息
Map<String, Object> message = new HashMap<>();
message.put("name", "uluckyxh");
// 交换机
String exchange = "simple.direct";
// binding key
String routingKey = "hi";
// 过期时间
String expiration = "10000"; // 10秒
// 使用 Jackson2JsonMessageConverter 自动将 POJO 转为 JSON 并发送
// 通过 MessagePostProcessor 在发送前设置消息属性(如 TTL、持久化)
rabbitTemplate.convertAndSend(
exchange,
routingKey,
message,
msg -> {
// 5.1 设置消息过期时间
msg.getMessageProperties().setExpiration(expiration);
// 5.2 设置消息持久化
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
}
);
log.info("发送消息到交换机[{}],路由键[{}],内容: {}", exchange, routingKey, message);
Thread.sleep(5000);
}
监听死信交换机
@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(Map<String,Object> message) {
// 打印接收到的消息
log.info("消费者,收到[dlx.queue]消息: {}", message);
// throw new RuntimeException("模拟异常,触发死信队列");
}
延迟消息插件
Docker安装插件
DockerFile
# 选择带管理界面的官方镜像,版本需与插件一致 # ← 修改版本时记得同时换插件
FROM rabbitmq:3-management-alpine
# 将宿主机 plugins 目录下的 .ez 插件 COPY 到镜像内插件目录 # ← 只 COPY 你自己的插件
COPY plugins/rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/
# 离线启用延迟消息插件(管理 UI 已自带,无需再启) # ← --offline 表示不启动节点也能启用
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
# 暴露端口(可选,Compose 里也会映射) # ← 5672 AMQP;15672 Web UI
EXPOSE 5672 15672
docker-compose.yml
version: "3.9" # Compose 文件版本
services:
rabbitmq: # 服务名称
build: . # 使用当前目录的 Dockerfile 构建镜像
container_name: rabbitmq # 指定容器名称
hostname: rabbitmq-host # 容器内主机名
ports:
- "5672:5672" # 宿主机 5672 -> 容器 5672 (AMQP)
- "15672:15672" # 宿主机 15672 -> 容器 15672 (Web UI)
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro # 自定义配置文件(只读)
- ./data:/var/lib/rabbitmq/mnesia # 持久化数据目录
- ./logs:/var/log/rabbitmq # 日志目录
environment:
RABBITMQ_ERLANG_COOKIE: "secret_cookie" # Erlang 集群通信用 Cookie(保持一致即可)
restart: always # 容器异常退出自动重启
healthcheck: # 健康检查设置
test: ["CMD", "rabbitmqctl", "status"] # 通过 rabbitmqctl 检查节点状态
interval: 30s # 每 30 s 执行一次
timeout: 30s # 单次检查超时 30 s
start_period: 30s # 容器启动后 30 s 再开始检查
retries: 3 # 连续失败 3 次视为不健康
一次启动多个插件的案例
# 以与插件版本对应的 RabbitMQ 镜像为基础
FROM rabbitmq:3.13.0-management-alpine # ← 换版本时记得同步插件
# 1️⃣ 复制宿主机 plugins 目录下所有 .ez 文件到镜像
# 这样后续只要把新插件放进 plugins/ 就不用改 Dockerfile 了
COPY plugins/*.ez /opt/rabbitmq/plugins/
# 2️⃣ 在同一个 RUN 层里一次性启用所有插件
# 写成空格分隔即可,避免多层镜像 & 减少构建时间
RUN rabbitmq-plugins enable --offline \
rabbitmq_delayed_message_exchange \
rabbitmq_shovel rabbitmq_shovel_management \
rabbitmq_peer_discovery_k8s \
rabbitmq_federation rabbitmq_federation_management
# 如需暴露端口,可留在这里,或交给 docker-compose.yml 来映射
EXPOSE 5672 15672
启动命令
# 构建镜像并启动服务(首次会下载基础镜像,稍等片刻)
docker compose up -d --build
# 查看插件是否启用成功
docker exec -it rabbitmq rabbitmq-plugins list -E | grep delayed
# 期望输出: [E*] rabbitmq_delayed_message_exchange 3.13.0
声明交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.exchange", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(Map<String, Object> message) {
// 打印接收到的消息
log.info("消费者,收到[delay.queue]消息: {}", message);
}
发送delay消息
@Test
void testSendDelayMessage() throws InterruptedException {
// 消息
String message = "Hello, delay message!";
// 交换机
String exchange = "delay.exchange";
// binding key
String routingKey = "delay";
// 使用 Jackson2JsonMessageConverter 自动将 POJO 转为 JSON 并发送
// 通过 MessagePostProcessor 在发送前设置消息属性(如 TTL、持久化)
rabbitTemplate.convertAndSend(
exchange,
routingKey,
message,
msg -> {
// 设置消息类型
msg.getMessageProperties().getHeaders().put("__TypeId__", "java.lang.String");
// 设置消息过期时间为 10 秒
msg.getMessageProperties().setDelayLong(10000L);
// 设置消息持久化
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
}
);
log.info("发送延迟消息到交换机[{}],路由键[{}],内容: {}", exchange, routingKey, message);
Thread.sleep(5000);
}
RabbitMQ延迟消息插件(x-delayed-message)的弊端
核心问题:mandatory机制完全失效
什么是mandatory?
mandatory=true
:如果消息无法路由到任何队列,Broker会立即返回basic.return
给生产者mandatory=false
:Broker不检查路由结果,不会有任何返回
延迟插件的致命缺陷
问题根源: x-delayed-message插件把路由动作推迟到未来执行,但mandatory检查是立即进行的!
具体表现
// 延迟消息 + mandatory=true 的问题演示
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("delay.exchange", "routing.key", "消息", message -> {
message.getMessageProperties().setDelay(5000); // 5秒延迟
return message;
});
// 结果:
// 1. 立刻收到 ReturnsCallback,replyCode=312 (NO_ROUTE)
// 2. 5秒后消息正常投递给消费者
// 3. 开发者困惑:明明成功了为什么说路由失败?
官方明确说明的限制
Closely related to the above, the mandatory flag is not supported by this exchange
原因:
延迟投递时无法保证队列仍然存在
延迟投递时原始发布连接可能已断开,无法接收
basic.return
解决方案对比
方案1:分离Template(复杂)
// 普通消息Template
@Bean("normalTemplate")
public RabbitTemplate normalTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true); // 可以用mandatory
return template;
}
// 延迟消息Template
@Bean("delayTemplate")
public RabbitTemplate delayTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(false); // 必须关闭mandatory
return template;
}
方案2:TTL+死信队列(推荐)
// 只需要一个Template,功能完整
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true); // mandatory正常工作
return template;
}
// 通过TTL过期 + 死信队列实现延迟
其他弊端
1. 插件依赖
需要手动安装
rabbitmq_delayed_message_exchange
插件插件版本需要与RabbitMQ版本匹配
增加运维复杂度
2. 性能考虑
延迟消息存储在内存中,大量延迟消息会占用内存
重启后延迟消息可能丢失(取决于配置)
3. 监控盲区
mandatory失效导致无法通过常规手段检测路由失败
需要额外的监控机制
TTL+死信队列的性能弊端
CPU压力问题
核心问题: TTL检查机制会对CPU造成持续压力
场景举例:
- 业务高并发:每秒1000个订单
- 延迟时间长:每个订单30分钟后检查支付状态
- 结果:30分钟内累积1,800,000个延迟消息在队列中!
什么情况下会产生大量延迟消息?
延迟时间设置过长
// 危险示例:2小时后执行 message.getMessageProperties().setExpiration("7200000");
业务并发量大
// 电商场景:每分钟数千个订单,每个都需要30分钟后检查支付 // 30分钟内累积的延迟消息 = 1000 × 30 = 30,000条
多种延迟任务混合
// 同时有: // - 15分钟后的支付检查 // - 1小时后的库存释放 // - 24小时后的订单清理
CPU压力的根本原因
TTL检查机制: RabbitMQ需要不断检查每条消息是否过期
队列中的消息越多 → TTL检查频率越高 → CPU占用越大
具体表现:
消息堆积时,CPU使用率明显上升
队列越长,响应延迟越大
极端情况下可能影响整个RabbitMQ性能
性能对比分析
优化建议
1. 限制延迟时间范围
// 建议延迟时间不超过10分钟
if (delayTime > 600000) { // 10分钟
throw new IllegalArgumentException("延迟时间过长,请使用定时任务替代");
}
2. 分层处理策略
// 短延迟:使用TTL+死信队列(< 10分钟)
if (delayTime <= 600000) {
sendToTTLQueue(message, delayTime);
}
// 长延迟:使用定时任务(> 10分钟)
else {
scheduleTask(message, delayTime);
}
3. 监控队列长度
// 监控延迟队列消息堆积情况
@Component
public class DelayQueueMonitor {
@Scheduled(fixedRate = 60000) // 每分钟检查
public void monitorQueueLength() {
int queueLength = getQueueMessageCount("delay.queue");
if (queueLength > 10000) {
log.warn("延迟队列消息堆积过多:{}", queueLength);
}
}
}
最终方案选择
短延迟场景(< 10分钟)
推荐:TTL+死信队列
✅ mandatory机制正常工作
✅ 原生RabbitMQ功能
✅ CPU压力可接受
长延迟场景(> 10分钟)
推荐:定时任务框架
// 使用Quartz、XXL-JOB等
@Scheduled(fixedRate = 60000)
public void checkDelayedTasks() {
// 从数据库查询到期任务执行
}
绝对避免的方案
x-delayed-message插件
❌ mandatory机制失效
❌ 需要复杂的Template分离
❌ 增加插件依赖
❌ 容易产生误解和调试困难
记忆口诀
延迟插件mandatory必坑,TTL死信短时长!
长延迟用定时任务,MQ别扛大压力!
实践建议总结
10分钟内延迟 → TTL+死信队列
10分钟以上延迟 → 定时任务 + 数据库
绝不使用 → x-delayed-message插件
时刻监控 → 队列长度和CPU使用率