简单案例
发送消息
@Test
void testSimplateQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Hello, Spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
消费消息
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("收到消息: {}", message);
}
Work Queues
多个消费者绑定到一个队列,可以加快消息的处理速度
同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,失效能者多劳
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只消费一个消息
交换机
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
Fanout交换机
FanoutExchange的会将消息路由到每个绑定的队列
发送消息给交换机
@Test
void testSendFanout() {
// 交换机
String exchangeName = "hmall.fanout";
// 消息
String message = "Hello, ereryone";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "",message);
}
监听消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
// 绿色
// log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
// systemOut绿色
System.out.println("\u001B[32m消费者1,收到[fanout.queue1]消息: " + message + "\u001B[0m");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
// 蓝色
// systemOut蓝色
System.out.println("\u001B[34m消费者2,收到[fanout.queue2]消息: " + message + "\u001B[0m");
}
Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
描述下Driect交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Driect交换机通过RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以
"."
点分割。Queue与Exchange指定BindingKey时可以使用通配符:
#:代表0个或多个单词
*:代指一个单词
描述下Direct交换机与Topic交换机的差异?
Topic交换机收到的消息RoutingKey可以是多个单词,以
"."
分割Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个单词
*:代表1个词
第一种声明交换机的方式
Queue
FanoutExchange、DirectExchange、TopicExchange
Binding
@Configuration
public class FanoutConfiguration {
/**
* 定义一个Fanout类型的交换机
*
* @return FanoutExchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout.java");
}
/**
* 定义一个队列
*
* @return Queue
*/
@Bean
public Queue fanoutQueue1() {
// 持久化
return new Queue("fanout.java.queue1");
}
/**
* 定义一个队列
*
* @return Queue
*/
@Bean
public Queue fanoutQueue2() {
// 持久化
return new Queue("fanout.java.queue2");
}
/**
* 交换机与队列绑定
*/
@Bean
public Binding bindingFanoutQueue1(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue fanoutQueue1) {
// 将fanoutExchange交换机与fanoutQueue1队列绑定
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 交换机与队列绑定
*/
@Bean
public Binding bindingFanoutQueue2(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue fanoutQueue2) {
// 将fanoutExchange交换机与fanoutQueue2队列绑定
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
第二种声明交换机的方式
@Queue
@Exchange
@QueueBinding
@Component
@Slf4j
public class DirectListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue1", durable = "true"),
exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
key = {"blue", "red"}
)
})
public void listenDirectQueue1(String message) {
// 绿色
// log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
// systemOut绿色
System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "direct.queue2", durable = "true"),
exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
key = {"red", "yellow"}
)
})
public void listenDirectQueue2(String message) {
// 蓝色
// systemOut蓝色
System.out.println("\u001B[34m消费者2,收到[Direct.queue2]消息: " + message + "\u001B[0m");
}
}