代码地址: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq

安装RabbitMq1. docker拉取镜像

docker pull rabbitmq:3.9.29-management

2. 创建rabbitmq容器

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9.29-management

3. 访问地址

http://{ip地址}:15672/,可以看到RabbitMq的管理后台界面。账号密码默认 guest

消息生产和消费rabbitmq-producer

  1. 新建module,引入依赖

    org.springframework.bootspring-boot-starter-amqp
  2. yaml配置地址

    spring:  rabbitmq:    host: 192.168.67.131    port: 5672    username: guest    password: guest    virtual-host: /
  3. 配置交换机

    @Configurationpublic class RabbitMqConfig {    /*定义交换机名称*/    public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";    @Bean    public Exchange userInfoExchange() {        return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();    }}
  4. 发送消息

        @Test    public void sendMessage() {        String registerMsg = "user register..." + new Date();        // 1. 发送一条注册消息        rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,                "user.register.user1", registerMsg, msg -> {                    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                    return msg;                });        log.info("消息发送完成:{}", registerMsg);        String loginMsg = "user login..." + new Date();        // 2. 发送一条登录消息        rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,                "user.login.user1", loginMsg, msg -> {                    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                    return msg;                });        log.info("消息发送完成:{}", loginMsg);    }

rabbitmq-consumer

  1. 新建module,引入依赖

    org.springframework.bootspring-boot-starter-amqp
  2. yaml配置地址

    spring:  rabbitmq:    host: 192.168.67.131    port: 5672    username: guest    password: guest    virtual-host: /
  3. 配置队列绑定关系

    @Configurationpublic class RabbitMqConfig {    @Value("${spring.rabbitmq.host}")    private String host;    @Value("${spring.rabbitmq.port}")    private Integer port;    @Value("${spring.rabbitmq.username}")    private String username;    @Value("${spring.rabbitmq.password}")    private String password;    /*定义交换机名称*/    public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";    /*定义队列名称*/    public static final String USER_REGISTER_QUEUE_NAME = "user_register_queue";    public static final String USER_LOGIN_QUEUE_NAME = "user_login_queue";    public static final String USER_REGISTER_ROUTING_KEY = "user.register.#";    public static final String USER_LOGIN_ROUTING_KEY = "user.login.#";    @Bean    public Exchange userInfoExchange() {        return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();    }    @Bean    public Queue userRegisterQueue() {        return QueueBuilder.durable(USER_REGISTER_QUEUE_NAME).build();    }    @Bean    public Queue userLoginQueue() {        return QueueBuilder.durable(USER_LOGIN_QUEUE_NAME).build();    }    @Bean    public Binding userRegisterBinding() {        return BindingBuilder.bind(userRegisterQueue()).to(userInfoExchange()).with(USER_REGISTER_ROUTING_KEY).noargs();    }    @Bean    public Binding userLoginBinding() {        return BindingBuilder.bind(userLoginQueue()).to(userInfoExchange()).with(USER_LOGIN_ROUTING_KEY).noargs();    }}
  4. 监听器

    @Component@Slf4jpublic class UserInfoListener {    @RabbitListener(queues = RabbitMqConfig.USER_REGISTER_QUEUE_NAME)    public void userRegister(String msg){        log.info(msg);    }    @RabbitListener(queues = RabbitMqConfig.USER_LOGIN_QUEUE_NAME)    public void userLogin(String msg){        log.info(msg);    }}
  5. 结果

    启动producer项目和consumer项目,producer发送消息,consumer接收到消息:

    producer:

    2023-07-08 10:26:10.660  INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user register...Sat Jul 08 10:26:10 CST 20232023-07-08 10:26:10.663  INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user login...Sat Jul 08 10:26:10 CST 2023

    consumer:

    2023-07-08 10:26:10.661  INFO 25108 --- [ntContainer#1-1] c.yt.rabbitmq.listener.UserInfoListener  : user register...Sat Jul 08 10:26:10 CST 20232023-07-08 10:26:10.665  INFO 25108 --- [ntContainer#0-1] c.yt.rabbitmq.listener.UserInfoListener  : user login...Sat Jul 08 10:26:10 CST 2023

交换器类型

交换器类型有四种,fanout,topic,direct,headers。
接下来在代码中创建三种交换器类型,对应的routingKey和queue绑定如表格所示。发送对应消息,看看是否能接收到 [Y/N]。headers类型不演示。

ExchangeExchangeTypeRoutingKeyMessageKeyQueueReceive
fanout_exchangefanoutfanout.test.key1
fanout.#
xxx.yyy.zzzfanout_test_queue1
fanout_test_queue2
Y
Y
topic_exchangetopictopic.test.#
topic.#
topic.*
topic.test.key1topic_test_queue1
topic_test_queue2
topic_test_queue3
Y
Y
N
direct_exchangedirectdirect.test.key1
direct.test.#
direct.test.key3
direct.test.key1
direct.test.key2
direct.test.key3
direct_test_queue1
direct_test_queue2
direct_test_queue3 && direct_test_queue4
Y
N
Y && Y

direct 的Routingkey是全匹配,通配符不起作用,所以direct_test_queue2没有接收到消息。
topic 的通配符,*正好匹配一个词,#可以匹配一个或多个词,所以topic_test_queue3没有接收到消息。