文章目录

    • 添加 RocketMQ 依赖
    • 消费者 Consumer
      • YAML 配置
      • 创建监听器
      • 消息过滤
        • Tag 过滤
    • 生产者 Producer
      • YAML 配置
      • 发送同步消息
      • 发送异步消息
      • 发送单向消息
      • 发送延迟消息
      • 发送顺序消息
      • 发送批量消息
      • 发送集合消息

添加 RocketMQ 依赖

  1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

  2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>

消费者 Consumer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:name-server: 192.168.68.121:9876 # rocketMq的nameServer地址

创建监听器

创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

@Component@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}}

@RocketMQMessageListener 注解参数如下:

参数描述
topic消费者订阅的主题
consumerGroup消费者组
consumeMode消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY
messageModel消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING
selectorType过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92
selectorExpression过滤消息的表达式:Tag | SQL92【`tag1
maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

消息过滤

Tag 过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

编写并启动消费者项目订阅 tagTopic 主题:

@Component@RocketMQMessageListener(topic = "tagTopic",consumerGroup = "boot-mq-group-consumer",selectorType = SelectorType.TAG,selectorExpression = "java")public class MQMsgListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/tag")public String sendSyncMessage() {SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");return "发送状态:" + result.getSendStatus() + "
消息id:"
+ result.getMsgId();}}

运行项目,访问接口:http://localhost:8080/send/tag

查看 RocketMQ 控制台,可以看到消息带有 java tag:

查看消费者项目的 IDEA 控制台:

生产者 Producer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:name-server: 192.168.68.121:9876 # rocketMq的nameServer地址producer:group: boot-mq-group-producer # 生产者组名

注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

发送同步消息

编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/sync/{msg}")public String sendSyncMessage(@PathVariable String msg){SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);return "发送状态:"+result.getSendStatus()+"
消息id:"
+result.getMsgId();}}

运行项目,访问接口:http://localhost:8080/send/sync/同步消息

访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

发送异步消息

不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/async/{msg}")public String sendAsyncMessage(@PathVariable String msg) {rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("异步消息发送失败");}});System.out.println("异步消息已发送完成");return "发送异步消息";}}

运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

发送单向消息

编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/oneWay/{msg}")public String sendOneWayMessage(@PathVariable String msg) {rocketMQTemplate.sendOneWay("oneWayTopic",msg);return "单向消息发送成功";}}

运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

发送延迟消息

编写并启动消费者项目订阅 delayTopic 主题:

@Component@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());}}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/delay/{msg}")public String sendDelayMessage(@PathVariable String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();// 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);return "发送状态:" + result.getSendStatus() + "
消息id:"
+ result.getMsgId()+"
消息发送时间:"
+new Date();}}

运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

发送顺序消息

编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

public class Order {//订单号private String orderId;//订单名称private String orderName;//订单的流程顺序private String seq;}

编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

@Component@RocketMQMessageListener(topic = "orderlyTopic",consumerGroup="boot-mq-group-consumer",consumeMode = ConsumeMode.ORDERLY)public class MQMsgListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order message) {System.out.println("消费者:"+message);}}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/orderly")public String sendOrderlyMessage() {List<Order> orders = Arrays.asList(new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "发短信", "1"),new Order(UUID.randomUUID().toString(), "物流", "1"),new Order(UUID.randomUUID().toString(), "签收", "1"),new Order(UUID.randomUUID().toString(), "下订单", "2"),new Order(UUID.randomUUID().toString(), "发短信", "2"),new Order(UUID.randomUUID().toString(), "物流", "2"),new Order(UUID.randomUUID().toString(), "签收", "2"));//控制流程:下订单->发短信->物流->签收//将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费orders.forEach(order -> {rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());});return "发送成功";}}

运行项目,访问接口:http:localhost:8080/send/orderly

查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

发送批量消息

编写并启动消费者项目订阅 batchOrderly 主题:

@Component@RocketMQMessageListener(topic = "batchOrderly",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order message) {System.out.println(Thread.currentThread().getName()+":"+message);}}

编写生产者 Controller,将消息打包成 Collection msgs 传入 syncSend() 方法中发送:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/batch")public String sendOrderlyMessage() {List<Message> messages = Arrays.asList(MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build());return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();}}

运行项目,访问接口:http:localhost:8080/send/batch

查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

发送集合消息

编写并启动消费者项目订阅 listTopic 主题:

@Component@RocketMQMessageListener(topic = "listTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<List<Order>> {@Overridepublic void onMessage(List<Order> orders) {orders.forEach(o -> {System.out.println(Thread.currentThread().getName()+":"+o);});}}

编写生产者 Controller,将集合传入 syncSend() 方法中发送:

@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/list")public String sendOrderlyMessage() {List<Order> orders = Arrays.asList(new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"));rocketMQTemplate.syncSend("listTopic",orders);return "发送成功";}}

运行项目,访问接口:http:localhost:8080/send/list

查看 RocketMQ 控制台,可以看到队列中一条消息:

查看消费者项目的 IDEA 控制台,进行消费: