第三章 Kafka集成 SpringBoot

SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以 用于 SpringBoot 的消费者。

在初始化springboot环境的时候要勾选kafka依赖

org.springframework.kafkaspring-kafkaorg.springframework.kafkaspring-kafka-testtest

3.1 SpringBoot 生产者

(1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息

server:port: 8080spring:kafka:bootstrap-servers: hadoop102:9092,hadoop103:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test

(2)创建 controller 从浏览器接收数据, 并写入指定的 topic

package cn.jxust.springbootkafka.Controller;/* * *@author pengjx * * */import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ProducerController {@Autowiredprivate KafkaTemplate stringKafkaTemplate;@RequestMapping("/atguigu")public String data(String msg){stringKafkaTemplate.send("first",msg);return "ok";}}

(3)在浏览器中给/atguigu 接口发送数据 http://localhost:8080/atguigu” />server:port: 8080spring:kafka:bootstrap-servers: hadoop102:9092,hadoop103:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test

(2)创建类消费 Kafka 中指定 topic 的数据

package cn.jxust.springbootkafka.Consumer;/* * *@author pengjx * * */import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.KafkaListener;@Configurationpublic class KafkaConsumer {@KafkaListener(topics = "first")public void getData(String msg){System.out.println("消息是:"+msg);}}

3.3 工具类

KafkaProducer
package cn.jxust.springbootkafka.utils;/* * *@author pengjx * * */import cn.hutool.json.JSONUtil;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;@Component@Slf4jpublic class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;private static final String TOPIC="first";public void send(Object object){String jsonStr = JSONUtil.toJsonStr(object);log.info("准备发送消息为:{}", jsonStr);ListenableFuture<SendResult> future = kafkaTemplate.send(TOPIC, jsonStr);future.addCallback(new ListenableFutureCallback<SendResult>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult result) {//成功的处理log.info(TOPIC + " - 生产者 发送消息成功:" + result.toString());}});}}
KafkaConsumer
package cn.jxust.springbootkafka.utils;/* * *@author pengjx * * */import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import java.util.Optional;@Component@Slf4jpublic class KafkaConsumer {@KafkaListener(topics = "first")public void topicTest(ConsumerRecord record){Optional message = Optional.ofNullable(record.value());if(message.isPresent()){Object object = message.get();log.info("topic.group1 消费了: Topic:" + "first" + ",Message:" + object);}}}