简单案例

发送消息

@Test
void testSimplateQueue() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "Hello, Spring amqp!";
    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
}

消费消息

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
    log.info("收到消息: {}", message);
}

Work Queues

  • 多个消费者绑定到一个队列,可以加快消息的处理速度

  • 同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,失效能者多劳

  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只消费一个消息

交换机

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

Fanout交换机

  • FanoutExchange的会将消息路由到每个绑定的队列

发送消息给交换机
    @Test
    void testSendFanout() {
        // 交换机
        String exchangeName = "hmall.fanout";
        // 消息
        String message = "Hello, ereryone";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "",message);
    }
监听消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
    // 绿色
//        log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
    // systemOut绿色
    System.out.println("\u001B[32m消费者1,收到[fanout.queue1]消息: " + message + "\u001B[0m");

}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
    // 蓝色
    // systemOut蓝色
    System.out.println("\u001B[34m消费者2,收到[fanout.queue2]消息: " + message + "\u001B[0m");

}

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

描述下Driect交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Driect交换机通过RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

  • TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以"." 点分割。

  • Queue与Exchange指定BindingKey时可以使用通配符:

    • #:代表0个或多个单词

    • *:代指一个单词

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机收到的消息RoutingKey可以是多个单词,以"." 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个单词

  • *:代表1个词

第一种声明交换机的方式

  • Queue

  • FanoutExchange、DirectExchange、TopicExchange

  • Binding

@Configuration
public class FanoutConfiguration {

    /**
     * 定义一个Fanout类型的交换机
     *
     * @return FanoutExchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("hmall.fanout.java");
    }

    /**
     * 定义一个队列
     *
     * @return Queue
     */
    @Bean
    public Queue fanoutQueue1() {
        // 持久化
        return new Queue("fanout.java.queue1");
    }

    /**
     * 定义一个队列
     *
     * @return Queue
     */
    @Bean
    public Queue fanoutQueue2() {
        // 持久化
        return new Queue("fanout.java.queue2");
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bindingFanoutQueue1(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue fanoutQueue1) {
        // 将fanoutExchange交换机与fanoutQueue1队列绑定
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bindingFanoutQueue2(FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue fanoutQueue2) {
        // 将fanoutExchange交换机与fanoutQueue2队列绑定
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}

第二种声明交换机的方式

  • @Queue

  • @Exchange

  • @QueueBinding

@Component
@Slf4j
public class DirectListener {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.queue1", durable = "true"),
                    exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
                    key = {"blue", "red"}
            )
    })
    public void listenDirectQueue1(String message) {
        // 绿色
        //        log.info("\u001B[32m消费者1,收到[work.queue]消息: {}\u001B[0m", message);
        // systemOut绿色
        System.out.println("\u001B[32m消费者1,收到[Direct.queue1]消息: " + message + "\u001B[0m");

    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "direct.queue2", durable = "true"),
                    exchange = @Exchange(value = "hmall.direct.java", type = "direct"),
                    key = {"red", "yellow"}
            )
    })
    public void listenDirectQueue2(String message) {
        // 蓝色
        // systemOut蓝色
        System.out.println("\u001B[34m消费者2,收到[Direct.queue2]消息: " + message + "\u001B[0m");

    }


}

消息转换器