SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

  • 消息系统

    • 主要应用场景
      • 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
      • 异步处理、顺序处理
      • 实时数据传输管道
      • 异构语言架构系统之间的通信
        • 如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互
  • Kafka是什么

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。

    核心概念:

    • 生产者(Producer) 生产者应用向主题队列中投送消息数据
    • 消费者 (Consumer) 消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
    • 主题 (Topic) 可以理解为生产者与消费者交互的桥梁
    • 分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
  • windows 安装kafka

    本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka

    2.8.0后不需要依赖zk了

    • 拉取镜像

      docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
    • 创建网络

      docker network create kafka-net --driver bridge
    • 安装zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
    • 安装kafka

      docker run -d --name kafka --publish 9092:9092 \--link zookeeper \--env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \--env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \--env KAFKA_ADVERTISED_PORT=9092  \--volume /etc/localtime:/etc/localtime \wurstmeister/kafka:latest
    • 测试

      telnet localhost:9092
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依赖

      ```4.0.0org.springframework.bootspring-boot-starter-parent3.1.0 io.github.vino42springboot-kafka1.0-SNAPSHOT171717UTF-8org.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-starter-loggingorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-loggingorg.springframework.bootspring-boot-starter-log4j2org.springframework.kafkaspring-kafkaorg.apache.kafkakafka-clientsorg.apache.kafkakafka-clients3.5.1com.google.code.gsongson2.10.1cn.hutoolhutool-all5.8.21org.springframework.bootspring-boot-maven-plugin3.1.0```
    • 配置

      spring:  kafka:    bootstrap-servers: 172.31.192.1:9092    producer:      retries: 0      # 每次批量发送消息的数量      batch-size: 16384      buffer-memory: 33554432      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    listener:      missing-topics-fatal: false#      MANUALpoll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交      #      MANUAL_IMMEDIATE每处理完业务手动调用Acknowledgment.acknowledge()后立即提交      #      RECORD当每一条记录被消费者监听器(ListenerConsumer)处理之后提交      #      BATCH当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交      #      TIME当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交      #      COUNT当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交      #      COUNT_TIMETIME或COUNT满足其中一个时提交      ack-mode: manual_immediate    consumer:      group-id: test      # 是否自动提交      enable-auto-commit: false      max-poll-records: 100      #      用于指定消费者在启动时、重置消费偏移量时的行为。      #      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。      #      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。      #      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常      auto-offset-reset: earliest      auto-commit-interval: 100      # 指定消息key和消息体的编解码方式      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      properties:        max.poll.interval.ms: 3600000server:  port: 8888spring:  kafka:    bootstrap-servers: 172.31.192.1:9092    producer:      retries: 0      # 每次批量发送消息的数量      batch-size: 16384      buffer-memory: 33554432      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    listener:      missing-topics-fatal: false      ack-mode: manual_immediate    consumer:      group-id: test      enable-auto-commit: false      max-poll-records: 100      auto-offset-reset: earliest      auto-commit-interval: 100      # 指定消息key和消息体的编解码方式      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      properties:        max.poll.interval.ms: 3600000
    • 生产者代码示例

      package io.github.vino42.publiser;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;/** * ===================================================================================== * * @Created :   2023/8/30 21:29 * @Compiler :  jdk 17 * @Author :    VINO * @Copyright : VINO * @Decription : kafak 消息生产者 * ===================================================================================== */@Componentpublic class KafkaPublishService {    @Autowired    KafkaTemplate kafkaTemplate;    /**     * 这里为了简单 直接发送json字符串     *     * @param json     */    public void send(String topic, String json) {        kafkaTemplate.send(topic, json);    }}
          @RequestMapping("/send")    public String send() {        IntStream.range(0, 10000).forEach(d -> {            kafkaPublishService.send("test", RandomUtil.randomString(16));        });        return "ok";    }
    • 消费者

      @Component@Slf4jpublic class CustomKafkaListener {    @org.springframework.kafka.annotation.KafkaListener(topics = "test")    public void listenUser(ConsumerRecord record, Acknowledgment acknowledgment) {        try {            String key = String.valueOf(record.key());            String body = record.value();            log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);            log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);        } catch (Exception e) {            e.printStackTrace();        } finally {            //手动ack            acknowledgment.acknowledge();        }    }}

SpringBoot Learning系列 是笔者总结整理的一个SpringBoot学习集合。可以说算是一个SpringBoot学习的大集合。欢迎Star关注。谢谢观看。


关注公众号不迷路