Запуск Apache Kafka в кластере

Как и любой другой компонент распределённой информационной системы Apache Kafka в условиях реальной эксплуатации рекомендуется разворачивать в кластере для обеспечения отказоустойчивости. Кластер Kafka может быть развёрнут в двух вариантах: с использованием KRaft и Apache ZooKeeper.

Роли нод в кластере

В рамках кластера ноды Kafka могут иметь две роли:

  • Брокер (broker) — ноды данного типа обеспечивают обмен сообщениями с клиентами: отправителями и получателями.
  • Контроллер (controler) — ноды данного типа отвечают за жизненный цикл кластера.

При использовании Apache ZooKeeper контроллер выбирается автоматически из числа брокеров, а при использовании KRaft некоторому количеству нод назначается роль контроллера, и из их числа автоматически выбирается лидер.

KRaft и Apache ZooKeeper

Контроллер отвечает за поддержание элементов кластера в актуальном состоянии, но нужно ещё где-то хранить метаданные кластера: список топиков, их партиции и настройки реплицирования, читателей, их группы, смещения, и т.д.

До Apache Kafka 3.0.0 роль хранилища метаданных была отведена Apache ZooKeeper — сервису, который часто используется в качестве хранилища метаданных, конфигураций и используется при реализации механизма поиска сервисов (service discovery). Недостаток использования ZooKeeper заключается в разделении данных кластера между контроллером и ZooKeeper, что усложняет сопровождение кластера Kafka, так как техническим специалистам нужно сопровождать не только кластер Kafka, но и кластер ZooKeeper.

С версии Kafka 3.0.0 появилась замена ZooKeeper — KRaft (Kafka Raft), реализация алгоритма согласования Raft, позволяющая кворуму контроллеров самостоятельно выбирать активный контроллер, который в свою очередь отвечает за хранение метаданных кластера и его жизненный цикл. Таким образом кластер ZooKeeper можно заменить на кворум контроллеров Apache Kafka, который будет обслуживать кластер.

Начиная с версии Kafka 3.3.1 реализация KRaft является стабильной и рекомендуется к использованию вместо Apache ZooKeeper, а с версии Kafka 4.0 поддержка ZooKeeper будет прекращена.

Далее в данной статье будет рассмотрено развёртывание кластера с KRaft.

Кластер Kafka с KRaft

В качестве примера я буду запускать кластер, состоящий из трёх контроллеров и трёх брокеров, это разумное минимальное количество нод для обеспечения базовой отказоустойчивости. Для развёртывания кластера мне нужно подготовить файлы настроек для каждой ноды, сгенерировать идентификатор кластера и отформатировать директории логов. Все шесть нод будут располагаться в одной системе, поэтому для них будут использоваться разные порты и директории данных, но в условиях реальной эксплуатации каждая нода должна быть расположена в отдельной системе и на разных физических серверах для обеспечения отказоустойчивости.

Дистрибутив Apache Kafka содержит в директории config/kraft уже готовые файлы параметров для брокера, контроллера и сервера (смешанный режим), которые мы можем использовать в качестве основы:

  • broker.properties
  • controller.properties
  • server.properties

Файлы настроек в данной статье приведены для примера и не рассчитаны на использование в условиях реальной эксплуатации. Для более корректной настройки нод ознакомьтесь с официальной документацией.

Ниже приведён пример минимальной настройки контроллера:

config/kraft/controller_i1.properties

Так как все ноды разворачиваются на одной системе, значения свойств listeners и log.idrs должны быть разными в файлах свойств для разных нод. Для двух остальных контроллеров нужно будет создать аналогичные файлы. Так же файлы свойств нужно создать и для брокеров:

config/kraft/broker_i4.properties

Для двух остальных брокеров нужно создать аналогичные файлы.

Теперь нужно сгенерировать идентификатор кластера, сделать это можно следующей командой:

Далее нужно отформатировать директории логов, которые указаны в файлах свойств в свойстве log.dirs, например для брокера с идентификатором 4 команда будет следующей:

Сделать это нужно для всех нод кластера.

Теперь всё готово к запуску кластера, каждую ноду нужно запустить, сначала контроллеры, затем — брокеры. Пример команды запуска для первого контроллера:

После запуска всех нод можно проверить состояние кластера:

Теперь для проверки можно создать топик, партиции которого будут реплицироваться на трёх нодах:

По Leader и Replicas можно видеть, как настроена репликация патриций.

Кластер Kafka с KRaft в Docker

Apache Kafka в контейнерах запускается в режиме KRaft, для настройки каждой ноды мы можем использовать файлы свойств и переменные окружения.

Для запуска ноды Kafka с настройками из файла нужно смонтировать файл свойств в директорию /mnt/shared/config в контейнере. Допустим есть файл с параметрами ноды /opt/kafka/config/docker/controller-i1/server.properties, тогда команда для запуска будет выглядеть следующим образом:

Вместо файла свойств можно использовать переменные окружения. Переменные окружения соответствуют параметрам из файла свойств, имеют префикс KAFKA_, а точки заменены на _, например, переменная окружения KAFKA_NODE_ID соответствует свойству node.id.

Запуск контроллера в контейнере с переменными окружения выглядит следующим образом:

Как видно, localhost я заменил на 172.17.0.1 — адрес хост-системы из сети 172.17.0.0/16 для Docker, если у вашей хост-системы другой адрес в сети Docker, то замените 172.17.0.1 на него. Поскольку все ноды разворачиваются изолированно в своих контейнерах, можно использовать одни и те же директории данных и порты в настройках.

Если требуется задать идентификатор кластера, отличный от стандартного, то это можно сделать при помощи переменной кружения CLUSTER_ID.

Проверить состояние кластера можно, выполнив следующую команду на одном из контроллеров:

Ошибка UnknownHostException

Для брокеров же требуется дополнительная настройка, так как с настройками по умолчанию при попытке подключения к Kafka из-за пределов контейнера вы получите ошибку подключения вроде такой:

Даже если вы используете ip-адрес в качестве адреса брокера, через который взаимодействуете с кластером, то внутри кластера всё равно используются названия хостов. И клиент, получив от брокера информацию о кластере, пытается взаимодействовать с кластером, используя названия хостов. В данном случае это внутренние для Docker названия хостов, о которых внешний клиент ничего не знает, что и приводит к ошибке.

Чтобы решить эту проблему, нужно правильно сконфигурировать параметры listeners и advertisedListeners (KAFKA_LISTENERS и KAFKA_ADVERTISED_LISTENERS в apache/kafka и KAFKA_CFG_LISTENERS и KAFKA_CFG_ADVERTISED_LISTENERS в bitnami/kafka).

Порт 19092 будет использован для взаимодействий внутри Docker, а 9092 будет прокинут на порт 29092 в хост-систему для внешних клиентов. Для остальных двух сервисов меняться будет только порт хост-системы: 39092 и 49092.

Кластер Kafka с KRaft в Docker Compose

При помощи Docker Compose вы можете описать весь кластер одним файлом:

Поскольку Docker Compose позволяет обращаться к сервисам по их названию, в описании кворума я использовал названия сервисов. К сожалению использовать scale и уменьшить таким образом файл в 3 раза не получится, так как каждая нода должна иметь уникальный в рамках кластера идентификатор.

Полезные ссылки