privatelongsendProducerData(long now){ Cluster cluster = metadata.fetch(); //result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
// create produce requests Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }
long pollTimeout = sendProducerData(now); // poll最终会调用selector,pollTimeout也就是selector阻塞的时间 client.poll(pollTimeout, now);
selector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Check for data, waiting up to the given timeout. * * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative * @return The number of keys ready */ privateintselect(long timeoutMs)throws IOException { if (timeoutMs < 0L) thrownew IllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L) returnthis.nioSelector.selectNow(); else returnthis.nioSelector.select(timeoutMs); }
// Excerpt of ready(): only the opening local declarations are shown.
// NOTE(review): the rest of the method body is not part of this excerpt.
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    // Initialised to the maximum value
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    // ... (excerpt ends here)
if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }