Получение сообщений из Kafka

Получение сообщений из Kafka на первый взгляд выглядит достаточно просто, но на деле обладает большим количеством нюансов, связанных с членством получателя в группе, её перебалансировкой, получением ранее отправленных сообщений, фиксацией смещений и семантиками доставки. Все эти вопросы я постарался осветить в этой статье.

Настройка получателя

Для получения сообщений из Kafka с использованием стандартного клиента для Java необходимо создать и настроить получателя. Классы, необходимые для получения сообщений также есть в библиотеке kafka-clients:

Для получения сообщений используется класс KafkaConsumer<K, V>. Данный класс является типизированным, K определяет тип данных ключей сообщений, V — тип данных содержимого сообщений. Параметры получателя определены в классе ConsumerConfig. Минимальные настройки получателя должны включать в себя адрес сервера для подключения и десериализаторы ключей и содержимого сообщений. Если получатель должен быть частью группы получателей и иметь возможность подписываться на топики, то в настройках получателя должен быть указан идентификатор группы получателей.

После настройки получателя его можно использовать для получения сообщений, подписываясь на топики или указывая вручную топики и партиции, из которых должны запрашиваться сообщения.

Подписка на топики

Подписка на топики возможна только для получателей, являющихся участниками группы получателей. При подписке на топики Kafka распределяет получателей между партициями топика таким образом, чтобы только один получатель в рамках группы получал сообщения из отдельно взятой партиции, но группа получателей получала сообщения из всех партиций. Таким образом Kafka пытается гарантировать доставку всех сообщений группе получателей, а так же уникальность сообщений в рамках группы. Несколько групп получателей, подписанные на один и тот же топик, получают одни и те же сообщения независимо друг от друга, так как Kafka сохраняет смещения (индексы прочитанных сообщений) для каждой группы индивидуально.

Если партиций больше, чем получателей, то некоторые получатели будут получать сообщения из нескольких партиций. Если получателей больше, чем партиций, то некоторые получатели будут бездействовать.

Для подписки на топики используется один из вариантов метода subsribe:

  • void subscribe(Collection<String> topics)
  • void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
  • void subscribe(Pattern pattern)
  • void subscribe(Pattern pattern, ConsumerRebalanceListener callback)

При помощи коллекции строк (Collection<String> topics) или регулярного выражения (Pattern pattern) задаются названия топиков, на которые получатель должен быть подписан. Получатель может быть подписан на несколько топиков одновременно. Повторный вызов метода subsribe отменяет предыдущие подписки. Вызов метода subscribe с пустой коллекцией в качестве аргумента отменяет подписки и равносилен вызову метода unsubsribe.

Интерфейс ConsumerRebalanceListener объявляет три метода, которые будут вызываться при перебалансировке партиций между получателями:

  • void onPartitionsRevoked(Collection<TopicPartition> partitions) вызывается, когда у получателя забираются партиции
  • void onPartitionsAssigned(Collection<TopicPartition> partitions) вызывается, когда получателю назначаются партиции
  • void onPartitionsLost(Collection<TopicPartition> partitions) вызывается в исключительных ситуациях, когда партиции становятся недоступны получателю, по умолчанию вызов передаётся методу onPartitionsRevoked.

Несколько примеров подписки на топики:

Стоит помнить, что появление нового подписчика в группе приводит к её перебалансировке — перераспределению партиций между подписчиками. Это весьма ресурсоёмкая операция, особенно для высоко нагруженных топиков. Использование статического членства участников в группе может снизить количество перебалансировок, либо и вовсе их исключить при перезапуске сервисов-получателей.

Статическое и динамическое членство в группе

Если в настройках получателя не указан параметр group.instance.id (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), то для получателя в данном случае применяется динамическое членство в группе, и идентификатор участника группы генерируется автоматически. Появление участника с новым идентификатором в свою очередь приводит к перебалансировке группы.

