Отправка сообщений в Kafka

В предыдущих статьях я вкратце рассказал об Apache Kafka, некоторых нюансах его внутреннего устройства, а так же о способах развёртывания. В этой же статье я хочу поговорить об использовании стандартной клиентской библиотеки Kafka для отправки сообщений в проектах на платформе Java.

Зависимости

Для работы с Kafka в проекте на языке программирования для платформы Java нам потребуется клиент, который мы можем добавить в зависимости проекта:

Настройка отправителя

Первым делом для отправки сообщений нужно сконфигурировать отправителя в проекте.

Минимальная настройка отправителя

Для минимальной настройки отправителя нужно указать адреса серверов для подключения и сериализаторы ключей и значений сообщений.

Клас KafkaProducer является типизированным, K определяет тип ключей сообщений, а V — их содержимого.

Клиент Kafka предоставляет следующие сериализаторы:

  • BooleanSerializer
  • ByteArraySerializer
  • ByteBufferSerializer
  • BytesSerializer
  • DoubleSerializer
  • FloatSerializer
  • IntegerSerializer
  • ListSerializer
  • LongSerializer
  • ShortSerializer
  • StringSerializer
  • UUIDSerializer
  • VoidSerializer

Все сериализаторы сериализуют данные исходного типа в массив байтов. Вы можете создать и использовать собственный сериализатор, реализующий интерфейс Serializer.

Настройка транзакционного отправителя

Для использования транзакций при отправке сообщений необходимо указать в настройках транзакционный идентификатор отправителя (transactional.id):

Транзакции могут использовать только идемпонентные отправители.

Настройка неидемпонентного отправителя

Начиная с Kafka 3.0.0 все отправители по умолчанию являются идемпонентными и ожидают подтверждения о доставки сообщений от всех реплик. Идемпонентные отправители гарантируют уникальность отправляемых сообщений в партиции в рамках сеанса работы отправителя.

Если вам не требуется идемпонентность, то её можно отключить параметром enable.idempotence=false, количество подтверждений указать при помощи параметра acks, который может принимать следующие значения:

  • all — ожидание подтверждения доставки сообщения всем репликам (по умолчанию)
  • 0 — не ожидать подтверждений вообще
  • 1 — ожидать подтверждения от лидирующей реплики
  • Любое валидное количество реплик

Отправка сообщений

Для отправки сообщений в KafkaProducer существуют две версии метода send:

  • Future<RecordMetadata> send(ProducerRecord<K, V> record)
  • Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

Отправка сообщений в Kafka асинхронная, как видно из Future, возвращаемого методом send.

Класс ProducerRecord содержит свойства отправляемого сообщения:

  • topic — название целевого топика
  • partition — целевая партиция топика
  • headers — заголовки сообщения
  • key — ключ сообщения
  • value — само сообщение
  • timestamp — метка времени

Для удобства использования ProducerRecord предоставляет несколько перегруженных конструкторов, минимальный из которых принимает только название топика (topic) и сообщение (value), максимальный — все указанные свойства.

Методу send может быть передан экземпляр класса Callback, который содержит единственный метод void onCompletion(RecordMetadata metadata, Exception exception), вызываемый по окончании отправки сообщения.

В случае успеха отправки вы получите экземпляр класса RecordMetadata, который содержит метаданные добавленного сообщения:

  • offset — смещение добавленного сообщения
  • timestamp — метка времени сообщения
  • serializedKeySize — размер сериализованного ключа в байтах
  • serializedValueSize — размер сериализованного записанного сообщения в байтах
  • topicPartition — партиция, в которую было записано сообщение

Получить экземпляр класса RecordMetadata вы можете из возвращаемого Future или в Callback.

Давайте разберём несколько вариантов отправки сообщений:

Если нам требуется отправить несколько сообщений и нужны гарантии доставки всех сообщений, то мы можем сделать это при помощи транзакций: