Kafka: implementing ordered message consumption

Scenario

During checkout an order moves through different statuses, each with a corresponding status event, for example pending payment (待支付), payment succeeded (已支付) and payment failed (支付失败). We push these events to the message queue, and downstream services run different business logic depending on the order status, so the order status events must be delivered and consumed in order.

Solution

  • The producer sends order status events with the same key to the same Kafka partition
  • The Kafka consumer receives the messages
  • The consumer hands each message off to a thread pool
  • The thread pool applies a routing strategy to the message's routing key and picks one thread, so events with the same key always go into the same thread's blocking queue (see the sketch after this list)
  • Each worker thread continuously takes order status messages from its own blocking queue and consumes them
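Before walking through the full implementation, here is a minimal sketch of the routing idea, assuming hash-based routing over a fixed number of workers (the class name, worker count and keys below are illustrative only, not part of the project code):

// Minimal sketch of the routing idea: the same routing key always maps to the
// same worker index, so all events for one order are handled by one thread,
// in arrival order.
public class RoutingSketch {

    // illustrative: 3 workers, matching the 3 listener threads used later
    private static final int WORKER_COUNT = 3;

    // floorMod keeps the index non-negative even for negative hash codes
    static int workerIndexFor(String routingKey) {
        return Math.floorMod(routingKey.hashCode(), WORKER_COUNT);
    }

    public static void main(String[] args) {
        // the three status events of order "42" all map to the same worker
        System.out.println(workerIndexFor("42")); // pending payment
        System.out.println(workerIndexFor("42")); // payment succeeded
        System.out.println(workerIndexFor("42")); // payment failed
    }
}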

Code implementation

Dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>
DTOs used
@Data
public class InterOrderDto extends OrderDto implements OrderMessage {

    /**
     * Which partition the message came from
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

public interface OrderMessage {

    /**
     * Routing key for the thread pool
     *
     * @return the routing key
     */
    String getUniqueNo();
}
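The base OrderDto class is not listed above. A minimal sketch, assuming only the fields that the producer sets and that appear in the log output (orderNo, payStatus, timestamp), could look like this:

// Minimal sketch of the base DTO; the fields are inferred from the producer
// code and the log output, not copied from the original source.
@Data
public class OrderDto {

    /** order number, also used as the Kafka message key */
    private String orderNo;

    /** order status, e.g. 待支付 / 已支付 / 支付失败 */
    private String payStatus;

    /** event time in milliseconds */
    private long timestamp;
}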
Defining the topic

Here the topic has 3 partitions and 2 replicas.

@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic() {
        return new NewTopic(Constants.TOPIC_ORDER, 3, (short) 2);
    }
}

public interface Constants {

    String TOPIC_ORDER = "order";
}
Consumer

Consumer: OrderListener

@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("> receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }
}

Thread pool: OrderThreadPool

/**
 * @Date: 2024/1/24 10:23
 * Thread pool implementation
 *
 * @param <W> worker
 * @param <D> message
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {

    private List<W> workers;
    private int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * Route a message to a single worker thread
     *
     * @param data the message to dispatch
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        // floorMod keeps the index non-negative even if hashCode() is negative
        int queueNo = Math.floorMod(uniqueNo.hashCode(), size);
        for (W worker : workers) {
            if (queueNo == worker.getQueueNo()) {
                return worker;
            }
        }
        throw new RuntimeException("worker routing failed");
    }

    /**
     * Start the workers; only started once
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * Shut down all workers and wait for the remaining tasks to finish
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }
}

Worker thread: SingleThreadWorker; an internal blocking queue serializes its processing

/**
 * @Date: 2024/1/24 10:58
 * single thread with a blocking-queue
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static AtomicInteger cnt = new AtomicInteger(0);

    private BlockingQueue<T> queue;

    // volatile so the stop flag set by shutdown() is visible to the worker thread
    private volatile boolean started = true;

    /**
     * Unique id of this worker
     */
    @Getter
    private int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * Submit a message to this worker
     *
     * @param data the message
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take ", Thread.currentThread().getName());
        while (started) {
            try {
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                log.error("queue take error", e);
            }
        }
    }

    /**
     * Do the real message consumption
     *
     * @param data the message
     */
    protected abstract void doConsumer(T data);

    /**
     * Consume the rest of the messages in the queue when the thread pool shuts down
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }
}

Worker implementation: OrderWorker, which processes the order events (here it simply logs them)

/**
 * @Date: 2024/1/24 13:42
 * Concrete consumer
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto> {

    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));
    }
}
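In a real service, doConsumer would branch on the order status and call the corresponding business logic. A rough sketch of what that could look like; the class name and the handle* methods are hypothetical placeholders, not part of the project code:

// Illustrative only: the status strings match the ones produced by
// OrderController below, but the handle* methods are hypothetical
// placeholders for real business logic.
@Slf4j
public class StatusRoutingOrderWorker extends SingleThreadWorker<InterOrderDto> {

    public StatusRoutingOrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        switch (data.getPayStatus()) {
            case "待支付" -> handlePending(data);   // pending payment
            case "已支付" -> handlePaid(data);      // payment succeeded
            case "支付失败" -> handleFailed(data);  // payment failed
            default -> log.warn("unknown pay status: {}", data.getPayStatus());
        }
    }

    private void handlePending(InterOrderDto data) {
        log.info("order {} is waiting for payment", data.getOrderNo());
    }

    private void handlePaid(InterOrderDto data) {
        log.info("order {} paid successfully", data.getOrderNo());
    }

    private void handleFailed(InterOrderDto data) {
        log.info("payment failed for order {}", data.getOrderNo());
    }
}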

Producer

Producer: OrderController, which simulates sending orders with different event types

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            // messages with the same key go to the same partition
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    // 待支付 = pending payment, 已支付 = paid, 支付失败 = payment failed
    private String getStatus(int status) {
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}

application.properties configuration

# Kafka broker address
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Startup class
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * Configure the thread pool
     *
     * @return the order thread pool
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool() {
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
                new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            // let the worker threads drain and consume their queues when the container shuts down
            threadPool.shutdown();
        }));
        return threadPool;
    }
}

Test

Visit http://localhost:8080/send. Result:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

As you can see, within each worker thread the order status events are consumed in order.

good luck!