Для статического членства получателя в группе нужно указать значение параметра group.instance.id, при этом оно должно быть уникальным в рамках группы. Статическое членство не исключает перебалансировок группы, но снижает их количество при обычном перезапуске сервиса-получателя.

Самостоятельное назначение топиков и партиций

Получатели могут самостоятельно назначаться на получение сообщений из партиций топиков. Если идентификатор клиента (client.id или ConsumerConfig.CLIENT_ID_CONFIG) не указан в конфигурации, то Kafka не будет сохранять смещение для клиента.

Для самостоятельного назначения топиков и партиций получателю используется метод void assign(Collection<TopicPartition> partitions). Конструктор класса TopicPartition принимает два аргумента:

  • String topic — название топика
  • int partition — номер партиции

Повторный вызов метода assign отменяет предыдущие назначения. Вызов метода с пустой коллекцией в качестве аргумента отменяет назначения и равносилен вызову метода unassign.

Несколько примеров назначения топиков и партиций:

Методы и assign и subscribe не могут быть использованы совместно в рамках сеанса работы получателя, но могут быть вызваны последовательно, после отмены предыдущего назначения или подписки.

Получение сообщений

Для получения сообщений используется метод ConsumerRecords<K, V> poll(Duration timeout). В качестве аргумента указывается промежуток времени, который получатель ожидает новых сообщений.

Экземпляр возвращаемого класса ConsumerRecords содержит свойство Map<TopicPartition, List<ConsumerRecord<K, V>>> record, в котором передаются полученные сообщения. Ключ указывает на партицию, из которой были получены сообщения, а значение содержит непосредственно полученные сообщения. ConsumerRecords реализует Iterable, следовательно, вы можете итеративно обходить полученный результат.

Класс ConsumerRecord содержит следующие свойства:

  • String topic — название топика
  • int partition — номер партиции
  • long offset — смещение или порядковый номер сообщения
  • long timestamp — метка времени, когда было создано сообщение
  • TimestampType timestampType — тип метки времени
  • int serializedKeySize — размер сериализованного ключа в байтах
  • int serializedValueSize — размер сериализованных данных сообщения в байтах
  • Headers headers — заголовки сообщения
  • K key — ключ сообщения
  • V value — данные сообщения
  • Optional<Integer> leaderEpoch — счётчик лидера

Пример получения сообщений с подпиской на топик topic1:

Получение сообщение с самостоятельным назначением партиций получателю отличается лишь вызовом метода assign вместо subscribe.

Получение ранее отправленных сообщений

Если вы попробуете выполнить приведённый выше код, то, наверняка, заметите, что получатель получает только новые сообщения, которые появляются после первого запуска получателя. Так получается из-за того, что при создании группы получателей или получателя в качестве исходного смещения используется порядковый номер последнего добавленного в партицию сообщения, и получатели получают сообщения, которые создаются уже после него.

Если вам требуется получить все сообщения, которые были созданы до появления группы получателей или получателя, то в параметрах получателя вы можете указать параметр auto.offset.reset=earliest (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). Так же вы можете самостоятельно указывать смещение, относительно которого требуется получение новых сообщений, при помощи одного из четырёх методов:

  • void seek(TopicPartition partition, long offset) — поиск конкретного смещения
  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) — поиск конкретного смещения
  • void seekToBeginning(Collection<TopicPartition> partitions) — поиск начала указанных партиций
  • void seekToEnd(Collection<TopicPartition> partitions) — поиск конца указанных партиций

Если вы не знаете конкретного смещения, то вы можете найти его по метке времени при помощи метода offsetsForTimes, который принимает аргументы:

  • Map<TopicPartition, Long> timestampsToSearch — указание меток времени, относительно которых нужно искать смещения для указанных партиций
  • Duration duration — продолжительность ожидания получения результата

Обратите внимание на то, что поиск смещения — ресурсоёмкая задача.

Несколько примеров поиска смещений:

