С выпуском версии 4.0 Apache Kafka лишится поддержки ZooKeeper, и единственным вариантом развёртывания кластера останется использование KRaft, о котором было рассказано в одной из предыдущих статей. Несмотря на это, я предлагаю в этой статье рассмотреть вариант развёртывания кластера Apache Kafka с кворумом Apache ZooKeeper.
Простой кластер для разработки
Простейший кластер для разработки можно запустить двумя командами, не внося изменений в конфигурации:
1 2 3 4 5 |
# Сначала запускаем ZooKeeper в одном окне терминала $ bin/zookeeper-server-start.sh config/zookeeper.properties # Затем Kafka - в другом $ bin/kafka-server-start.sh config/server.properties |
В результате будет запущен кластер из одного экземпляра ZooKeeper и одного экземпляра Kafka с настройками по умолчанию. Этого вполне достаточно для начала знакомства с Apache Kafka, но я советую разворачивать кластер из трёх экземпляров Kafka для более глубокого погружения в темы партиционирования и реплицирования, которым была посвящена предыдущая статья. Для этого достаточно сделать две копии файла config/server.properties
и изменить идентификаторы брокеров, директории хранения данных и порты, на которых Kafka будет ожидать соединения.
Исходный файл server.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# Изменяться будет эта часть # Идентификатор брокера должен быть уникальным в рамках кластера broker.id=0 # У каждого брокера должна быть своя директория хранения файлов log.dirs=/tmp/kafka-logs num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
Файл с настройками второго экземпляра Kafka server_2.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
broker.id=1 log.dirs=/tmp/kafka-logs-1 # Второй экземпляр будет прослушивать соединения на порту 9093 listeners=PLAINTEXT://:9093 # Прочие настройки без изменений num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
Файл с настройками третьего экземпляра Kafka server_3.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
broker.id=2 log.dirs=/tmp/kafka-logs-2 listeners=PLAINTEXT://:9094 # Прочие настройки без изменений num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
Теперь осталось запустить 2 и 3 экземпляры Kafka как было показано выше:
1 2 3 4 5 |
# Запуск второго экземпляра Kafka $ bin/kafka-server-start.sh config/server_2.properties # Запуск третьего экземпляра Kafka $ bin/kafka-server-start.sh config/server_3.properties |
Тестовый кластер из трёх экземпляров Kafka готов к использованию, вы можете взаимодействовать с ним, подключаясь к любому из экземпляров, входящему в кластер.
Apache Kafka с ZooKeeper в Docker
Стандартный образ Apache Kafka не поддерживает запуск с Apache ZooKeeper, но если вы очень этого хотите, то исходный образ нужно немного изменить. Стандартный образ содержит файл /etc/kafka/docker/launch
, в котором содержится команда на форматирование директории данных:
1 2 3 4 5 6 7 |
# Invoke the docker wrapper to setup property files and format storage result=$(/opt/kafka/bin/kafka-run-class.sh kafka.docker.KafkaDockerWrapper setup \ --default-configs-dir /etc/kafka/docker \ --mounted-configs-dir /mnt/shared/config \ --final-configs-dir /opt/kafka/config 2>&1) || \ echo $result | grep -i "already formatted" || \ { echo $result && (exit 1) } |
Необходимо скопировать этот файл, удалить указанный код и заменить получившимся файлом исходный в образе:
Dockerfile
1 2 3 4 5 |
FROM apache/kafka:3.9.0 COPY --chown=appuser:appuser launch /etc/kafka/docker/launch CMD ["/etc/kafka/docker/run"] |
Останется собрать модифицированный образ и запустить контейнер:
1 2 3 4 5 |
$ docker build . -t apache/kafka-zoo:3.9.0 $ docker run -p 9092:9092 \ -v $(pwd)/kafka/server.properties:/opt/kafka/config/server.properties \ apache/kafka-zoo:3.9.0 |
Ошибка UnknownHostException
Важным моментом при использовании контейнеров является правильная настройка сервиса, а она будет отличаться от настройки вне контейнеров, в противном случае при попытке подключения к Kafka из-за пределов контейнера вы получите ошибку подключения вроде такой:
1 2 |
[2024-11-12 12:54:06,791] WARN [AdminClient clientId=adminclient-1] Error connecting to node ebb8ea136b33:9092 (id: 6 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: ebb8ea136b33 |
Даже если вы используете ip-адрес в качестве адреса брокера, через который взаимодействуете с кластером, то внутри кластера всё равно используются названия хостов. И клиент, получив от брокера информацию о кластере, пытается взаимодействовать с кластером, используя названия хостов. В данном случае это внутренние для Docker названия хостов, о которых внешний клиент ничего не знает, что и приводит к ошибке.
Чтобы решить эту проблему, нужно правильно сконфигурировать параметры listeners
и advertisedListeners
(KAFKA_LISTENERS
и KAFKA_ADVERTISED_LISTENERS
в apache/kafka
и KAFKA_CFG_LISTENERS
и KAFKA_CFG_ADVERTISED_LISTENERS
в bitnami/kafka
).
Настройка собственного образа
В listeners
нужно перечислить все адреса, которые сервис будет прослушивать для соединения с клиентами: первый — для взаимодействий внутри Docker (в т.ч. между разными контейнерами), второй — для внешних клиентов. В advertisedListeners
нужно перечислить адреса, которые будут объявлены сервисом, как используемые: первый будет соответствовать первому адресу из listeners
, а второй будет использовать порт из хост-системы, который будет прокинут на второй адрес из listeners
. Важно: в кластере объявленные адреса должны быть уникальны.
Полностью все настройки приводить не буду, продемонстрирую только listeners
и advertisedListeners
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# Ожидаем подключения на двух портах: # - 19092 - для клиентов в Docker # - 9092 - для внешних соединений listeners=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 # Объявляем порты: # - 19092 - для взаимодействий между сервисами, входящими в кластер # - 29092 - для подключений внешних клиентов (host:29092->container:9092) advertised.listeners=PLAINTEXT://broker-i1:19092,PLAINTEXT_HOST://localhost:29092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT # Адреса кластера ZooKeeper zookeeper.connect=172.17.0.1:2181,172.17.0.1:2182,172.17.0.1:2183 |
Порт 19092 будет использован для взаимодействий внутри Docker, а 9092 будет прокинут на порт 29092 в хост-систему для внешних клиентов. Для остальных двух сервисов меняться будет только порт хост-системы: 39092 и 49092.
Описание процесса развёртывания кластера ZooKeeper я вынес в отдельную статью. Процесс демонстрируются, исходя из того, что кластер ZooKeeper запущен в контейнерах, а порты проброшены на следующие порты хост-системы (172.17.0.1
): 2181
, 2182
и 2183
.
Пример запуска контейнеров:
1 2 3 4 5 6 7 8 9 10 11 |
$ docker run -p 29092:9092 \ -v $(pwd)/config/server_i1.properties:/opt/kafka/config/server.properties \ apache/kafka-zoo:3.9.0 $ docker run -p 39092:9092 \ -v $(pwd)/config/server_i2.properties:/opt/kafka/config/server.properties \ apache/kafka-zoo:3.9.0 $ docker run -p 49092:9092 \ -v $(pwd)/config/server_i3.properties:/opt/kafka/config/server.properties \ apache/kafka-zoo:3.9.0 |
Использование bitnami/kafka
Аналогичного результата можно добиться и при помощи образа bitnami/kafka
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
$ docker run -p 29092:9092 \ -e KAFKA_CFG_ZOOKEEPER_CONNECT="172.17.0.1:2181,172.17.0.1:2182,172.17.0.1:2183" \ -e KAFKA_CFG_BROKER_ID=1 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:29092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \ bitnami/kafka $ docker run -p 39092:9092 \ -e KAFKA_CFG_ZOOKEEPER_CONNECT="172.17.0.1:2181,172.17.0.1:2182,172.17.0.1:2183" \ -e KAFKA_CFG_BROKER_ID=2 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:39092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \ bitnami/kafka $ docker run -p 49092:9092 \ -e KAFKA_CFG_ZOOKEEPER_CONNECT="172.17.0.1:2181,172.17.0.1:2182,172.17.0.1:2183" \ -e KAFKA_CFG_BROKER_ID=3 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:49092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \ bitnami/kafka |
Apache Kafka и ZooKeeper в Docker Compose
Кластер Apache Kafka вместе с ZooKeeper можно описать при помощи Docker Compose, взяв за основу файл, описанный в статье, посвящённой ZooKeeper:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
services: zk1: image: zookeeper hostname: zk1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888 server.3=zk3:2888:3888 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat zk2: image: zookeeper hostname: zk2 ports: - "12181:2181" environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat zk3: image: zookeeper hostname: zk3 ports: - "22181:2181" environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888;2181 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat kafka1: image: apache/kafka-zoo:3.9.0 hostname: kafka1 ports: - "29092:9092" volumes: - "conf/kafka-1.properties:/opt/kafka/config/server.properties" depends_on: - zk1 - zk2 - zk3 kafka2: image: apache/kafka-zoo:3.9.0 hostname: kafka2 ports: - "39092:9092" volumes: - "conf/kafka-2.properties:/opt/kafka/config/server.properties" depends_on: - zk1 - zk2 - zk3 kafka3: image: apache/kafka-zoo:3.9.0 hostname: kafka3 ports: - "49092:9092" volumes: - "conf/kafka-3.properties:/opt/kafka/config/server.properties" depends_on: - zk1 - zk2 - zk3 |
Так же можно использовать образ bitnami/kafka
. Для запуска кворума Apache ZooKeeper и кластера Apache Kafka из трёх экземпляров в Docker Compose потребуется следующий файл:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
services: zk1: image: zookeeper hostname: zk1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888 server.3=zk3:2888:3888 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat zk2: image: zookeeper hostname: zk2 ports: - "2182:2181" environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat zk3: image: zookeeper hostname: zk3 ports: - "2183:2181" environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888;2181 ZOO_4LW_COMMANDS_WHITELIST: srvr,stat kafka1: image: bitnami/kafka hostname: kafka1 ports: - "29092:9092" environment: KAFKA_CFG_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181" KAFKA_CFG_BROKER_ID: 1 KAFKA_CFG_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092,PLAINTEXT_HOST://localhost:29092 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT depends_on: - zk1 - zk2 - zk3 kafka2: image: bitnami/kafka hostname: kafka1 ports: - "39092:9092" environment: KAFKA_CFG_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181" KAFKA_CFG_BROKER_ID: 2 KAFKA_CFG_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:19092,PLAINTEXT_HOST://localhost:39092 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT depends_on: - zk1 - zk2 - zk3 kafka3: image: bitnami/kafka hostname: kafka1 ports: - "49092:9092" environment: KAFKA_CFG_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181" KAFKA_CFG_BROKER_ID: 3 KAFKA_CFG_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:19092,PLAINTEXT_HOST://localhost:49092 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT depends_on: - zk1 - zk2 - zk3 |