Kafka 生产者源码

image

2.1 初始化

image

image

2.1.1 程序入口

从用户自己编写的 main 方法开始阅读

package com.atguigu.kafka.producer;  import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;  import java.util.Properties; import java.util.stream.IntStream;  public class CustomProducer {     public static void main(String[] args) {         // 1. 创建 kafka 生产者的配置对象         Properties properties = new Properties();         // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);         // key,value 序列化(必须):key.serializer,value.serializer         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         // 3. 创建 kafka 生产者对象         try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {             // 4. 调用 send 方法,发送消息             IntStream.range(0, 7)                     .mapToObj(i -> new ProducerRecord<>(first, i, test, atguigu))                     .forEach(kafkaProducer::send);         }     } } 

2.1.2 生产者 main 线程初始化

点击 main()方法中的 KafkaProducer(),跳转到 KafkaProducer 构造方法。

org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(org.apache.kafka.clients.producer.ProducerConfig, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.clients.producer.internals.ProducerMetadata, org.apache.kafka.clients.KafkaClient, org.apache.kafka.clients.producer.internals.ProducerInterceptors<K,V>, org.apache.kafka.common.utils.Time)

// visible for testing @SuppressWarnings(unchecked) KafkaProducer(ProducerConfig config,               Serializer<K> keySerializer,               Serializer<V> valueSerializer,               ProducerMetadata metadata,               KafkaClient kafkaClient,               ProducerInterceptors<K, V> interceptors,               Time time) {     try {         this.producerConfig = config;         this.time = time;          // 获取事务ID         String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);          // 获取客户端ID         this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);          LogContext logContext;         if (transactionalId == null)             logContext = new LogContext(String.format([Producer clientId=%s] , clientId));         else             logContext = new LogContext(String.format([Producer clientId=%s, transactionalId=%s] , clientId, transactionalId));         log = logContext.logger(KafkaProducer.class);         log.trace(Starting the Kafka producer);          Map<String, String> metricTags = Collections.singletonMap(client-id, clientId);         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))                 .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)                 .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))                 .tags(metricTags);         List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,                 MetricsReporter.class,                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));         // 监控Kafka运行情况         JmxReporter jmxReporter = new JmxReporter();         jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));         reporters.add(jmxReporter);         MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,                 config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));         this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);         this.producerMetrics = new KafkaProducerMetrics(metrics);         // 获取分区器         this.partitioner = config.getConfiguredInstance(                 ProducerConfig.PARTITIONER_CLASS_CONFIG,                 Partitioner.class,                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));         long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);         // key和value的序列化         if (keySerializer == null) {             this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,                                                                                      Serializer.class);             this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);         } else {             config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);             this.keySerializer = keySerializer;         }         if (valueSerializer == null) {             this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,                                                                                        Serializer.class);             this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);         } else {             config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);             this.valueSerializer = valueSerializer;         }          // 拦截器处理(拦截器可以有多个)         List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,                 ProducerInterceptor.class,                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));         if (interceptors != null)             this.interceptors = interceptors;         else             this.interceptors = new ProducerInterceptors<>(interceptorList);         ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,                 valueSerializer, interceptorList, reporters);         // 控制单条日志大小 默认1m         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);         // RecordAccumulator 缓冲区大小 默认32m         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);         // 压缩 默认是none         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));          this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);         int deliveryTimeoutMs = configureDeliveryTimeout(config, log);          this.apiVersions = new ApiVersions();         this.transactionManager = configureTransactionState(config, logContext);         // 缓冲区对象 默认是32m         // batch.size 缓冲区一批数据最大值,默认 16k         // 压缩方式 默认是none         // linger.ms 默认是0         // 重试间隔时间,默认值 100ms。         // delivery.timeout.ms 默认值 2 分钟。         // request.timeout.ms 默认值 30s。         this.accumulator = new RecordAccumulator(logContext,                 config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),                 this.compressionType,                 lingerMs(config),                 retryBackoffMs,                 deliveryTimeoutMs,                 metrics,                 PRODUCER_METRIC_GROUP_NAME,                 time,                 apiVersions,                 transactionManager,                 new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));          // 连接上Kafka集群地址         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(                 config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),                 config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));         // 获取元数据         if (metadata != null) {             this.metadata = metadata;         } else {             // metadata.max.age.ms 默认值 5 分钟。生产者每隔多久需要更新一下自己的元数据             // metadata.max.idle.ms 默认值 5 分钟。网络最多空闲时间设置,超过该阈值,就关闭该网络             this.metadata = new ProducerMetadata(retryBackoffMs,                     config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),                     config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),                     logContext,                     clusterResourceListeners,                     Time.SYSTEM);             this.metadata.bootstrap(addresses);         }         this.errors = this.metrics.sensor(errors);         // 初始化sender线程         this.sender = newSender(logContext, kafkaClient, this.metadata);         String ioThreadName = NETWORK_THREAD_PREFIX +  |  + clientId;         // 把sender线程放到后台         this.ioThread = new KafkaThread(ioThreadName, this.sender, true);         // 启动sender线程         this.ioThread.start();         config.logUnused();         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());         log.debug(Kafka producer started);     } catch (Throwable t) {         // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121         close(Duration.ofMillis(0), true);         // now propagate the exception         throw new KafkaException(Failed to construct kafka producer, t);     } } 

2.1.3 生产者 sender 线程初始化

点击 newSender()方法,查看发送线程初始化。

org.apache.kafka.clients.producer.KafkaProducer#newSender

// visible for testing Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {     // 缓存请求的个数 默认是5个     int maxInflightRequests = configureInflightRequests(producerConfig);     // 请求超时时间 默认30s     int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);     ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);     ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);     Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);     // 创建一个客户端对象     // clientId 客户端id     // maxInflightRequests 缓存请求的个数 默认是5个     // RECONNECT_BACKOFF_MS_CONFIG 默认值 50ms。重试时间间隔     // RECONNECT_BACKOFF_MAX_MS_CONFIG 默认值 1000ms。重试的总时间。每次重试失败时,呈指数增加重试时间,直至达到此最大值     // SEND_BUFFER_CONFIG 默认值 128k。 socket 发送数据的缓冲区大小     // RECEIVE_BUFFER_CONFIG  默认值 32k。socket 接收数据的缓冲区大小     // requestTimeoutMs 默认值 30s。     // SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG 默认值 10s。生产者和服务器通信连接建立的时间。如果在超时之前没有建立连接,将关闭通信。     // SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG 默认值 30s。生产者和服务器通信,每次连续连接失败时,连接建立超时将呈指数增加,直至达到此最大值。     KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(             new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),                     this.metrics, time, producer, channelBuilder, logContext),             metadata,             clientId,             maxInflightRequests,             producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),             producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),             producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),             producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),             requestTimeoutMs,             producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),             producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),             time,             true,             apiVersions,             throttleTimeSensor,             logContext);     // 0:生产者发送过来不需要应答;1:leader收到,应答;-1:leader和isr队列里面所有的都受到了应答     short acks = configureAcks(producerConfig, log);     // 创建Sender线程     return new Sender(logContext,             client,             metadata,             this.accumulator,             maxInflightRequests == 1,             producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),             acks,             producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),             metricsRegistry.senderMetrics,             time,             requestTimeoutMs,             producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),             this.transactionManager,             apiVersions); } 

Sender 对象被放到了一个线程中启动,所有需要点击 newSender()方法中的 Sender,并 找到 sender 对象中的 run()方法。

org.apache.kafka.clients.producer.internals.Sender#Sender

/**  * The main run loop for the sender thread  */ @Override public void run() {     log.debug(Starting Kafka producer I/O thread.);      // main loop, runs until close is called     while (running) {         try {             // sender线程从缓冲区准备拉取数据,刚启动拉不到数据              runOnce();         } catch (Exception e) {             log.error(Uncaught error in kafka producer I/O thread: , e);         }     }     .... } 

2.2 发送数据到缓冲区

image

2.2.1 发送总体流程

点击自己编写的 CustomProducer.java 中的 send()方法。

IntStream.range(0, 7)         .mapToObj(i -> new ProducerRecord<>(first, i, test, atguigu))         .forEach(kafkaProducer::send); 
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {     return send(record, null); } 
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {     // intercept the record, which can be potentially modified; this method does not throw exceptions     // 拦截器处理发送的数据     ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);     return doSend(interceptedRecord, callback); } 

点击 onSend()方法,进行拦截器相关处理。

org.apache.kafka.clients.producer.internals.ProducerInterceptors#onSend

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {     ProducerRecord<K, V> interceptRecord = record;     for (ProducerInterceptor<K, V> interceptor : this.interceptors) {         try {             // 拦截器处理             interceptRecord = interceptor.onSend(interceptRecord);         } catch (Exception e) {             // do not propagate interceptor exception, log and continue calling other interceptors             // be careful not to throw exception from here             if (record != null)                 log.warn(Error executing interceptor onSend callback for topic: {}, partition: {}, record.topic(), record.partition(), e);             else                 log.warn(Error executing interceptor onSend callback, e);         }     }     return interceptRecord; } 

从拦截器处理中返回,点击 doSend()方法。

org.apache.kafka.clients.producer.KafkaProducer#doSend

/**  * Implementation of asynchronously send a record to a topic.  */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {     TopicPartition tp = null;     try {         throwIfProducerClosed();         // first make sure the metadata for the topic is available         long nowMs = time.milliseconds();         ClusterAndWaitTime clusterAndWaitTime;         try {             // 获取元数据             clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);         } catch (KafkaException e) {             if (metadata.isClosed())                 throw new KafkaException(Producer closed while send in progress, e);             throw e;         }         nowMs += clusterAndWaitTime.waitedOnMetadataMs;         long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);         Cluster cluster = clusterAndWaitTime.cluster;         // 序列化相关操作         byte[] serializedKey;         try {             serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());         } catch (ClassCastException cce) {             throw new SerializationException(Can't convert key of class  + record.key().getClass().getName() +                      to class  + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +                      specified in key.serializer, cce);         }         byte[] serializedValue;         try {             serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());         } catch (ClassCastException cce) {             throw new SerializationException(Can't convert value of class  + record.value().getClass().getName() +                      to class  + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +                      specified in value.serializer, cce);         }         // 分区操作         int partition = partition(record, serializedKey, serializedValue, cluster);         tp = new TopicPartition(record.topic(), partition);          setReadOnly(record.headers());         Header[] headers = record.headers().toArray();          int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),                 compressionType, serializedKey, serializedValue, headers);         // 保证数据大小能够传输(序列化后的 压缩后的)         ensureValidRecordSize(serializedSize);         long timestamp = record.timestamp() == null ? nowMs : record.timestamp();         if (log.isTraceEnabled()) {             log.trace(Attempting to append record {} with callback {} to topic {} partition {}, record, callback, record.topic(), partition);         }         // producer callback will make sure to call both 'callback' and interceptor callback         Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);          if (transactionManager != null && transactionManager.isTransactional()) {             transactionManager.failIfNotReadyForSend();         }         // accumulator缓存 追加数据,result是是否添加成功的结果         RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);          if (result.abortForNewBatch) {             int prevPartition = partition;             partitioner.onNewBatch(record.topic(), cluster, prevPartition);             partition = partition(record, serializedKey, serializedValue, cluster);             tp = new TopicPartition(record.topic(), partition);             if (log.isTraceEnabled()) {                 log.trace(Retrying append due to new batch creation for topic {} partition {}. The old partition was {}, record.topic(), partition, prevPartition);             }             // producer callback will make sure to call both 'callback' and interceptor callback             interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);              result = accumulator.append(tp, timestamp, serializedKey,                 serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);         }          if (transactionManager != null && transactionManager.isTransactional())             transactionManager.maybeAddPartitionToTransaction(tp);          // 批次大小已经满了或者创建了一个新的批次         if (result.batchIsFull || result.newBatchCreated) {             log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), partition);             // 唤醒发送线程             this.sender.wakeup();         }         return result.future;         // handling exceptions and record the errors;         // for API exceptions return them in the future,         // for other exceptions throw directly     } catch (ApiException e) {         log.debug(Exception occurred during message send:, e);         if (callback != null)             callback.onCompletion(null, e);         this.errors.record();         this.interceptors.onSendError(record, tp, e);         return new FutureFailure(e);     } catch (InterruptedException e) {         this.errors.record();         this.interceptors.onSendError(record, tp, e);         throw new InterruptException(e);     } catch (KafkaException e) {         this.errors.record();         this.interceptors.onSendError(record, tp, e);         throw e;     } catch (Exception e) {         // we notify interceptor about all exceptions, since onSend is called before anything else in this method         this.interceptors.onSendError(record, tp, e);         throw e;     } } 

2.2.2 分区选择

详解默认分区规则。

org.apache.kafka.clients.producer.KafkaProducer#partition

/**  * computes partition for given record.  * if the record has partition returns the value otherwise  * calls configured partitioner class to compute the partition.  */ private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {     Integer partition = record.partition();     // 如果指定分区,按照指定分区配置     return partition != null ?             partition :             // 分区器选择分区             partitioner.partition(                     record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } 

点击 partition,跳转到 Partitioner 接口。选中 partition,点击 ctrl+ h,查找接口实现类

/**  * Compute the partition for the given record.  *  * @param topic The topic name  * @param key The key to partition on (or null if no key)  * @param keyBytes The serialized key to partition on( or null if no key)  * @param value The value to partition on or null  * @param valueBytes The serialized value to partition on or null  * @param cluster The current cluster metadata  */ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); 

选择默认的分区器 DefaultPartitioner

org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster, int)

/**  * Compute the partition for the given record.  *  * @param topic The topic name  * @param numPartitions The number of partitions of the given {@code topic}  * @param key The key to partition on (or null if no key)  * @param keyBytes serialized key to partition on (or null if no key)  * @param value The value to partition on or null  * @param valueBytes serialized value to partition on or null  * @param cluster The current cluster metadata  */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,                      int numPartitions) {     // 没有指定key     if (keyBytes == null) {         // 按照粘性分区处理         return stickyPartitionCache.partition(topic, cluster);     }     // 如果指定key,按照key的hash值对分区取模     // hash the keyBytes to choose a partition     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } 

2.2.3 发送消息大小校验

详解缓冲区大小

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize

/**  * Validate that the record size isn't too large  */ private void ensureValidRecordSize(int size) {     // 单条消息最大值 maxRequestSize 1m     if (size > maxRequestSize)         throw new RecordTooLargeException(The message is  + size +                  bytes when serialized which is larger than  + maxRequestSize + , which is the value of the  +                 ProducerConfig.MAX_REQUEST_SIZE_CONFIG +  configuration.);     // totalMemorySize 缓存大小 32m     if (size > totalMemorySize)         throw new RecordTooLargeException(The message is  + size +                  bytes when serialized which is larger than the total memory buffer you have configured with the  +                 ProducerConfig.BUFFER_MEMORY_CONFIG +                  configuration.); } 

2.2.4 内存池

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,         serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); 
/**  * Add a record to the accumulator, return the append result  * <p>  * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created  * <p>  *  * @param tp The topic/partition to which this record is being sent  * @param timestamp The timestamp of the record  * @param key The key for the record  * @param value The value for the record  * @param headers the Headers for the record  * @param callback The user-supplied callback to execute when the request is complete  * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available  * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and  *                        running the partitioner's onNewBatch method before trying to append again  * @param nowMs The current time, in milliseconds  */ public RecordAppendResult append(TopicPartition tp,                                  long timestamp,                                  byte[] key,                                  byte[] value,                                  Header[] headers,                                  Callback callback,                                  long maxTimeToBlock,                                  boolean abortOnNewBatch,                                  long nowMs) throws InterruptedException {     // We keep track of the number of appending thread to make sure we do not miss batches in     // abortIncompleteBatches().     appendsInProgress.incrementAndGet();     ByteBuffer buffer = null;     if (headers == null) headers = Record.EMPTY_HEADERS;     try {         // check if we have an in-progress batch         // 获取或者创建一个队列(按照每个主题的分区)         Deque<ProducerBatch> dq = getOrCreateDeque(tp);         synchronized (dq) {             if (closed)                 throw new KafkaException(Producer closed while send in progress);             // 尝试向队列里面添加数据(正常添加不成功)             RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);             if (appendResult != null)                 return appendResult;         }          // we don't have an in-progress record batch try to allocate a new batch         if (abortOnNewBatch) {             // Return a result that will cause another call to append.             return new RecordAppendResult(null, false, false, true);         }          byte maxUsableMagic = apiVersions.maxUsableProduceMagic();         // 取批次大小(默认 16k)和消息大小的最大值(上限默认 1m)。这样设计的主要原因是有可能一条消息的大小大于批次大小。         int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));         log.trace(Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms, size, tp.topic(), tp.partition(), maxTimeToBlock);         // 申请内存 内存池分配内存         buffer = free.allocate(size, maxTimeToBlock);          // Update the current time in case the buffer allocation blocked above.         nowMs = time.milliseconds();         synchronized (dq) {             // Need to check if producer is closed again after grabbing the dequeue lock.             if (closed)                 throw new KafkaException(Producer closed while send in progress);              RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);             if (appendResult != null) {                 // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...                 return appendResult;             }              // 分装内存buffer             MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);             // 再次封装(得到真正的批次大小)             ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);             FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,                     callback, nowMs));              // 向队列的末尾添加批次             dq.addLast(batch);             incomplete.add(batch);              // Don't deallocate this buffer in the finally block as it's being used in the record batch             buffer = null;             return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);         }     } finally {         // 释放内存         if (buffer != null)             free.deallocate(buffer);         appendsInProgress.decrementAndGet();     } } 

2.3 sender 线程发送数据

image

详解发送线程。

if (result.batchIsFull || result.newBatchCreated) {     log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), partition);     this.sender.wakeup(); } 

进入 sender 发送线程的 run()方法。

// main loop, runs until close is called while (running) {     try {         runOnce();     } catch (Exception e) {         log.error(Uncaught error in kafka producer I/O thread: , e);     } } 
/**  * Run a single iteration of sending  *  */ void runOnce() {     if (transactionManager != null) {         try {             // 事务相关操作             transactionManager.maybeResolveSequences();              // do not continue sending if the transaction manager is in a failed state             if (transactionManager.hasFatalError()) {                 RuntimeException lastError = transactionManager.lastError();                 if (lastError != null)                     maybeAbortBatches(lastError);                 client.poll(retryBackoffMs, time.milliseconds());                 return;             }              // Check whether we need a new producerId. If so, we will enqueue an InitProducerId             // request which will be sent below             transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();              if (maybeSendAndPollTransactionalRequest()) {                 return;             }         } catch (AuthenticationException e) {             // This is already logged as error, but propagated here to perform any clean ups.             log.trace(Authentication exception while processing transactional request, e);             transactionManager.authenticationFailed(e);         }     }      long currentTimeMs = time.milliseconds();     // 发送数据     long pollTimeout = sendProducerData(currentTimeMs);     // 获取发送结果     client.poll(pollTimeout, currentTimeMs); } 

org.apache.kafka.clients.producer.internals.Sender#sendProducerData

private long sendProducerData(long now) {     // 获取元数据     Cluster cluster = metadata.fetch();     // 判断32m缓存是否准备好     // get the list of partitions with data ready to send     RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);      // if there are any partitions whose leaders are not known yet, force metadata update     if (!result.unknownLeaderTopics.isEmpty()) {         // The set of topics with unknown leader contains topics with leader election pending as well as         // topics which may have expired. Add the topic again to metadata to ensure it is included         // and request metadata update, since there are messages to send to the topic.         for (String topic : result.unknownLeaderTopics)             this.metadata.add(topic, now);          log.debug(Requesting metadata update due to unknown leader topics from the batched records: {},             result.unknownLeaderTopics);         this.metadata.requestUpdate();     }      // 删除掉没有准备好发送的数据     // remove any nodes we aren't ready to send to     Iterator<Node> iter = result.readyNodes.iterator();     long notReadyTimeout = Long.MAX_VALUE;     while (iter.hasNext()) {         Node node = iter.next();         if (!this.client.ready(node, now)) {             iter.remove();             notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));         }     }      // 发送每个节点数据,进行封装     // create produce requests     Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);     addToInflightBatches(batches);     if (guaranteeMessageOrder) {         // Mute all the partitions drained         for (List<ProducerBatch> batchList : batches.values()) {             for (ProducerBatch batch : batchList)                 this.accumulator.mutePartition(batch.topicPartition);         }     }      accumulator.resetNextBatchExpiryTime();     List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);     List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);     expiredBatches.addAll(expiredInflightBatches);      // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics     // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why     // we need to reset the producer id here.     if (!expiredBatches.isEmpty())         log.trace(Expired {} batches in accumulator, expiredBatches.size());     for (ProducerBatch expiredBatch : expiredBatches) {         String errorMessage = Expiring  + expiredBatch.recordCount +  record(s) for  + expiredBatch.topicPartition             + : + (now - expiredBatch.createdMs) +  ms has passed since batch creation;         failBatch(expiredBatch, new TimeoutException(errorMessage), false);         if (transactionManager != null && expiredBatch.inRetry()) {             // This ensures that no new batches are drained until the current in flight batches are fully resolved.             transactionManager.markSequenceUnresolved(expiredBatch);         }     }     sensors.updateProduceRequestMetrics(batches);      // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately     // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry     // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet     // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data     // that aren't ready to send since they would cause busy looping.     long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);     pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);     pollTimeout = Math.max(pollTimeout, 0);     if (!result.readyNodes.isEmpty()) {         log.trace(Nodes with data ready to send: {}, result.readyNodes);         // if some partitions are already ready to be sent, the select time would be 0;         // otherwise if some partition already has some data accumulated but not ready yet,         // the select time will be the time difference between now and its linger expiry time;         // otherwise the select time will be the time difference between now and the metadata expiry time;         pollTimeout = 0;     }     // 发送请求     sendProduceRequests(batches, now);     return pollTimeout; } 

org.apache.kafka.clients.producer.internals.RecordAccumulator#ready

/**  * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable  * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated  * partition batches.  * <p>  * A destination node is ready to send data if:  * <ol>  * <li>There is at least one partition that is not backing off its send  * <li><b>and</b> those partitions are not muted (to prevent reordering if  *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}  *   is set to one)</li>  * <li><b>and <i>any</i></b> of the following are true</li>  * <ul>  *     <li>The record set is full</li>  *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>  *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions  *     are immediately considered ready).</li>  *     <li>The accumulator has been closed</li>  * </ul>  * </ol>  */ public ReadyCheckResult ready(Cluster cluster, long nowMs) {     Set<Node> readyNodes = new HashSet<>();     long nextReadyCheckDelayMs = Long.MAX_VALUE;     Set<String> unknownLeaderTopics = new HashSet<>();      boolean exhausted = this.free.queued() > 0;     for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {         Deque<ProducerBatch> deque = entry.getValue();         synchronized (deque) {             // When producing to a large number of partitions, this path is hot and deques are often empty.             // We check whether a batch exists first to avoid the more expensive checks whenever possible.             ProducerBatch batch = deque.peekFirst();             if (batch != null) {                 TopicPartition part = entry.getKey();                 Node leader = cluster.leaderFor(part);                 if (leader == null) {                     // This is a partition for which leader is not known, but messages are available to send.                     // Note that entries are currently not removed from batches when deque is empty.                     unknownLeaderTopics.add(part.topic());                 } else if (!readyNodes.contains(leader) && !isMuted(part)) {                     long waitedTimeMs = batch.waitedTimeMs(nowMs);                     // 如果不是第一次拉取该批次数据,且等待时间没有超过重试时间,backingOff=true                     boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;                     // 如果 backingOff=true,选择重试时间,如果不是重试,选择 lingerMs                     long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;                     // 批次大小满足发送条件                     boolean full = deque.size() > 1 || batch.isFull();                     // 如果超市,也要发送                     boolean expired = waitedTimeMs >= timeToWaitMs;                     boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();                     boolean sendable = full                         || expired                         || exhausted                         || closed                         || flushInProgress()                         || transactionCompleting;                     if (sendable && !backingOff) {                         readyNodes.add(leader);                     } else {                         long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);                         // Note that this results in a conservative estimate since an un-sendable partition may have                         // a leader that will later be found to have sendable data. However, this is good enough                         // since we'll just wake up and then sleep again for the remaining time.                         nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);                     }                 }             }         }     }     return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } 

org.apache.kafka.clients.producer.internals.RecordAccumulator#drain

/**  * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified  * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.  *  * @param cluster The current cluster metadata  * @param nodes The list of node to drain  * @param maxSize The maximum number of bytes to drain  * @param now The current unix time in milliseconds  * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.  */ public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {     if (nodes.isEmpty())         return Collections.emptyMap();      Map<Integer, List<ProducerBatch>> batches = new HashMap<>();     // 发往同一个 broker 节点的数据,打包为一个请求批次。     for (Node node : nodes) {         List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);         batches.put(node.id(), ready);     }     return batches; } 

org.apache.kafka.clients.producer.internals.Sender#sendProduceRequests

/**  * Transfer the record batches into a list of produce requests on a per-node basis  */ private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {     for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())         sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue()); } 

org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest

/**  * Create a produce request from the given record batches  */ private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {     if (batches.isEmpty())         return;      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());      // find the minimum magic version used when creating the record sets     byte minUsedMagic = apiVersions.maxUsableProduceMagic();     for (ProducerBatch batch : batches) {         if (batch.magic() < minUsedMagic)             minUsedMagic = batch.magic();     }     ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();     for (ProducerBatch batch : batches) {         TopicPartition tp = batch.topicPartition;         MemoryRecords records = batch.records();          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time         // that the producer starts building the batch and the time that we send the request, and we may have         // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use         // the new message format, but found that the broker didn't support it, so we need to down-convert on the         // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may         // not all support the same message format version. For example, if a partition migrates from a broker         // which is supporting the new magic version to one which doesn't, then we will need to convert.         if (!records.hasMatchingMagic(minUsedMagic))             records = batch.records().downConvert(minUsedMagic, 0, time).records();         ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());         if (tpData == null) {             tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());             tpd.add(tpData);         }         tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()                 .setIndex(tp.partition())                 .setRecords(records));         recordsByPartition.put(tp, batch);     }      String transactionalId = null;     if (transactionManager != null && transactionManager.isTransactional()) {         transactionalId = transactionManager.transactionalId();     }      ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,             new ProduceRequestData()                     .setAcks(acks)                     .setTimeoutMs(timeout)                     .setTransactionalId(transactionalId)                     .setTopicData(tpd));     RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());      String nodeId = Integer.toString(destination);     // 创建发送请求对象     ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,             requestTimeoutMs, callback);     // 发送请求     client.send(clientRequest, now);     log.trace(Sent produce request to {}: {}, nodeId, requestBuilder); } 

org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long)

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {     ensureActive();     String nodeId = clientRequest.destination();     if (!isInternalRequest) {         // If this request came from outside the NetworkClient, validate         // that we can send data.  If the request is internal, we trust         // that internal code has done this validation.  Validation         // will be slightly different for some internal requests (for         // example, ApiVersionsRequests can be sent prior to being in         // READY state.)         if (!canSendRequest(nodeId, now))             throw new IllegalStateException(Attempt to send a request to node  + nodeId +  which is not ready.);     }     AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();     try {         NodeApiVersions versionInfo = apiVersions.get(nodeId);         short version;         // Note: if versionInfo is null, we have no server version information. This would be         // the case when sending the initial ApiVersionRequest which fetches the version         // information itself.  It is also the case when discoverBrokerVersions is set to false.         if (versionInfo == null) {             version = builder.latestAllowedVersion();             if (discoverBrokerVersions && log.isTraceEnabled())                 log.trace(No version information found when sending {} with correlation id {} to node {}.  +                         Assuming version {}., clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);         } else {             version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),                     builder.latestAllowedVersion());         }         // The call to build may also throw UnsupportedVersionException, if there are essential         // fields that cannot be represented in the chosen version.         // 发送请求         doSend(clientRequest, isInternalRequest, now, builder.build(version));     } catch (UnsupportedVersionException unsupportedVersionException) {         // If the version is not supported, skip sending the request over the wire.         // Instead, simply add it to the local queue of aborted requests.         log.debug(Version mismatch when attempting to send {} with correlation id {} to {}, builder,                 clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);         ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),                 clientRequest.callback(), clientRequest.destination(), now, now,                 false, unsupportedVersionException, null, null);          if (!isInternalRequest)             abortedSends.add(clientResponse);         else if (clientRequest.apiKey() == ApiKeys.METADATA)             metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));     } } 

org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long, org.apache.kafka.common.requests.AbstractRequest)

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {     String destination = clientRequest.destination();     RequestHeader header = clientRequest.makeHeader(request.version());     if (log.isDebugEnabled()) {         log.debug(Sending {} request with header {} and timeout {} to node {}: {},             clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);     }     Send send = request.toSend(header);     InFlightRequest inFlightRequest = new InFlightRequest(             clientRequest,             header,             isInternalRequest,             request,             send,             now);     // 添加请求到 inFlightRequests     this.inFlightRequests.add(inFlightRequest);     // 发送数据     selector.send(new NetworkSend(clientRequest.destination(), send)); } 

获取服务器端响应

client.poll(pollTimeout, currentTimeMs); 
/**  * Do actual reads and writes to sockets.  *  * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,  *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and  *                metadata timeout  * @param now The current time in milliseconds  * @return The list of responses received  */ @Override public List<ClientResponse> poll(long timeout, long now) {     ensureActive();      if (!abortedSends.isEmpty()) {         // If there are aborted sends because of unsupported version exceptions or disconnects,         // handle them immediately without waiting for Selector#poll.         List<ClientResponse> responses = new ArrayList<>();         handleAbortedSends(responses);         completeResponses(responses);         return responses;     }      long metadataTimeout = metadataUpdater.maybeUpdate(now);     try {         this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));     } catch (IOException e) {         log.error(Unexpected error during I/O, e);     }      // 获取发送后的响应     // process completed actions     long updatedNow = this.time.milliseconds();     List<ClientResponse> responses = new ArrayList<>();     handleCompletedSends(responses, updatedNow);     handleCompletedReceives(responses, updatedNow);     handleDisconnections(responses, updatedNow);     handleConnections();     handleInitiateApiVersionRequests(updatedNow);     handleTimedOutConnections(responses, updatedNow);     handleTimedOutRequests(responses, updatedNow);     completeResponses(responses);      return responses; }