Если вы используете подписку на топики, то стоит помнить, что при динамическом членстве на перебалансировку группы требуется время, поэтому поиск нужного смещения нужно использовать в методе onPartitionsAssigned собственной реализации ConsumerRebalanceListener:

Фиксация смещения

По умолчанию отправитель на фоне периодически фиксирует смещения прочитанных сообщений, но такое поведение потенциально может быть причиной потери данных. Представьте себе ситуацию, когда метод poll вернул 200 сообщений (по умолчанию он может вернуть до 500 сообщений включительно), получатель обработал 70 сообщений, на фоне произошла фиксация максимального смещения, а после этого в логике получателя произошла ошибка, из-за чего оставшиеся 130 сообщений не обработались. Из-за того, что максимальное смещение уже зафиксировано для текущего получателя, он эти сообщения уже не получит и не обработает (если принудительно не вернуть смещение до сообщения, после обработки которого возникла ошибка, но подобные действия в условиях реальной эксплуатации весьма трудоёмки).

Решается данная проблема ручной фиксацией смещения. Для этого нужно отключить автоматическую фиксацию при помощи параметра enable.auto.commit=false и использовать метод commitSync или commitAsync.

Метод commitSync фиксирует смещения в блокирующем стиле и имеет четыре формы:

  • void commitSync() — для фиксации смещений сообщений, полученных при последнем вызове метода poll с неограниченным временем ожидания фиксации
  • void commitSync(Duration timeout) — для фиксации смещений сообщений, полученных при последнем вызове метода poll с ограничением времени ожидания фиксации
  • void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) — для фиксации конкретных смещений для каждой партиции
  • void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) — для фиксации конкретных смещений для каждой партиции с ограничением времени ожидания фиксации

Метод commitAsync фиксирует смещения в асинхронном стиле и имеет три формы:

  • void commitAsync() — для фиксации смещений сообщений, полученных при последнем вызове метода poll
  • void commitAsync(OffsetCommitCallback callback) — для фиксации смещений сообщений, полученных при последнем вызове метода poll, метод onComplete экземпляра OffsetCommitCallback будет вызван по окончании фиксации
  • void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) — для фиксации конкретных смещений для каждой партиции с использованием коллбэка.

Интерфейс OffsetCommitCallback имеет следующую сигнатуру:

Несколько примеров ручной фиксации смещений:

Семантики доставки

При получении сообщений и ручной фиксации смещений нам доступно две семантики:

  • At least once — гарантия получения сообщения как минимум один раз
  • At most once — гарантия получения сообщения максимум один раз

At least once

В случе семантики «at least once» гарантируется получение сообщения получателем минимум один раз. С одной стороны гарантируется отсутствие потерь информации, с другой — не гарантируется повторное получение одного и того же сообщения, что при повторяющемся исключении может привести к зацикливанию обработки на одном сообщении. Сценарий работы получателя следующий:

  1. Получить сообщения
  2. Обработать сообщения
  3. Зафиксировать новые смещения

Если новые смещения фиксируются после обработки всех сообщений, то возникновение исключения при обработке одного из сообщений может приводить к повторному получению сообщений, обработка которых не вызывает проблем.

Код, демонстрирующий этот сценарий:

Проблему повторного получения успешно обрабатываемых сообщений можно решить фиксируя новое смещение после обработки каждого сообщения, но это не решает проблему зацикливания получения сообщения, приводящего к исключению.

At most once

В случае семантики «at most once» гарантируется получение сообщения получателем максимум один раз. С одной стороны гарантируется отсутствие повторного получения сообщений, с другой — не гарантируется отсутствие потерь информации. Сценарий работы получателя в данной ситуации следующий:

  1. Получить сообщения
  2. Зафиксировать новые смещения
  3. Обработать сообщения

Если при обработке сообщений возникнет исключение, часть данных может быть потеряна. Код, демонстрирующий этот сценарий:

Поэтому нужно искать золотую середину при выборе семантики.