一.基础kafka部署

zookeeper,kafka,kafka-ui

docker-compose.yml

注意点:192.168.1.20 是宿主机的ip

version: "3"services:zookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperrestart: alwaysports:- 2181:2181environment:ZOOKEEPER_CLIENT_PORT: 2181kafka:image: wurstmeister/kafkarestart: alwayscontainer_name: kafkadepends_on:- zookeeperports:- 9092:9092environment:KAFKA_BROKER_ID: 0KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.20:9092KAFKA_ADVERTISED_PORT: 9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_PORT: 9092 KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_HEAP_OPTS: "-Xmx512M -Xmx512M"kafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uirestart: alwaysports:- 10010:8080environment:- DYNAMIC_CONFIG_ENABLED=true- SERVER_SERVLET_CONTEXT_PATH=/ui-kafka- KAFKA_CLUSTERS_0_NAME=local- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXTdepends_on:- zookeeper- kafka

kafka-ui

地址:http://localhost:10010/ui-kafka/

java生产者

org.springframework.kafkaspring-kafka2.9.0

import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//VALUE: 实际发送消息的内容properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2.创建kafka生产者对象 传递properties属性参数集合KafkaProducer producer = new KafkaProducer(properties);for (int i = 0; i < 100; i++) {ProducerRecord record = new ProducerRecord("topic_test", i + " : testx123测试");//4.发送消息producer.send(record);System.out.println("发送成功: " + i);}//5.关闭生产者producer.close();}}

java消费者

public class KafkaConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap.servers:kafka服务器地址,多个用逗号隔开props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "topic-test-group"); // 消费组groupIdprops.put("auto.offset.reset", "earliest");// 序列化方式props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer(props);consumer.subscribe(Collections.singletonList("topic_test")); // 订阅的topicwhile (true) {ConsumerRecords records = consumer.poll(Duration.ofMillis(1000L));for (ConsumerRecord record : records) {System.out.printf("主题 = %s, 分区 = %d, 位移 = %d, " + "消息键 = %s, 消息值 = %s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}if (!records.isEmpty()) {try {// 提交消费位移consumer.commitSync();} catch (CommitFailedException exception) {System.out.println("commit failed....");}}}}}

二.加密部署