文章目录

  • 一、生产者源码
    • 1、初始化
    • 2、发送数据到缓冲区
      • 2.1 发送总体流程
      • 2.2 分区选择
      • 2.3 发送消息大小校验
      • 2.4 内存池
    • 3、sender 线程发送数据
  • 二、消费者源码
    • 1、初始化
    • 2、消费者订阅主题
    • 3、消费者拉取和处理数据
      • 3.1 消费总体流程
      • 3.2 消费者/消费者组初始化
      • 3.3 拉取数据
      • 3.4 消费者 Offset 提交
  • 三、服务端源码

kafka官网:https://kafka.apache.org/downloads

一、生产者源码

1、初始化

生产者 main 线程初始化,点击 main()方法中的 KafkaProducer()

KafkaProducer(ProducerConfig config,Serializer<K> keySerializer,Serializer<V> valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {try {this.producerConfig = config;this.time = time;// 获取事务idString transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);// 获取客户端idthis.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);......// 监控kafka运行情况JmxReporter jmxReporter = new JmxReporter();jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));reporters.add(jmxReporter);MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);// 获取分区器this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);// key和value的序列化if (keySerializer == null) {this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);}......// 拦截器处理(拦截器可以有多个)List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));......// 单条日志大小 默认1mthis.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 缓冲区大小 默认32mthis.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 压缩,默认是nonethis.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);int deliveryTimeoutMs = configureDeliveryTimeout(config, log);this.apiVersions = new ApiVersions();this.transactionManager = configureTransactionState(config, logContext);// 缓冲区对象 默认是32m// 批次大小 默认16k// 压缩方式,默认是none// liner.ms 默认是0//内存池this.accumulator = new RecordAccumulator(logContext,config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));// 连接上kafka集群地址List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));// 获取元数据if (metadata != null) {this.metadata = metadata;} else {this.metadata = new ProducerMetadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),logContext,clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}this.errors = this.metrics.sensor("errors");this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;// 把sender线程放到后台this.ioThread = new KafkaThread(ioThreadName, this.sender, true);// 启动sender线程this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka producer started");}......}

生产者 sender 线程初始化,KafkaProducer.java中点击 newSender()方法,查看发送线程初始化

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {// 缓存请求的个数 默认是5个int maxInflightRequests = configureInflightRequests(producerConfig);// 请求超时时间,默认30sint requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);// 创建一个客户端对象// clientId客户端id// maxInflightRequests缓存请求的个数 默认是5个// RECONNECT_BACKOFF_MS_CONFIG 重试时间// RECONNECT_BACKOFF_MAX_MS_CONFIG 总的重试时间// 发送缓冲区大小send.buffer.bytes默认128kb// 接收数据缓存 receive.buffer.bytes 默认是32kbKafkaClient client = kafkaClient != null " />: new NetworkClient(new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics, time, "producer", channelBuilder, logContext),metadata,clientId,maxInflightRequests,producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),requestTimeoutMs,producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,throttleTimeSensor,logContext);// 0 :生产者发送过来,不需要应答;1 :leader收到,应答;-1 :leader和isr队列里面所有的都收到了应答short acks = configureAcks(producerConfig, log);// 创建sender线程return new Sender(logContext,client,metadata,this.accumulator,maxInflightRequests == 1,producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),acks,producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),metricsRegistry.senderMetrics,time,requestTimeoutMs,producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),this.transactionManager,apiVersions);}

Sender 对象被放到了一个线程中启动,所有需要点击 newSender()方法中的 Sender,并找到 sender 对象中的 run()方法

@Overridepublic void run() {......while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {try {// sender 线程从缓冲区准备拉取数据,刚启动拉不到数据runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}}......}

2、发送数据到缓冲区

2.1 发送总体流程

从send()方法进入

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {ProducerRecord<K, V> interceptRecord = record;for (ProducerInterceptor<K, V> interceptor : this.interceptors) {try {// 拦截器对数据进行加工interceptRecord = interceptor.onSend(interceptRecord);......return interceptRecord;}//从拦截器处理中返回,点击 doSend()方法private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {// 获取元数据clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;// 序列化相关操作byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());......// 分区操作int partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);// 保证数据大小能够传输(序列化后的压缩后的)ensureValidRecordSize(serializedSize);......// accumulator缓存追加数据result是是否添加成功的结果RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);......// 批次大小已经满了 获取有一个新批次创建if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒发送线程this.sender.wakeup();}return result.future;......}

2.2 分区选择

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();// 如果指定分区,按照指定分区配置return partition != null " />:partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}//点击 partition,跳转到 Partitioner 接口,选择默认的分区器 DefaultPartitionerpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {// 没有指定keyif (keyBytes == null) {// 按照粘性分区处理return stickyPartitionCache.partition(topic, cluster);}// 如果指定key,按照key的hashcode值 对分区数求模// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

2.3 发送消息大小校验

private void ensureValidRecordSize(int size) {// 单条信息最大值 maxRequestSize 1mif (size > maxRequestSize)throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");// totalMemorySize缓存大小 默认32mif (size > totalMemorySize)throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than the total memory buffer you have configured with the " +ProducerConfig.BUFFER_MEMORY_CONFIG +" configuration.");}

2.4 内存池

public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {......try {// check if we have an in-progress batch// 获取或者创建一个队列(按照每个主题的分区)Deque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) {if (closed)throw new KafkaException("Producer closed while send in progress");// 尝试向队列里面添加数据(正常添加不成功)RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null)return appendResult;}// we don't have an in-progress record batch try to allocate a new batchif (abortOnNewBatch) {// Return a result that will cause another call to append.return new RecordAppendResult(null, false, false, true);}byte maxUsableMagic = apiVersions.maxUsableProduceMagic();// this.batchSize 默认16k数据大小17kint size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);// 申请内存内存池分配内存双端队列buffer = free.allocate(size, maxTimeToBlock);// Update the current time in case the buffer allocation blocked above.nowMs = time.milliseconds();synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}// 封装内存bufferMemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);// 再次封装(得到真正的批次大小)ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,callback, nowMs));// 向队列的末尾添加批次dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}}

3、sender 线程发送数据

void runOnce() {// 事务相关操作if (transactionManager != null) {try {transactionManager.maybeResolveSequences();// do not continue sending if the transaction manager is in a failed stateif (transactionManager.hasFatalError()) {RuntimeException lastError = transactionManager.lastError();if (lastError != null)maybeAbortBatches(lastError);client.poll(retryBackoffMs, time.milliseconds());return;}// Check whether we need a new producerId. If so, we will enqueue an InitProducerId// request which will be sent belowtransactionManager.bumpIdempotentEpochAndResetIdIfNeeded();if (maybeSendAndPollTransactionalRequest()) {return;}} catch (AuthenticationException e) {// This is already logged as error, but propagated here to perform any clean ups.log.trace("Authentication exception while processing transactional request", e);transactionManager.authenticationFailed(e);}}long currentTimeMs = time.milliseconds();// 发送数据long pollTimeout = sendProducerData(currentTimeMs);// 获取发送结果client.poll(pollTimeout, currentTimeMs);}private long sendProducerData(long now) {// 获取元数据Cluster cluster = metadata.fetch();// get the list of partitions with data ready to send// 1、判断32m缓存是否准备好RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);// 如果 Leader 信息不知道,是不能发送数据的// if there are any partitions whose leaders are not known yet, force metadata updateif (!result.unknownLeaderTopics.isEmpty()) {// The set of topics with unknown leader contains topics with leader election pending as well as// topics which may have expired. Add the topic again to metadata to ensure it is included// and request metadata update, since there are messages to send to the topic.for (String topic : result.unknownLeaderTopics)this.metadata.add(topic, now);log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",result.unknownLeaderTopics);this.metadata.requestUpdate();}......// create produce requests// 发送每个节点数据,进行封装,这样一个分区的就可以打包一起发送Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);......// 发送请求sendProduceRequests(batches, now);return pollTimeout;}// 是否准备发送public ReadyCheckResult ready(Cluster cluster, long nowMs) {Set<Node> readyNodes = new HashSet<>();long nextReadyCheckDelayMs = Long.MAX_VALUE;Set<String> unknownLeaderTopics = new HashSet<>();boolean exhausted = this.free.queued() > 0;for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {Deque<ProducerBatch> deque = entry.getValue();synchronized (deque) {// When producing to a large number of partitions, this path is hot and deques are often empty.// We check whether a batch exists first to avoid the more expensive checks whenever possible.ProducerBatch batch = deque.peekFirst();if (batch != null) {TopicPartition part = entry.getKey();Node leader = cluster.leaderFor(part);if (leader == null) {// This is a partition for which leader is not known, but messages are available to send.// Note that entries are currently not removed from batches when deque is empty.unknownLeaderTopics.add(part.topic());} else if (!readyNodes.contains(leader) && !isMuted(part)) {long waitedTimeMs = batch.waitedTimeMs(nowMs);// 如果不是第一次拉取,且等待时间小于重试时间 默认100ms ,backingOff=trueboolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;// 如果backingOff是true 取retryBackoffMs; 如果不是第一次拉取取lingerMs,默认0long timeToWaitMs = backingOff " />: lingerMs;// 批次大小满足发送条件boolean full = deque.size() > 1 || batch.isFull();// 如果超时,也要发送boolean expired = waitedTimeMs >= timeToWaitMs;boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();// full linger.msboolean sendable = full|| expired|| exhausted|| closed|| flushInProgress()|| transactionCompleting;if (sendable && !backingOff) {readyNodes.add(leader);} else {long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);// Note that this results in a conservative estimate since an un-sendable partition may have// a leader that will later be found to have sendable data. However, this is good enough// since we'll just wake up and then sleep again for the remaining time.nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);}}}}}return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);}

二、消费者源码

1、初始化

点击 main()方法中的 KafkaConsumer ()

KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {try {// 消费组平衡GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,GroupRebalanceConfig.ProtocolType.CONSUMER);// 获取消费者组idthis.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);// 客户端idthis.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);......// 客户端请求服务端等待时间request.timeout.ms 默认是30sthis.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);this.time = Time.SYSTEM;this.metrics = buildMetrics(config, time, clientId);// 重试时间 100this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);// 拦截器List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptor.class,Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));this.interceptors = new ConsumerInterceptors<>(interceptorList);// key和value 的反序列化if (keyDeserializer == null) {this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);} else {config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);this.keyDeserializer = keyDeserializer;}if (valueDeserializer == null) {this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);} else {config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);this.valueDeserializer = valueDeserializer;}// offset从什么位置开始消费 默认,latestOffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer,valueDeserializer, metrics.reporters(), interceptorList);// 元数据// retryBackoffMs 重试时间// 是否允许访问系统主题 exclude.internal.topics默认是true,表示不允许// 是否允许自动创建topicallow.auto.create.topics 默认是truethis.metadata = new ConsumerMetadata(retryBackoffMs,config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),subscriptions, logContext, clusterResourceListeners);// 连接kafka集群List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));this.metadata.bootstrap(addresses);String metricGrpPrefix = "consumer";FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);this.isolationLevel = IsolationLevel.valueOf(config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);ApiVersions apiVersions = new ApiVersions();// 创建客户端对象// 连接重试时间 默认50ms// 最大连接重试时间 默认1s// 发送缓存 默认128kb// 接收缓存默认64kb// 客户端请求服务端等待时间request.timeout.ms 默认是30sNetworkClient netClient = new NetworkClient(new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),this.metadata,clientId,100, // a fixed large enough value will suffice for max in-flight requestsconfig.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,throttleTimeSensor,logContext);// 消费者客户端// 客户端请求服务端等待时间request.timeout.ms 默认是30sthis.client = new ConsumerNetworkClient(logContext,netClient,metadata,time,retryBackoffMs,config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation// 消费者分区分配策略this.assignors = ConsumerPartitionAssignor.getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)));// no coordinator will be constructed for the default (null) group id//为消费者组准备的// auto.commit.interval.ms自动提交offset时间 默认5sthis.coordinator = !groupId.isPresent() " />null :new ConsumerCoordinator(groupRebalanceConfig,logContext,this.client,assignors,this.metadata,this.subscriptions,metrics,metricGrpPrefix,this.time,enableAutoCommit,config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),this.interceptors,config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));// 配置抓数据的参数// fetch.min.bytes 默认最少一次抓取1个字节// fetch.max.bytes 默认最多一次抓取50m// fetch.max.wait.ms 抓取等待最大时间 500ms// max.partition.fetch.bytes 默认是1m// max.poll.records默认一次处理500条this.fetcher = new Fetcher<>(logContext,this.client,config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),this.keyDeserializer,this.valueDeserializer,this.metadata,this.subscriptions,metrics,metricsRegistry,this.time,this.retryBackoffMs,this.requestTimeoutMs,isolationLevel,apiVersions);this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka consumer initialized");} catch (Throwable t) {// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121// we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.if (this.log != null) {close(0, true);}// now propagate the exceptionthrow new KafkaException("Failed to construct kafka consumer", t);}}

2、消费者订阅主题

点击自己编写的 CustomConsumer.java 中的 subscribe ()方法

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {acquireAndEnsureOpen();try {maybeThrowInvalidGroupIdException();// 要订阅的主题如果为null ,直接抛异常if (topics == null)throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");// 要订阅的主题如果为空if (topics.isEmpty()) {// treat subscribing to empty topic list as the same as unsubscribingthis.unsubscribe();} else {// 正常的处理操作for (String topic : topics) {// 如果为空抛异常if (Utils.isBlank(topic))throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");}throwIfNoAssignorsConfigured();fetcher.clearBufferedDataForUnassignedTopics(topics);log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));//订阅主题(判断你是否需要更新订阅的主题;主题了一个监听器listener)if (this.subscriptions.subscribe(new HashSet<>(topics), listener))// 更新订阅信息metadata.requestUpdateForNewTopics();}} finally {release();}}public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {// 注册负载均衡监听器registerRebalanceListener(listener);// 按照主题自动订阅模式setSubscriptionType(SubscriptionType.AUTO_TOPICS);// 判断是否需要更改订阅的主题return changeSubscription(topics);}private boolean changeSubscription(Set<String> topicsToSubscribe) {// 如果传入的topics 和以前订阅的主题一致,那就不需要更改对应订阅的主题if (subscription.equals(topicsToSubscribe))return false;subscription = topicsToSubscribe;return true;}

3、消费者拉取和处理数据

3.1 消费总体流程

点击自己编写的 CustomConsumer.java 中的 poll ()方法

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {acquireAndEnsureOpen();try {this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}do {client.maybeTriggerWakeup();if (includeMetadataInTimeout) {// 1、消费者或者消费者组的初始化// try to update assignment metadata BUT do not need to block on the timer for join groupupdateAssignmentMetadataIfNeeded(timer, false);} else {while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {log.warn("Still waiting for metadata");}}// 2 抓取数据final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);if (!records.isEmpty()) {// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.transmitSends();}// 3 拦截器处理数据return this.interceptors.onConsume(new ConsumerRecords<>(records));}} while (timer.notExpired());return ConsumerRecords.empty();} finally {release();this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());}}

3.2 消费者/消费者组初始化

public boolean poll(Timer timer, boolean waitForJoinGroup) {maybeUpdateSubscriptionMetadata();invokeCompletedOffsetCommitCallbacks();if (subscriptions.hasAutoAssignedPartitions()) {// 如果没有指定分区分配策略直接抛异常if (protocol == null) {throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +" to empty while trying to subscribe for group protocol to auto assign partitions");}// Always update the heartbeat last poll time so that the heartbeat thread does not leave the// group proactively due to application inactivity even if (say) the coordinator cannot be found.// 3s心跳pollHeartbeat(timer.currentTimeMs());// 判断Coordinator 是否准备好了if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {return false;}......maybeAutoCommitOffsetsAsync(timer.currentTimeMs());return true;}

3.3 拉取数据

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = coordinator == null " />.remainingMs() :Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());// if data is available already, return it immediately// 第一次拉取不到数据final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// send any new fetches (won't resend pending fetches)// 开始拉取数据,里面放了一个监听函数,fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}log.trace("Polling for fetches with timeout {}", pollTimeout);Timer pollTimer = time.timer(pollTimeout);client.poll(pollTimer, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasAvailableFetches();});timer.update(pollTimer.currentTimeMs());return fetcher.fetchedRecords();}// 首先抓取数据为空,然后发送请求监听并将数据放入队列,最后再抓取数据,拦截器处理数据

3.4 消费者 Offset 提交

三、服务端源码

生产者消费者源码使用java编写,而服务端源码使用scala编写

程序入口在core→src→main→scala→Kafka→kafka.scala

def main(args: Array[String]): Unit = {try {// 获取相关参数val serverProps = getPropsFromArgs(args)// 创建服务val server = buildServer(serverProps)try {if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)new LoggingSignalHandler().register()} catch {case e: ReflectiveOperationException =>warn("Failed to register optional signal handler that logs a message when the process is terminated " +s"by a signal. Reason for registration failure is: $e", e)}// attach shutdown handler to catch terminating signals as well as normal terminationExit.addShutdownHook("kafka-shutdown-hook", {try server.shutdown()catch {case _: Throwable =>fatal("Halting Kafka.")// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.Exit.halt(1)}})// 启动服务try server.startup()catch {case _: Throwable =>// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status codefatal("Exiting Kafka.")Exit.exit(1)}server.awaitShutdown()}catch {case e: Throwable =>fatal("Exiting Kafka due to fatal exception", e)Exit.exit(1)}Exit.exit(0)}