Получение сообщений из Kafka на первый взгляд выглядит достаточно просто, но на деле обладает большим количеством нюансов, связанных с членством получателя в группе, её перебалансировкой, получением ранее отправленных сообщений, фиксацией смещений и семантиками доставки. Все эти вопросы я постарался осветить в этой статье.
Настройка получателя
Для получения сообщений из Kafka с использованием стандартного клиента для Java необходимо создать и настроить получателя. Классы, необходимые для получения сообщений также есть в библиотеке kafka-clients:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.9.0</version> </dependency> |
Для получения сообщений используется класс KafkaConsumer<K, V>
. Данный класс является типизированным, K
определяет тип данных ключей сообщений, V
— тип данных содержимого сообщений. Параметры получателя определены в классе ConsumerConfig. Минимальные настройки получателя должны включать в себя адрес сервера для подключения и десериализаторы ключей и содержимого сообщений. Если получатель должен быть частью группы получателей и иметь возможность подписываться на топики, то в настройках получателя должен быть указан идентификатор группы получателей.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
public class KafkaConsumerApplication { public static void main(String[] args) { var properties = new Properties(); // Адрес сервера для подключения properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Десериализатор ключей сообщений properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Десериализатор содержимого сообщений properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Идентификатор группы; не требуется, если получатель не является // частью группы и не использует подписку на топики properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testing"); // Идентификатор участника группы; // нужен если требуется статическое членство участника в группе properties.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-testing-0"); // Идентификатор клиента; нужен, если нужно сохранение смещений // при чтении сообщений вне группы properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "tester"); try(var consumer = new KafkaConsumer<>(props)) { // Работа с получателем } } } |
После настройки получателя его можно использовать для получения сообщений, подписываясь на топики или указывая вручную топики и партиции, из которых должны запрашиваться сообщения.
Подписка на топики
Подписка на топики возможна только для получателей, являющихся участниками группы получателей. При подписке на топики Kafka распределяет получателей между партициями топика таким образом, чтобы только один получатель в рамках группы получал сообщения из отдельно взятой партиции, но группа получателей получала сообщения из всех партиций. Таким образом Kafka пытается гарантировать доставку всех сообщений группе получателей, а так же уникальность сообщений в рамках группы. Несколько групп получателей, подписанные на один и тот же топик, получают одни и те же сообщения независимо друг от друга, так как Kafka сохраняет смещения (индексы прочитанных сообщений) для каждой группы индивидуально.
Если партиций больше, чем получателей, то некоторые получатели будут получать сообщения из нескольких партиций. Если получателей больше, чем партиций, то некоторые получатели будут бездействовать.
Для подписки на топики используется один из вариантов метода subsribe
:
void subscribe(Collection<String> topics)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
void subscribe(Pattern pattern)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
При помощи коллекции строк (Collection<String> topics
) или регулярного выражения (Pattern pattern
) задаются названия топиков, на которые получатель должен быть подписан. Получатель может быть подписан на несколько топиков одновременно. Повторный вызов метода subsribe
отменяет предыдущие подписки. Вызов метода subscribe
с пустой коллекцией в качестве аргумента отменяет подписки и равносилен вызову метода unsubsribe
.
Интерфейс ConsumerRebalanceListener
объявляет три метода, которые будут вызываться при перебалансировке партиций между получателями:
void onPartitionsRevoked(Collection<TopicPartition> partitions)
вызывается, когда у получателя забираются партицииvoid onPartitionsAssigned(Collection<TopicPartition> partitions)
вызывается, когда получателю назначаются партицииvoid onPartitionsLost(Collection<TopicPartition> partitions)
вызывается в исключительных ситуациях, когда партиции становятся недоступны получателю, по умолчанию вызов передаётся методуonPartitionsRevoked
.
Несколько примеров подписки на топики:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
public class KafkaConsumerApplication { public static void main(String[] args) { try(var consumer = getConsumer()) { // Подписка на топик topic1 consumer.subscribe(List.of("topic1")); // Подписка на топики topic2 и topic3 // Получатель может быть подписан на несколько топиков // Повторный вызов subscribe отменяет предыдущие подписки consumer.subscribe(List.of("topic2", "topic3")); // Подписка на все топики, название которых начинается с topic[0-9] consumer.subscribe(Pattern.compile("^topic\\d")); // Подписка на топик topic1 с обработчиком событий соединений consumer.subscribe(List.of("topic1"), new MyConsumerRebalanceListener()); } } } class MyConsumerRebalanceListener implements ConsumerRebalanceListener { private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerRebalanceListener.class); @Override public void onPartitionsRevoked( Collection<TopicPartition> partitions) { LOGGER.info("Revoking partitions {}", partitions); } @Override public void onPartitionsAssigned( Collection<TopicPartition> partitions) { LOGGER.info("Assigning partitions {}", partitions); } } |
Стоит помнить, что появление нового подписчика в группе приводит к её перебалансировке — перераспределению партиций между подписчиками. Это весьма ресурсоёмкая операция, особенно для высоко нагруженных топиков. Использование статического членства участников в группе может снизить количество перебалансировок, либо и вовсе их исключить при перезапуске сервисов-получателей.
Статическое и динамическое членство в группе
Если в настройках получателя не указан параметр group.instance.id
(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
), то для получателя в данном случае применяется динамическое членство в группе, и идентификатор участника группы генерируется автоматически. Появление участника с новым идентификатором в свою очередь приводит к перебалансировке группы.
Для статического членства получателя в группе нужно указать значение параметра group.instance.id
, при этом оно должно быть уникальным в рамках группы. Статическое членство не исключает перебалансировок группы, но снижает их количество при обычном перезапуске сервиса-получателя.
Самостоятельное назначение топиков и партиций
Получатели могут самостоятельно назначаться на получение сообщений из партиций топиков. Если идентификатор клиента (client.id
или ConsumerConfig.CLIENT_ID_CONFIG
) не указан в конфигурации, то Kafka не будет сохранять смещение для клиента.
Для самостоятельного назначения топиков и партиций получателю используется метод void assign(Collection<TopicPartition> partitions)
. Конструктор класса TopicPartition
принимает два аргумента:
String topic
— название топикаint partition
— номер партиции
Повторный вызов метода assign
отменяет предыдущие назначения. Вызов метода с пустой коллекцией в качестве аргумента отменяет назначения и равносилен вызову метода unassign
.
Несколько примеров назначения топиков и партиций:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class KafkaConsumerApplication { public static void main(String[] args) { try(var consumer = getConsumer()) { // Назначение партиции 0 топика topic1 consumer.assign(List.of(new TopicPartition("topic1", 0))); // Назначение партиции 0 топика topic1 // и партиции 1 топика topic2 // Этот вызов отменяет предыдущий consumer.assign(List.of( new TopicPartition("topic1", 0), new TopicPartition("topic2", 1) )); // Получение списка партиций для топика topic1 var partitions = consumer.partitionsFor("topic1").stream() .map(info -> new TopicPartition(info.topic(), info.partition())) .toList(); // Подписка на все партиции топика topic1 consumer.assign(partitions); } } } |
Методы и assign
и subscribe
не могут быть использованы совместно в рамках сеанса работы получателя, но могут быть вызваны последовательно, после отмены предыдущего назначения или подписки.
Получение сообщений
Для получения сообщений используется метод ConsumerRecords<K, V> poll(Duration timeout)
. В качестве аргумента указывается промежуток времени, который получатель ожидает новых сообщений.
Экземпляр возвращаемого класса ConsumerRecords
содержит свойство Map<TopicPartition, List<ConsumerRecord<K, V>>> record
, в котором передаются полученные сообщения. Ключ указывает на партицию, из которой были получены сообщения, а значение содержит непосредственно полученные сообщения. ConsumerRecords
реализует Iterable
, следовательно, вы можете итеративно обходить полученный результат.
Класс ConsumerRecord
содержит следующие свойства:
String topic
— название топикаint partition
— номер партицииlong offset
— смещение или порядковый номер сообщенияlong timestamp
— метка времени, когда было создано сообщениеTimestampType timestampType
— тип метки времениint serializedKeySize
— размер сериализованного ключа в байтахint serializedValueSize
— размер сериализованных данных сообщения в байтахHeaders headers
— заголовки сообщенияK key
— ключ сообщенияV value
— данные сообщенияOptional<Integer> leaderEpoch
— счётчик лидера
Пример получения сообщений с подпиской на топик topic1
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class KafkaConsumerApplication { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerApplication.class); public static void main(String[] args) { try(var consumer = getConsumer()) { // Подписка на топик topic1 consumer.subscribe(List.of("topic1")); for (var i = 0; i < 10; i++) { // Запрос новых сообщений с ожиданием в 1 секунду var records = consumer.poll(Duration.ofSeconds(1)); records.forEach(record -> { // Обработка сообщений LOGGER.info("Got message from {}({}): {}", record.topic(), record.partition(), record.value()); // Будет выведено что-то вроде // Got message from topic1(0): Hello Kafka! }); } // Отписка от топика, не обязательно consumer.unsubscribe(); } } } |
Получение сообщение с самостоятельным назначением партиций получателю отличается лишь вызовом метода assign
вместо subscribe
.
Получение ранее отправленных сообщений
Если вы попробуете выполнить приведённый выше код, то, наверняка, заметите, что получатель получает только новые сообщения, которые появляются после первого запуска получателя. Так получается из-за того, что при создании группы получателей или получателя в качестве исходного смещения используется порядковый номер последнего добавленного в партицию сообщения, и получатели получают сообщения, которые создаются уже после него.
Если вам требуется получить все сообщения, которые были созданы до появления группы получателей или получателя, то в параметрах получателя вы можете указать параметр auto.offset.reset=earliest
(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
). Так же вы можете самостоятельно указывать смещение, относительно которого требуется получение новых сообщений, при помощи одного из четырёх методов:
void seek(TopicPartition partition, long offset)
— поиск конкретного смещенияvoid seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
— поиск конкретного смещенияvoid seekToBeginning(Collection<TopicPartition> partitions)
— поиск начала указанных партицийvoid seekToEnd(Collection<TopicPartition> partitions)
— поиск конца указанных партиций
Если вы не знаете конкретного смещения, то вы можете найти его по метке времени при помощи метода offsetsForTimes
, который принимает аргументы:
Map<TopicPartition, Long> timestampsToSearch
— указание меток времени, относительно которых нужно искать смещения для указанных партицийDuration duration
— продолжительность ожидания получения результата
Обратите внимание на то, что поиск смещения — ресурсоёмкая задача.
Несколько примеров поиска смещений:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class KafkaConsumerApplication { public static void main(String[] args) { try (var consumer = getConsumer()) { consumer.assign(List.of(new TopicPartition("topic1", 0))); // Поиск начала всех партиций, на которые подписан получатель consumer.seekToBeginning(consumer.assignment()); // Поиск смещения 6 партиции 0 топика topic1 consumer.seek(new TopicPartition("topic1", 0), 6); // Поиск конца партиции 0 топика topic1 consumer.seekToEnd(List.of(new TopicPartition("topic1", 0))); // Поиск смещений всех партиций, которые были актуальны 24 часа назад var yesterday = ZonedDateTime.now().minusDays(1).toInstant().toEpochMilli(); var offsets = consumer.offsetsForTimes( consumer.assignment().stream() .collect(Collectors.toMap(Function.identity(), ignored -> yesterday)) ); offsets.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); } } } |
Если вы используете подписку на топики, то стоит помнить, что при динамическом членстве на перебалансировку группы требуется время, поэтому поиск нужного смещения нужно использовать в методе onPartitionsAssigned
собственной реализации ConsumerRebalanceListener
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
public class KafkaConsumerApplication { public static void main(String[] args) { try (var consumer = getConsumer()) { consumer.subscribe(List.of("topic1"), new MyConsumerRebalanceListener(consumer)); while (notFinished) { var records = consumer.poll(Duration.ofSeconds(1)); // Обработка сообщений } } } } class MyConsumerRebalanceListener implements ConsumerRebalanceListener { private final KafkaConsumer<String, String> consumer; public MyConsumerRebalanceListener(KafkaConsumer<String, String> consumer) { this.consumer = consumer; } @Override public void onPartitionsRevoked( Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned( Collection<TopicPartition> partitions) { // Поиск начала назначенных партиций consumer.seekToBeginning(partitions); } } |
Фиксация смещения
По умолчанию отправитель на фоне периодически фиксирует смещения прочитанных сообщений, но такое поведение потенциально может быть причиной потери данных. Представьте себе ситуацию, когда метод poll
вернул 200 сообщений (по умолчанию он может вернуть до 500 сообщений включительно), получатель обработал 70 сообщений, на фоне произошла фиксация максимального смещения, а после этого в логике получателя произошла ошибка, из-за чего оставшиеся 130 сообщений не обработались. Из-за того, что максимальное смещение уже зафиксировано для текущего получателя, он эти сообщения уже не получит и не обработает (если принудительно не вернуть смещение до сообщения, после обработки которого возникла ошибка, но подобные действия в условиях реальной эксплуатации весьма трудоёмки).
Решается данная проблема ручной фиксацией смещения. Для этого нужно отключить автоматическую фиксацию при помощи параметра enable.auto.commit=false
и использовать метод commitSync
или commitAsync
.
Метод commitSync
фиксирует смещения в блокирующем стиле и имеет четыре формы:
void commitSync()
— для фиксации смещений сообщений, полученных при последнем вызове методаpoll
с неограниченным временем ожидания фиксацииvoid commitSync(Duration timeout)
— для фиксации смещений сообщений, полученных при последнем вызове методаpoll
с ограничением времени ожидания фиксацииvoid commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
— для фиксации конкретных смещений для каждой партицииvoid commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)
— для фиксации конкретных смещений для каждой партиции с ограничением времени ожидания фиксации
Метод commitAsync
фиксирует смещения в асинхронном стиле и имеет три формы:
void commitAsync()
— для фиксации смещений сообщений, полученных при последнем вызове методаpoll
void commitAsync(OffsetCommitCallback callback)
— для фиксации смещений сообщений, полученных при последнем вызове методаpoll
, методonComplete
экземпляраOffsetCommitCallback
будет вызван по окончании фиксацииvoid commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
— для фиксации конкретных смещений для каждой партиции с использованием коллбэка.
Интерфейс OffsetCommitCallback
имеет следующую сигнатуру:
1 2 3 4 5 |
public interface OffsetCommitCallback { void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception); } |
Несколько примеров ручной фиксации смещений:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
public class KafkaConsumerApplication { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerApplication.class); public static void main(String[] args) { var properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testing"); properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "tester"); // Отключение автоматический фиксации properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString()); try(var consumer = new KafkaConsumer<>(props)) { consumer.subsribe(List.of("topic1")); // Синхронная фиксация смещений полученных сообщений var records = consumer.poll(Duration.ofSeconds()); /* обработка сообщений */ consumer.commitSync(); // Асинхронная фиксация смещений полученных сообщений // с коллбэком records = consumer.poll(Duration.ofSeconds()); /* обработка сообщений */ consumer.commitAsync((offsets, exception) -> { if (exception != null) { LOGGER.error("Couldn't commit offsets!", exception); } }); // Асинхронная фиксация конкретных смещений records = consumer.poll(Duration.ofSeconds()); records.forEach(record -> { /* Обработка сообщения */ consumer.commitAsync( Map.of( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1) ), (offsets, exception) -> { if (exception != null) { LOGGER.error("Couldn't commit offsets", exception); } }); }); } } } |
Семантики доставки
При получении сообщений и ручной фиксации смещений нам доступно две семантики:
- At least once — гарантия получения сообщения как минимум один раз
- At most once — гарантия получения сообщения максимум один раз
At least once
В случе семантики «at least once» гарантируется получение сообщения получателем минимум один раз. С одной стороны гарантируется отсутствие потерь информации, с другой — не гарантируется повторное получение одного и того же сообщения, что при повторяющемся исключении может привести к зацикливанию обработки на одном сообщении. Сценарий работы получателя следующий:
- Получить сообщения
- Обработать сообщения
- Зафиксировать новые смещения
Если новые смещения фиксируются после обработки всех сообщений, то возникновение исключения при обработке одного из сообщений может приводить к повторному получению сообщений, обработка которых не вызывает проблем.
Код, демонстрирующий этот сценарий:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class KafkaConsumerApplication { public static void main(String[] args) { try(var consumer = getConsumer()) { consumer.subsribe(List.of("topic1")); var records = consumer.poll(Duration.ofSeconds()); /* Обработка сообщений. 1 сообщение обработано нормально. 2 сообщение обработано нормально. ... при обработке сообщения n возникает исключение смещения не фиксируются и сообщения 1..n будут получены заново */ consumer.commitSync(); } } } |
Проблему повторного получения успешно обрабатываемых сообщений можно решить фиксируя новое смещение после обработки каждого сообщения, но это не решает проблему зацикливания получения сообщения, приводящего к исключению.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class KafkaConsumerApplication { public static void main(String[] args) { try(var consumer = getConsumer()) { consumer.subsribe(List.of("topic1")); var records = consumer.poll(Duration.ofSeconds()); records.forEach(record -> { /* Обработка сообщения */ consumer.commitAsync( Map.of( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1) ), (offsets, exception) -> {} ); }); } } } |
At most once
В случае семантики «at most once» гарантируется получение сообщения получателем максимум один раз. С одной стороны гарантируется отсутствие повторного получения сообщений, с другой — не гарантируется отсутствие потерь информации. Сценарий работы получателя в данной ситуации следующий:
- Получить сообщения
- Зафиксировать новые смещения
- Обработать сообщения
Если при обработке сообщений возникнет исключение, часть данных может быть потеряна. Код, демонстрирующий этот сценарий:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class KafkaConsumerApplication { public static void main(String[] args) { try(var consumer = getConsumer()) { consumer.subsribe(List.of("topic1")); var records = consumer.poll(Duration.ofSeconds()); consumer.commitSync(); /* Обработка сообщений. 1 сообщение обработано нормально. 2 сообщение обработано нормально. ... при обработке сообщения n возникает исключение смещения уже зафиксированы, поэтому сообщения после n будут потеряны */ } } } |
Поэтому нужно искать золотую середину при выборе семантики.