思考kafka在做幂等性的时候,producerId在内存中什么时候清理掉

情景

  最近在看kafka幂等性的源码的时候,在思考一个问题,既然幂等性是通过producerId + Sequence Number来判断是否重复,那么应该在broker缓存中,有保存producerId 和 Sequence Number,那么如果长时间一直使用,是否会由于 producerId 和 Sequence Number 的增长,造成OOM呢?在网上没找到答案,所以本文通过源码,来找到这个答案

代码追踪

  本文不再分析producer端的具体代码,而是直接从broker端KafkaApis处理ApiKeys.PRODUCE请求的代码开始追踪,对前面的流程不了解的朋友,请参考:【转载】万字长文干货 | Kafka 事务性之幂等性实现

  【步骤1】kafka.server.KafkaApis#handle

 1 def handle(request: RequestChannel.Request) {  2   try {  3     ...  4     request.header.apiKey match {  5       //处理发上来的请求  6       case ApiKeys.PRODUCE => handleProduceRequest(request)  7       ...  8     }  9   } catch { 10     ... 11   } finally { 12     ... 13   } 14 }

  

  【步骤2】kafka.server.KafkaApis#handleProduceRequest

 1 def handleProduceRequest(request: RequestChannel.Request) {  2   ...  3   4   if (authorizedRequestInfo.isEmpty)  5     ...  6   else {  7     val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId  8   9     //【重点】添加数据 10     replicaManager.appendRecords( 11       timeout = produceRequest.timeout.toLong, 12       requiredAcks = produceRequest.acks, 13       internalTopicsAllowed = internalTopicsAllowed, 14       isFromClient = true, 15       entriesPerPartition = authorizedRequestInfo, 16       responseCallback = sendResponseCallback, 17       processingStatsCallback = processingStatsCallback) 18  19     ... 20   } 21 }

  

  【步骤3】kafka.server.ReplicaManager#appendRecords

 1 def appendRecords(timeout: Long,  2                   requiredAcks: Short,  3                   internalTopicsAllowed: Boolean,  4                   isFromClient: Boolean,  5                   entriesPerPartition: Map[TopicPartition, MemoryRecords],  6                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,  7                   delayedProduceLock: Option[Lock] = None,  8                   processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {  9   if (isValidRequiredAcks(requiredAcks)) { 10     ... 11     //【重点】添加到本地Log 12     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, 13       isFromClient = isFromClient, entriesPerPartition, requiredAcks) 14     ... 15   } else { 16     ... 17   } 18 }

  

  【步骤4】kafka.server.ReplicaManager#appendToLocalLog

private def appendToLocalLog(internalTopicsAllowed: Boolean,                              isFromClient: Boolean,                              entriesPerPartition: Map[TopicPartition, MemoryRecords],                              requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {   trace(sAppend [$entriesPerPartition] to local log)   entriesPerPartition.map { case (topicPartition, records) =>     brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()     brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()      // reject appending to internal topics if it is not allowed     if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {       ...     } else {       try {         //获取分区操作对象         val partitionOpt = getPartition(topicPartition)         val info = partitionOpt match {           case Some(partition) =>             if (partition eq ReplicaManager.OfflinePartition)               throw new KafkaStorageException(sPartition $topicPartition is in an offline log directory on broker $localBrokerId)             //【重点】在分区中添加数据             partition.appendRecordsToLeader(records, isFromClient, requiredAcks)            case None => throw new UnknownTopicOrPartitionException(Partition %s doesn't exist on %d             .format(topicPartition, localBrokerId))         }          ...       } catch {         ...     }   } }

  

  【步骤5】kafka.cluster.Partition#appendRecordsToLeader

 1 def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {  2   val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {  3     leaderReplicaIfLocal match {  4       case Some(leaderReplica) =>  5         ...  6   7         //【重点】  8         val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)  9         ... 10       case None => 11         ... 12     } 13   } 14    15   ... 16 }

  

  【步骤6】kafka.log.Log#appendAsLeader

def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {   //【重点】添加数据   append(records, isFromClient, assignOffsets = true, leaderEpoch) }

 

  【步骤7】kafka.log.Log#append

1 private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { 2   maybeHandleIOException(sError while appending records to $topicPartition in dir ${dir.getParent}) { 3     //【重点】检查和验证数据 4     val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) 5     ... 6   } 7 }

  

  【步骤8】kafka.log.Log#analyzeAndValidateProducerState(在append方法中,analyzeAndValidateRecords校验通过之后调用)

 1 private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):  2 (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {  3   ...  4     5   //遍历所有批次  6   for (batch <- records.batches.asScala if batch.hasProducerId) {  7     //【重点】这里就是producerId的缓存,返回的结果是 ProducerStateEntry 类型  8     val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)  9     ... 10     if (isFromClient) { 11       //【重点】重点就是findDuplicateBatch方法,是检查每个数据,是否发送重复,这个方法来自ProducerStateEntry类里面 12       maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate => 13         return (updatedProducers, completedTxns.toList, Some(duplicate)) 14       } 15     } 16      17     ... 18   } 19   ... 20 }

  

  【步骤9】kafka.log.ProducerStateEntry#findDuplicateBatch

def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {   //如果这个批次的producerEpoch(生产代)不等于当前生产代,就没有重复   if (batch.producerEpoch != producerEpoch)      None   else     //【重点】检查重复的sequence num     batchWithSequenceRange(batch.baseSequence, batch.lastSequence) }

  

  【步骤10】kafka.log.ProducerStateEntry#batchWithSequenceRange

1 def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = { 2   //【重点】过滤出重复的批次,只有firstSeq和lastSeq都与缓存中某个批次完全相同,才代表是重复数据 3   val duplicate = batchMetadata.filter { metadata => 4     firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq 5   } 6   duplicate.headOption 7 }

  

  在这里,我们就知道在【步骤8】,里面的这句,就是producerId缓存的地方,进去查看下

1 //【重点】这里就是producerId的缓存,返回的结果是 ProducerStateEntry 类型 2  8     val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)

  

  kafka.log.ProducerStateManager#lastEntry

  可以看到 producerId 就缓存在producers里

//可以看到 producerId 就缓存在producers里 def lastEntry(producerId: Long): Option[ProducerStateEntry] = producers.get(producerId)

 

  查看下producers是什么类型

1 private val producers = mutable.Map.empty[Long, ProducerStateEntry]

  

  在上下文搜索下,有没有 删除 producerId的动作,找到下面三个地方

  kafka.log.ProducerStateManager#truncateHead

 1 def truncateHead(logStartOffset: Long) {  2   val evictedProducerEntries = producers.filter { case (_, producerState) =>  3     !isProducerRetained(producerState, logStartOffset)  4   }  5   6   val evictedProducerIds = evictedProducerEntries.keySet  7   8   //【重点】删除producerId  9   producers --= evictedProducerIds 10   removeEvictedOngoingTransactions(evictedProducerIds) 11   removeUnreplicatedTransactions(logStartOffset) 12  13   if (lastMapOffset < logStartOffset) 14     lastMapOffset = logStartOffset 15  16   deleteSnapshotsBefore(logStartOffset) 17   lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) 18 }

  向上追踪

  kafka.log.ProducerStateManager#truncateAndReload

 1 def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {  2   // remove all out of range snapshots  3   deleteSnapshotFiles(logDir, { snapOffset =>  4     snapOffset > logEndOffset || snapOffset <= logStartOffset  5   })  6   7   if (logEndOffset != mapEndOffset) {  8     producers.clear()  9     ongoingTxns.clear() 10  11     // since we assume that the offset is less than or equal to the high watermark, it is 12     // safe to clear the unreplicated transactions 13     unreplicatedTxns.clear() 14     loadFromSnapshot(logStartOffset, currentTimeMs) 15   } else { 16     //【重点】 17     truncateHead(logStartOffset) 18   } 19 }

 

  kafka.log.Log#recoverSegment

    这个方法只有在Log初始化的时候才会调用,所以排除

 

  kafka.log.Log#loadProducerState

    这个方法向上追踪有两个调用来源,一个来源于Log初始化,所以排除

    一个来源于 kafka.log.Log#truncateTo,是follower副本向leader副本同步数据的时候触发,这个比较可疑

 

  kafka.log.Log#truncateTo

 1 private[log] def truncateTo(targetOffset: Long): Boolean = {  2   maybeHandleIOException(sError while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}) {  3     if (targetOffset < 0)  4       throw new IllegalArgumentException(sCannot truncate partition $topicPartition to a negative offset (%d)..format(targetOffset))  5     if (targetOffset >= logEndOffset) {  6       info(sTruncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1})  7       false  8     } else {  9       info(sTruncating to offset $targetOffset) 10       lock synchronized { 11         checkIfMemoryMappedBufferClosed() 12         if (segments.firstEntry.getValue.baseOffset > targetOffset) { 13           truncateFullyAndStartAt(targetOffset) 14         } else { 15           val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) 16           deletable.foreach(deleteSegment) 17            18           //【重点】清理 19           activeSegment.truncateTo(targetOffset) 20           updateLogEndOffset(targetOffset) 21           this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) 22           this.logStartOffset = math.min(targetOffset, this.logStartOffset) 23           _leaderEpochCache.clearAndFlushLatest(targetOffset) 24           loadProducerState(targetOffset, reloadFromCleanShutdown = false) 25         } 26         true 27       } 28     } 29   } 30 }

 

  kafka.cluster.Partition#truncateTo

1 def truncateTo(offset: Long, isFuture: Boolean) { 2   // The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread 3   // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. 4   inReadLock(leaderIsrUpdateLock) { 5     //【重点】 6     logManager.truncateTo(Map(topicPartition -> offset), isFuture = isFuture) 7   } 8 }

  

  kafka.server.ReplicaFetcherThread#maybeTruncate

override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {   val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]   val partitionsWithError = mutable.Set[TopicPartition]()    fetchedEpochs.foreach { case (tp, epochOffset) =>     try {       val replica = replicaMgr.getReplicaOrException(tp)       val partition = replicaMgr.getPartition(tp).get       //如果 epochOffset 有异常       if (epochOffset.hasError) {         info(sRetrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error})         partitionsWithError += tp       } else {         val fetchOffset =           if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {             warn(sBased on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}.  +               sThe initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.)             partitionStates.stateValue(tp).fetchOffset           } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)             logEndOffset(replica, epochOffset)           else             epochOffset.endOffset         //【重点】         partition.truncateTo(fetchOffset, isFuture = false)         replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset)         fetchOffsets.put(tp, fetchOffset)       }     } catch {       case e: KafkaStorageException =>         info(sFailed to truncate $tp, e)         partitionsWithError += tp     }   }    ResultWithPartitions(fetchOffsets, partitionsWithError) }

  

  kafka.server.AbstractFetcherThread#doWork

override def doWork() {   //【重点】可能清理   maybeTruncate()   val fetchRequest = inLock(partitionMapLock) {     val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states)     if (fetchRequest.isEmpty) {       trace(sThere are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request)       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)     }     handlePartitionsWithErrors(partitionsWithError)     fetchRequest   }   if (!fetchRequest.isEmpty)     processFetchRequest(fetchRequest) }

  

  kafka.utils.ShutdownableThread#run

 1 override def run(): Unit = {  2   info(Starting)  3   try {  4     while (isRunning)  5       //【重点】一直执行,直到isRunning=false  6       doWork()  7   } catch {  8     case e: FatalExitError =>  9       shutdownInitiated.countDown() 10       shutdownComplete.countDown() 11       info(Stopped) 12       Exit.exit(e.statusCode()) 13     case e: Throwable => 14       if (isRunning) 15         error(Error due to, e) 16   } finally { 17      shutdownComplete.countDown() 18   } 19   info(Stopped) 20 }

  

  ShutdownableThread 的实现类有 kafka.server.AbstractFetcherThread

  kafka.server.AbstractFetcherThread 的实现类有 kafka.server.ReplicaFetcherThread

  kafka.server.ReplicaFetcherThread 这个类被创建并启动之后,就会在run()的循环里不断执行doWork(),进而在需要截断日志时触发上面的清理逻辑

  kafka.server.ReplicaFetcherManager#createFetcherThread

1 override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { 2   val prefix = threadNamePrefix.map(tp => s${tp}:).getOrElse() 3   val threadName = s${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id} 4   //新建ReplicaFetcherThread 5   new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager) 6 }

  

  kafka.server.AbstractFetcherManager#addFetcherForPartitions

 1 def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {  2   lock synchronized {  3     val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialFetchOffset) =>  4       BrokerAndFetcherId(brokerAndInitialFetchOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}  5   6     def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {  7       //【重点2】  8       val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)  9       fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread) 10       //【重点3】 11       fetcherThread.start 12     } 13  14     for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) { 15       val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId) 16       fetcherThreadMap.get(brokerIdAndFetcherId) match { 17         case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port => 18           // reuse the fetcher thread 19         case Some(f) => 20           f.shutdown() 21           addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId) 22         case None => 23           //【重点1】 24           addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId) 25       } 26  27       fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) => 28         tp -> brokerAndInitOffset.initOffset 29       }) 30     } 31   } 32  33   info(Added fetcher for partitions %s.format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) => 34     [ + topicPartition + , initOffset  + brokerAndInitialOffset.initOffset +  to broker  + brokerAndInitialOffset.broker + ] })) 35 }

  

  kafka.server.AbstractFetcherManager#resizeThreadPool

 1 def resizeThreadPool(newSize: Int): Unit = {  2   def migratePartitions(newSize: Int): Unit = {  3     fetcherThreadMap.foreach { case (id, thread) =>  4       val removedPartitions = thread.partitionsAndOffsets  5       removeFetcherForPartitions(removedPartitions.keySet)  6       if (id.fetcherId >= newSize)  7         thread.shutdown()  8       //【重点】  9       addFetcherForPartitions(removedPartitions) 10     } 11   } 12   lock synchronized { 13     val currentSize = numFetchersPerBroker 14     info(sResizing fetcher thread pool size from $currentSize to $newSize) 15     numFetchersPerBroker = newSize 16     if (newSize != currentSize) { 17       // We could just migrate some partitions explicitly to new threads. But this is currently 18       // reassigning all partitions using the new thread size so that hash-based allocation 19       // works with partition add/delete as it did before. 20       migratePartitions(newSize) 21     } 22     shutdownIdleFetcherThreads() 23   } 24 }

  

  kafka.server.DynamicThreadPool#reconfigure

 1 override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {  2   if (newConfig.numIoThreads != oldConfig.numIoThreads) {  3     //【重点】  4     server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)  5   }  6   if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)  7     server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)  8   if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)  9     //【重点】 10     server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers) 11   if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir) 12     server.getLogManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir) 13   if (newConfig.backgroundThreads != oldConfig.backgroundThreads) 14     //【重点】 15     server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads) 16 }

  

  kafka.server.DynamicBrokerConfig#updateCurrentConfig

 1 private def updateCurrentConfig(): Unit = {  2   val newProps = mutable.Map[String, String]()  3   newProps ++= staticBrokerConfigs  4   overrideProps(newProps, dynamicDefaultConfigs)  5   overrideProps(newProps, dynamicBrokerConfigs)  6   val oldConfig = currentConfig  7   val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)  8   if (newConfig ne currentConfig) {  9     currentConfig = newConfig 10     kafkaConfig.updateCurrentConfig(newConfig) 11  12     // Process BrokerReconfigurable updates after current config is updated 13     //【重点】 14     brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig)) 15   } 16 }

  

  kafka.server.DynamicBrokerConfig#updateBrokerConfig

 1 private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {  2   try {  3     val props = fromPersistentProps(persistentProps, perBrokerConfig = true)  4     dynamicBrokerConfigs.clear()  5     dynamicBrokerConfigs ++= props.asScala  6     //【重点】  7     updateCurrentConfig()  8   } catch {  9     case e: Exception => error(sPer-broker configs of $brokerId could not be applied: $persistentProps, e) 10   } 11 }

  

  kafka.server.DynamicBrokerConfig#initialize

1 private[server] def initialize(zkClient: KafkaZkClient): Unit = { 2   currentConfig = new KafkaConfig(kafkaConfig.props, false, None) 3   val adminZkClient = new AdminZkClient(zkClient) 4   updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)) 5   val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString) 6   val brokerConfig = maybeReEncodePasswords(props, adminZkClient) 7   //【重点】 8   updateBrokerConfig(kafkaConfig.brokerId, brokerConfig) 9 }

  

  kafka.server.KafkaServer#startup

       在KafkaServer的启动方法中,初始化Broker的配置

 1 def startup() {  2   try {  3     ...  4       5     val canStartup = isStartingUp.compareAndSet(false, true)  6     if (canStartup) {  7       ...  8         9       // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be 10       // applied after DynamicConfigManager starts. 11       //【重点】 12       config.dynamicConfig.initialize(zkClient) 13  14       ... 15     } 16   } 17   catch { 18     ... 19   } 20 }

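  至此,我们就追踪到了清理producerId的源码:在KafkaServer启动后,初始化Broker配置的时候,就会启动一个while循环一直执行doWork()方法,一直做清理的操作,所以不用担心producerId占用内存导致OOM。补充说明:上面这条路径只有在follower副本需要截断日志时才会真正清理;另外,每个ProducerStateEntry只保留最近若干个(有上限的)批次元数据,Sequence Number的增长本身不会让缓存变大;ProducerStateManager中还有removeExpiredProducers方法,由Log中的定时任务周期性调用,用来移除长时间没有写入的producerId(过期时间对应的配置项请以所用版本的源码和官方文档为准)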