try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); //此处为offset+1 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed
//可以看出allConsumed是从state.value().position中获取相应partition的offset public Map<TopicPartition, OffsetAndMetadata> allConsumed(){ Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>(); for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) { if (state.value().hasValidPosition()) allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position)); } return allConsumed; }
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); | V final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); | V List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); | V long nextOffset = partitionRecords.nextFetchOffset; log.trace("Returning fetched records at offset {} for assigned partition {} and update " + "position to {}", position, partitionRecords.partition, nextOffset); //这里就是所谓的offset+1,也就是开头问题的答案! subscriptions.position(partitionRecords.partition, nextOffset);