В предыдущих статьях я вкратце рассказал об Apache Kafka, некоторых нюансах его внутреннего устройства, а так же о способах развёртывания. В этой же статье я хочу поговорить об использовании стандартной клиентской библиотеки Kafka для отправки сообщений в проектах на платформе Java.
Зависимости
Для работы с Kafka в проекте на языке программирования для платформы Java нам потребуется клиент, который мы можем добавить в зависимости проекта:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.9.0</version> </dependency> |
Настройка отправителя
Первым делом для отправки сообщений нужно сконфигурировать отправителя в проекте.
Минимальная настройка отправителя
Для минимальной настройки отправителя нужно указать адреса серверов для подключения и сериализаторы ключей и значений сообщений.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class KafkaSender { public static void main(String[] args){ var properties = new Properties(); // Адреса для подключения properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Сериализатор ключей properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Сериализатор значений properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); try (var producer = new KafkaProducer<String, String>(producerProperties)) { // Здесь можно отправлять сообщения } } } |
Клас KafkaProducer
является типизированным, K
определяет тип ключей сообщений, а V
— их содержимого.
Клиент Kafka предоставляет следующие сериализаторы:
- BooleanSerializer
- ByteArraySerializer
- ByteBufferSerializer
- BytesSerializer
- DoubleSerializer
- FloatSerializer
- IntegerSerializer
- ListSerializer
- LongSerializer
- ShortSerializer
- StringSerializer
- UUIDSerializer
- VoidSerializer
Все сериализаторы сериализуют данные исходного типа в массив байтов. Вы можете создать и использовать собственный сериализатор, реализующий интерфейс Serializer
.
Настройка транзакционного отправителя
Для использования транзакций при отправке сообщений необходимо указать в настройках транзакционный идентификатор отправителя (transactional.id
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
class KafkaSender { public static void main(String[] args){ var properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Транзакционный идентификатор properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()); try (var producer = new KafkaProducer<String, String>(producerProperties)) { // Здесь можно отправлять сообщения } } } |
Транзакции могут использовать только идемпонентные отправители.
Настройка неидемпонентного отправителя
Начиная с Kafka 3.0.0 все отправители по умолчанию являются идемпонентными и ожидают подтверждения о доставки сообщений от всех реплик. Идемпонентные отправители гарантируют уникальность отправляемых сообщений в партиции в рамках сеанса работы отправителя.
Если вам не требуется идемпонентность, то её можно отключить параметром enable.idempotence=false
, количество подтверждений указать при помощи параметра acks
, который может принимать следующие значения:
all
— ожидание подтверждения доставки сообщения всем репликам (по умолчанию)0
— не ожидать подтверждений вообще1
— ожидать подтверждения от лидирующей реплики- Любое валидное количество реплик
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class KafkaSender { public static void main(String[] args){ var properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Отключение ожидания подтверждения при отправке сообщений properties.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // Отключение идемпотентности // Можно не делать явно, т.к. она отключается при acks отличном от all properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.FALSE.toString()); try (var producer = new KafkaProducer<String, String>(producerProperties)) { // Здесь можно отправлять сообщения } } } |
Отправка сообщений
Для отправки сообщений в KafkaProducer существуют две версии метода send
:
Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
Отправка сообщений в Kafka асинхронная, как видно из Future
, возвращаемого методом send
.
Класс ProducerRecord
содержит свойства отправляемого сообщения:
topic
— название целевого топикаpartition
— целевая партиция топикаheaders
— заголовки сообщенияkey
— ключ сообщенияvalue
— само сообщениеtimestamp
— метка времени
Для удобства использования ProducerRecord
предоставляет несколько перегруженных конструкторов, минимальный из которых принимает только название топика (topic
) и сообщение (value
), максимальный — все указанные свойства.
Методу send
может быть передан экземпляр класса Callback
, который содержит единственный метод void onCompletion(RecordMetadata metadata, Exception exception)
, вызываемый по окончании отправки сообщения.
В случае успеха отправки вы получите экземпляр класса RecordMetadata
, который содержит метаданные добавленного сообщения:
offset
— смещение добавленного сообщенияtimestamp
— метка времени сообщенияserializedKeySize
— размер сериализованного ключа в байтахserializedValueSize
— размер сериализованного записанного сообщения в байтахtopicPartition
— партиция, в которую было записано сообщение
Получить экземпляр класса RecordMetadata
вы можете из возвращаемого Future
или в Callback
.
Давайте разберём несколько вариантов отправки сообщений:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
class KafkaSender { public static void main(String[] args){ try (var producer = getProducer()) { // Простейшая асинхронная отправка сообщения producer.send(new ProducerRecord<>("topic1", "Hello Kafka!")); // Отправка с ожиданием результата var recordMetadata = producer.send(new ProducerRecord<>("topic1", "Hello Kafka!")) .get(); // Асинхронная отправка с использованием callback producer.send(new ProducerRecord<>("topic1", "Hello Kafka!"), (recordMetadata, exception) -> { if (exception != null) { LOGGER.error(exception.getMessage(), exception); } else { // обработка успешного результата } }); // Отправка сообщения с точным указанием свойств producer.send( new ProducerRecord<>( // Топик "topic1", // Партиция 1, // Метка времени System.currentTimeMillis(), // Ключ "key1", // Содержимое "Hello Kafka!", // Заголовки List.of(new RecordHeader("foo", "bar".getBytes())) ) ); } catch(ExecutionException executionException) { } } } |
Если нам требуется отправить несколько сообщений и нужны гарантии доставки всех сообщений, то мы можем сделать это при помощи транзакций:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class KafkaSender { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerApplication.class); public static void main(String[] args) throws InterruptedException { try (var producer = getProducer()) { try { // Инициализация транзакции producer.initTransactions(); // Начало транзакции producer.beginTransaction(); producer.send(new ProducerRecord<>("topic1", "message1")); producer.send(new ProducerRecord<>("topic2", "message3")); producer.send(new ProducerRecord<>("topic3", "message4")); producer.send(new ProducerRecord<>("topic1", "message2")); // Фиксация транзакции producer.commitTransaction(); } catch (Exception exception) { LOGGER.error(exception.getMessage(), exception); // Откат транзакции в случае ошибки producer.abortTransaction(); } } } } |