查看一下压缩策略

bin/kafka-topics.sh –describe –zookeeper xxxx:2181 –topic SHI_TOPIC1

Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact

Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Configs:cleanup.policy=compact :

然后再检查一下自己发送消息的时候是不是没有传 key

[参考链接](()

问题堆栈信息

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;

nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,

the listener container must have a MANUAL AckMode to populate the Acknowledgment.;

nested exception is java.lang.IllegalStateException:

No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

问题原因

解决方案

问题堆栈信息

Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE

问题原因

不能再配置中既配置kafka.consumer.enable-auto-commit=true 自动提交; 然后又在监听器中使用手动提交

例如:

kafka.consumer.enable-auto-commit=true

@Autowired

private ConsumerFactory consumerFactory;

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer> kafkaManualAckListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory);

//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

}

/**

  • 手动ack 提交记录

  • @param data

  • @param ack

  • @throws InterruptedException

*/

@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,

clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)

public void consumer2(String data, Acknowledgment ack) {

log.info(“consumer-id2-手动ack,提交记录,data:{}”,data);

ack.acknowledge();

}

解决方法:

将自动提交关掉,或者去掉手动提交;

如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的

consumerFactory 将它的是否自动提交设置为false;比如

@Configuration

@EnableKafka

public class KafkaConfig {

@Autowired

private KafkaProperties properties;

/**

  • 创建一个新的消费者工厂

  • 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下

  • @return

*/

@Bean

public ConsumerFactory kafkaConsumerFactory() {

Map map = properties.buildConsumerProperties();

DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory( map);

return factory;

}

/**

  • 创建一个新的消费者工厂

  • 但是修改为不自动提交

  • @return

*/

@Bean

public ConsumerFactory kafkaManualConsumerFactory() {

Map map = properties.buildConsumerProperties();

map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory( map);

return factory;

}

/**

  • 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)

  • @return

*/

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer> kafkaManualAckListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(kafkaManualConsumerFactory());

//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

}

Copyright © maxssl.com 版权所有 浙ICP备2022011180号