Как и любой другой компонент распределённой информационной системы Apache Kafka в условиях реальной эксплуатации рекомендуется разворачивать в кластере для обеспечения отказоустойчивости. Кластер Kafka может быть развёрнут в двух вариантах: с использованием KRaft и Apache ZooKeeper.
Роли нод в кластере
В рамках кластера ноды Kafka могут иметь две роли:
- Брокер (
broker
) — ноды данного типа обеспечивают обмен сообщениями с клиентами: отправителями и получателями. - Контроллер (
controler
) — ноды данного типа отвечают за жизненный цикл кластера.
При использовании Apache ZooKeeper контроллер выбирается автоматически из числа брокеров, а при использовании KRaft некоторому количеству нод назначается роль контроллера, и из их числа автоматически выбирается лидер.
KRaft и Apache ZooKeeper
Контроллер отвечает за поддержание элементов кластера в актуальном состоянии, но нужно ещё где-то хранить метаданные кластера: список топиков, их партиции и настройки реплицирования, читателей, их группы, смещения, и т.д.
До Apache Kafka 3.0.0 роль хранилища метаданных была отведена Apache ZooKeeper — сервису, который часто используется в качестве хранилища метаданных, конфигураций и используется при реализации механизма поиска сервисов (service discovery). Недостаток использования ZooKeeper заключается в разделении данных кластера между контроллером и ZooKeeper, что усложняет сопровождение кластера Kafka, так как техническим специалистам нужно сопровождать не только кластер Kafka, но и кластер ZooKeeper.
С версии Kafka 3.0.0 появилась замена ZooKeeper — KRaft (Kafka Raft), реализация алгоритма согласования Raft, позволяющая кворуму контроллеров самостоятельно выбирать активный контроллер, который в свою очередь отвечает за хранение метаданных кластера и его жизненный цикл. Таким образом кластер ZooKeeper можно заменить на кворум контроллеров Apache Kafka, который будет обслуживать кластер.
Начиная с версии Kafka 3.3.1 реализация KRaft является стабильной и рекомендуется к использованию вместо Apache ZooKeeper, а с версии Kafka 4.0 поддержка ZooKeeper будет прекращена.
Далее в данной статье будет рассмотрено развёртывание кластера с KRaft.
Кластер Kafka с KRaft
В качестве примера я буду запускать кластер, состоящий из трёх контроллеров и трёх брокеров, это разумное минимальное количество нод для обеспечения базовой отказоустойчивости. Для развёртывания кластера мне нужно подготовить файлы настроек для каждой ноды, сгенерировать идентификатор кластера и отформатировать директории логов. Все шесть нод будут располагаться в одной системе, поэтому для них будут использоваться разные порты и директории данных, но в условиях реальной эксплуатации каждая нода должна быть расположена в отдельной системе и на разных физических серверах для обеспечения отказоустойчивости.
Дистрибутив Apache Kafka содержит в директории config/kraft
уже готовые файлы параметров для брокера, контроллера и сервера (смешанный режим), которые мы можем использовать в качестве основы:
broker.properties
controller.properties
server.properties
Файлы настроек в данной статье приведены для примера и не рассчитаны на использование в условиях реальной эксплуатации. Для более корректной настройки нод ознакомьтесь с официальной документацией.
Ниже приведён пример минимальной настройки контроллера:
config/kraft/controller_i1.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# Роль ноды process.roles=controller # Уникальный в рамках кластера идентификатор ноды, должен быть числом # Для двух других контроллеров значения будут соответственно 2 и 3 node.id=1 # Описание кворума контроллеров # Идентификатор участника кворума имеет следующий вид: # {node.id}@{host}:{port} controller.quorum.voters=1@localhost:9093,2@localhost:9193,3@localhost:9293 # В режиме KRaft нужно указывать, как будут называться слушатели-контроллеры controller.listener.names=CONTROLLER # Другие два контроллера будут запускаться на портах 9193 и 9293 соответственно listeners=CONTROLLER://:9093 # Директория данных ноды, для других контроллеров директории тоже будут свои log.dirs=/tmp/kraft-controller-logs-i1 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=5 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 |
Так как все ноды разворачиваются на одной системе, значения свойств listeners
и log.idrs
должны быть разными в файлах свойств для разных нод. Для двух остальных контроллеров нужно будет создать аналогичные файлы. Так же файлы свойств нужно создать и для брокеров:
config/kraft/broker_i4.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# Роль ноды process.roles=broker # Уникальный в рамках кластера идентификатор ноды, должен быть числом # Для двух других брокеров значения будут соответственно 5 и 6 node.id=4 # Описание кворума контроллеров controller.quorum.voters=1@localhost:9093,2@localhost:9193,1@localhost:9293 # Другие два брокера будут запускаться на портах 9192 и 9292 соответственно listeners=PLAINTEXT://localhost:9092 # Директория данных ноды, для других брокеров директории тоже будут свои log.dirs=/tmp/kraft-broker-logs-i1 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://localhost:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=5 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 |
Для двух остальных брокеров нужно создать аналогичные файлы.
Теперь нужно сгенерировать идентификатор кластера, сделать это можно следующей командой:
1 2 3 4 5 |
$ bin/kafka-storage.sh random-uuid 8zxAHVOAQySnpi6uZg5HIg # Для удобства можно сохранить значение в переменную KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" |
Далее нужно отформатировать директории логов, которые указаны в файлах свойств в свойстве log.dirs
, например для брокера с идентификатором 4 команда будет следующей:
1 2 |
$ bin/kafka-storage.sh format --config config/kraft/broker_i4.properties --cluster-id $KAFKA_CUSTER_ID Formatting /tmp/kraft-broker-logs-i1 with metadata.version 3.8-IV0. |
Сделать это нужно для всех нод кластера.
Теперь всё готово к запуску кластера, каждую ноду нужно запустить, сначала контроллеры, затем — брокеры. Пример команды запуска для первого контроллера:
1 2 |
# Для запуска каждого контроллера и брокера нужно выполнить команду $ bin/kafka-server-start.sh config/kraft/controller_i1.properties |
После запуска всех нод можно проверить состояние кластера:
1 2 3 4 5 6 7 8 9 10 11 12 |
$ bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status ClusterId: 8zxAHVOAQySnpi6uZg5HIg # Контроллер с идентификатором 1 является лидером LeaderId: 1 LeaderEpoch: 12 HighWatermark: 3251 MaxFollowerLag: 0 MaxFollowerLagTimeMs: 0 # В кластере 3 контроллера с идентификаторами 1, 2 и 3 CurrentVoters: [1,2,3] # В кластере 3 брокера с идентификаторами 4, 5 и 6 CurrentObservers: [4,5,6] |
Теперь для проверки можно создать топик, партиции которого будут реплицироваться на трёх нодах:
1 2 3 4 5 6 7 8 9 10 11 12 |
# Создадим топик $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic sandbox --partitions 5 --replication-factor 3 Created topic sandbox. # Проверим топик $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic sandbox Topic: sandbox TopicId: rgP_O6HbTSiLCarTFIoZWA PartitionCount: 5 ReplicationFactor: 3 Configs: Topic: sandbox Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Elr: N/A LastKnownElr: N/A Topic: sandbox Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Elr: N/A LastKnownElr: N/A Topic: sandbox Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: N/A LastKnownElr: N/A Topic: sandbox Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A Topic: sandbox Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Elr: N/A LastKnownElr: N/A |
По Leader и Replicas можно видеть, как настроена репликация патриций.
Кластер Kafka с KRaft в Docker
Apache Kafka в контейнерах запускается в режиме KRaft, для настройки каждой ноды мы можем использовать файлы свойств и переменные окружения.
Для запуска ноды Kafka с настройками из файла нужно смонтировать файл свойств в директорию /mnt/shared/config
в контейнере. Допустим есть файл с параметрами ноды /opt/kafka/config/docker/controller-i1/server.properties
, тогда команда для запуска будет выглядеть следующим образом:
1 |
$ docker run -v /opt/kafka/config/docker/controller-i1/:/mnt/shared/config -p 9093:9093 --name kafka-ctrl-i1 apache/kafka:3.8.1 |
Вместо файла свойств можно использовать переменные окружения. Переменные окружения соответствуют параметрам из файла свойств, имеют префикс KAFKA_
, а точки заменены на _
, например, переменная окружения KAFKA_NODE_ID
соответствует свойству node.id
.
Запуск контроллера в контейнере с переменными окружения выглядит следующим образом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
$ docker run -p 9093:9092 --name kafka-ctrl-i1 \ -e KAFKA_PROCESS_ROLES=controller \ -e KAFKA_NODE_ID=1 \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@172.17.0.1:9093,2@172.17.0.1:9193,3@172.17.0.1:9293 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENERS=CONTROLLER://:9092 \ -e KAFKA_LOG_DIRS=/tmp/ctrl \ -e KAFKA_NUM_NETWORK_THREADS=3 \ -e KAFKA_NUM_IO_THREADS=8 \ -e KAFKA_SOCKET_SEND_BUFFER_BYTES=102400 \ -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400 \ -e KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 \ -e KAFKA_NUM_PARTITIONS=5 \ -e KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ -e KAFKA_LOG_RETENTION_HOURS=168 \ -e KAFKA_LOG_SEGMENT_BYTES=1073741824 \ -e KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000 \ apache/kafka:3.8.1 |
Как видно, localhost
я заменил на 172.17.0.1
— адрес хост-системы из сети 172.17.0.0/16 для Docker, если у вашей хост-системы другой адрес в сети Docker, то замените 172.17.0.1
на него. Поскольку все ноды разворачиваются изолированно в своих контейнерах, можно использовать одни и те же директории данных и порты в настройках.
Если требуется задать идентификатор кластера, отличный от стандартного, то это можно сделать при помощи переменной кружения CLUSTER_ID
.
Проверить состояние кластера можно, выполнив следующую команду на одном из контроллеров:
1 |
$ docker exec kafka-ctrl-i1 /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status |
Ошибка UnknownHostException
Для брокеров же требуется дополнительная настройка, так как с настройками по умолчанию при попытке подключения к Kafka из-за пределов контейнера вы получите ошибку подключения вроде такой:
1 2 |
[2024-11-12 12:54:06,791] WARN [AdminClient clientId=adminclient-1] Error connecting to node ebb8ea136b33:9092 (id: 6 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: ebb8ea136b33 |
Даже если вы используете ip-адрес в качестве адреса брокера, через который взаимодействуете с кластером, то внутри кластера всё равно используются названия хостов. И клиент, получив от брокера информацию о кластере, пытается взаимодействовать с кластером, используя названия хостов. В данном случае это внутренние для Docker названия хостов, о которых внешний клиент ничего не знает, что и приводит к ошибке.
Чтобы решить эту проблему, нужно правильно сконфигурировать параметры listeners
и advertisedListeners
(KAFKA_LISTENERS
и KAFKA_ADVERTISED_LISTENERS
в apache/kafka
и KAFKA_CFG_LISTENERS
и KAFKA_CFG_ADVERTISED_LISTENERS
в bitnami/kafka
).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
$ docker run -p 29092:9092 --name kafka-ctrl-i1 \ -e KAFKA_PROCESS_ROLES=broker \ -e KAFKA_NODE_ID=4 \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@172.17.0.1:9093,2@172.17.0.1:9193,3@172.17.0.1:9293 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:19092,PLAINTEXT_HOST://:29092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \ -e KAFKA_LOG_DIRS=/tmp/ctrl \ -e KAFKA_NUM_NETWORK_THREADS=3 \ -e KAFKA_NUM_IO_THREADS=8 \ -e KAFKA_SOCKET_SEND_BUFFER_BYTES=102400 \ -e KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400 \ -e KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 \ -e KAFKA_NUM_PARTITIONS=5 \ -e KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ -e KAFKA_LOG_RETENTION_HOURS=168 \ -e KAFKA_LOG_SEGMENT_BYTES=1073741824 \ -e KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000 \ apache/kafka:3.8.1 |
Порт 19092 будет использован для взаимодействий внутри Docker, а 9092 будет прокинут на порт 29092 в хост-систему для внешних клиентов. Для остальных двух сервисов меняться будет только порт хост-системы: 39092 и 49092.
Кластер Kafka с KRaft в Docker Compose
При помощи Docker Compose вы можете описать весь кластер одним файлом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
name: kafka-cluster networks: kafka: name: kafka services: controller-i1: image: apache/kafka:3.8.1 ports: - 9093:9093 networks: - kafka environment: KAFKA_PROCESS_ROLES: controller KAFKA_NODE_ID: 1 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENERS: CONTROLLER://:9093 KAFKA_LOG_DIRS: /tmp/ctrl KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_NUM_PARTITIONS: 5 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 controller-i2: image: apache/kafka:3.8.1 ports: - 9193:9093 networks: - kafka environment: KAFKA_PROCESS_ROLES: controller KAFKA_NODE_ID: 2 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENERS: CONTROLLER://:9093 KAFKA_LOG_DIRS: /tmp/ctrl KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_NUM_PARTITIONS: 5 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 controller-i3: image: apache/kafka:3.8.1 ports: - 9293:9093 networks: - kafka environment: KAFKA_PROCESS_ROLES: controller KAFKA_NODE_ID: 3 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENERS: CONTROLLER://:9093 KAFKA_LOG_DIRS: /tmp/ctrl KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_NUM_PARTITIONS: 5 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 broker-i1: image: apache/kafka:3.8.1 hostname: broker-i1 ports: - 29092:9092 networks: - kafka environment: KAFKA_PROCESS_ROLES: broker KAFKA_NODE_ID: 4 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i1:19092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_LOG_DIRS: /tmp/broker KAFKA_NUM_PARTITIONS: 3 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 depends_on: - controller-i1 - controller-i2 - controller-i3 broker-i2: image: apache/kafka:3.8.1 hostname: broker-i2 ports: - 39092:9092 networks: - kafka environment: KAFKA_PROCESS_ROLES: broker KAFKA_NODE_ID: 5 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i2:19092,PLAINTEXT_HOST://localhost:39092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_LOG_DIRS: /tmp/broker KAFKA_NUM_PARTITIONS: 3 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 depends_on: - controller-i1 - controller-i2 - controller-i3 broker-i3: image: apache/kafka:3.8.1 hostname: broker-i3 ports: - 49092:9092 networks: - kafka environment: KAFKA_PROCESS_ROLES: broker KAFKA_NODE_ID: 6 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-i1:9093,2@controller-i2:9093,3@controller-i3:9093 KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-i3:19092,PLAINTEXT_HOST://localhost:49092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_NUM_NETWORK_THREADS: 3 KAFKA_NUM_IO_THREADS: 8 KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400 KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400 KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600 KAFKA_LOG_DIRS: /tmp/broker KAFKA_NUM_PARTITIONS: 3 KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_LOG_SEGMENT_BYTES: 1073741824 depends_on: - controller-i1 - controller-i2 - controller-i3 |
Поскольку Docker Compose позволяет обращаться к сервисам по их названию, в описании кворума я использовал названия сервисов. К сожалению использовать scale и уменьшить таким образом файл в 3 раза не получится, так как каждая нода должна иметь уникальный в рамках кластера идентификатор.