Apache Kafka является одной из наиболее популярных систем для обмена сообщениями в распределённых информационных системах. В этой статье рассматривается базовое устройство Kafka, терминология, установка и запуск для локальной разработки, а так же выполнение базовых операций из командной строки и из приложения на языке программирования Java.
Что такое Kafka?
Apache Kafka — это система стриминга событий (или сообщений). В основе процесса обмена сообщениями в Kafka лежит стрим или поток данных — абстрактная упорядоченная структура данных, которая предусматривает только добавление новых элементов в её конец.
В отличие от очередей сообщений, чтение сообщений из стрима не приводит к их удалению, таким образом Apache Kafka, как и другие стриминговые системы, предполагает накопление сообщений для дальнейшего предоставления их всем заинтересованным получателям. Сообщения могут храниться в системе, сколько необходимо, хоть бесконечно долго. В целом такое поведение больше соответствует системам накопления логов вроде Grafana Loki, Logstash и других, нежели очередям сообщений.
В Apache Kafka стрим называется топиком и может быть разделён на несколько партиций для повышения производительности системы. Отправитель, создатель или продюсер (producer) — это приложение, которое создаёт новые сообщения в топиках. Отправитель может указать ключ и значение сообщения. Ключ используется для определения партиции, в которую будет добавлено сообщение, если отправитель не указал её явно. Все сообщения с одним ключом попадают в одну партицию. Если отправитель при добавлении нового сообщения не указывает партицию или ключ, то сообщения равномерно распределяются между партициями.
Ключи и значения Kafka хранит в виде последовательности байтов, что даёт большую гибкость при выборе типов данных, которые могут быть использованы в клиентах.
Получатель подписывается на топик и периодически запрашивает новые сообщения. Apache Kafka запоминает, на каком элементе получатель закончил чтение из топика, и при следующем запросе он получит следующие непрочитанные сообщения, даже если будет перезапущен.
Получатели и отправители не зависят друг от друга: каждая группа получателей получает все сообщения, вне зависимости от получения сообщения другими группами, а отправители могут параллельно отправлять сообщения в одни и те же топики и не ждут, когда сообщение будет прочитано получателями.
Установка и запуск
Apache Kafka, как и многие другие инструменты, может быть установлена локально или развёрнута в контейнере, например в Docker.
Локальная установка
Для локальной установки вам потребуется JRE 8+, но желательно использовать JRE 17+, так как с релизом Apache Kafka 4.0 поддержка JRE 8 и 11 будет прекращена.
Для локальной установки вам потребуется скачать дистрибутив, выбирайте вариант с любой версией Scala, в данный момент это непринципиально. Дистрибутив помимо Kafka содержит Apache ZooKeeper — сервис для хранения метаданных и конфигов сервисов, который используется в качестве реестра сервисов при развёртывании кластера Kafka. Вместо ZooKeeper может быть использован KRaft — Apache Kafka Raft, собственный протокол для обмена метаданными в кластере, который в итоге должен убрать зависимость Kafka от ZooKeeper. Вы можете выбрать, как будет запускать Kafka — с KRaft или с ZooKeeper.
Запуск с KRaft
Перейдите в терминале в директорию с распакованным дистрибутивом Kafka и выполняйте следующие действия.
1 2 3 4 5 6 7 8 |
# Для запуска с KRaft нужно сгенерировать идентификатор кластера $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" # Далее нужно форматировать директорию логов $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties # Теперь можно запускать сам сервис: $ bin/kafka-server-start.sh config/kraft/server.properties |
Запуск с ZooKeeper
Для запуска Kafka с ZooKeeper нужно так же вы полнить команды в директории с дистрибутивом сервиса:
1 2 3 4 5 |
# Для запуска с Apache ZooKeeper нужно сначала запустить ZooKeeper $ bin/zookeeper-server-start.sh config/zookeeper.properties # Теперь можно запускать Kafka $ bin/kafka-server-start.sh config/server.properties |
Обратите внимание на то, что при запуске с KRaft и ZooKeeper используются разные файлы свойств для сервиса. Через некоторое время после выполнения kafka-server-start.sh
сервис запустится и станет доступен.
Для остановки сервиса достаточно использовать комбинацию клавиш Ctrl+C
сначала в терминале с Apache Kafka, затем — в терминале с ZooKeeper.
Запуск в контейнере
Для запуска в контейнере доступно два официальных образа: apache/kafka
и apache/kafka-native
, использующий нативный образ, собранный при помощи GraalVM.
Простейший вариант запуска:
1 2 |
# Порт для взаимодействия с сервисом - 9092, пробросим его $ docker run -p 9092:9092 apache/kafka:latest |
После запуска сервиса он будет доступен по адресу localhost:9092
, и будет можно приступать к его использованию.
Инструменты CLI
Дистрибутив Apache Kafka в директории bin содержит утилиты командной строки, позволяющие работать с сервисом. В данной статье я рассмотрю некоторые из них:
kafka-console-producer.sh
— утилита для публикации сообщенийkafka-console-consumer.sh
— утилита для получения сообщенийkafka-topics.sh
— утилита для работы с топиками
Для ознакомления со справкой к каждой утилите выполните её с флагом --help
.
Работа с топиками
Давайте создадим топик selmag.order-management.order
с 5 партициями:
1 2 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic selmag.order-management.order --partitions 5 Created topic selmag.order-management.order. |
Создавать топик вручную не обязательно, он будет создан автоматически с настройками по умолчанию при отправке сообщения в него.
Получить список топиков и подробную информацию о них можно при помощи той же утилиты:
1 2 3 4 5 6 7 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list selmag.order-management.order # наш топик # Получим подробную информацию о топике $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic selmag.order-management.order Topic: selmag.order-management.order TopicId: l6T4lSfbT-mGIgW1oq8XPA PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: selmag.order-management.order Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Elr: N/A LastKnownElr: N/A |
Удалить топик тоже можно при помощи этой утилиты:
1 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic selmag.order-management.order |
Получение событий
Для подписки на события можно использовать утилиту kafka-console-consumer.sh
:
1 2 3 4 5 |
# --from-beginning используется для получения всех сообщений с самого начала # в противном случае получатель получит только сообщения, возникшие после запуска $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic selmag.order-management.order --from-beginning # здесь будут выводиться сообщения, нажмите Ctrl+C, когда надоест |
Но пока что на экране ничего нет, так как в топике нет сообщений. Откройте новое окно терминала и отправьте несколько сообщений.
Отправка событий
Отправить сообщения в топик можно при помощи kafka-console-producer.sh
:
1 2 3 4 5 6 7 |
# После запуска утилита захватит пользовательский ввод # При нажатии на ввод будет отправляться сообщение # Для выхода нажмите Ctrl+C $ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic selmag.order-management.order >{"orderId":1,"version":1,"type":"created","userId":1,"timestamp":"2024-10-30T15:01:02Z"} >{"orderId":1,"version":2,"type":"offer_added","offerId":1,"amount":"1","timestamp":"2024-10-30T15:01:04Z"} >{"orderId":1,"version":3,"type":"offer_added","offerId":2,"amount":"3","timestamp":"2024-10-30T15:01:07Z"} |
Все три отправленные сообщения должны появиться в окне получателя.
В следующей статье будет рассмотрены базовые примеры работы с Apache Kafka в приложении на языке программирования Java.