Проект Apache Kafka разрабатывался с прицелом на применение в высоко нагруженных и отказоустойчивых системах. Кроме развёртывания сервисов в кластере, которому была посвящена предыдущая статья, высокую производительность и отказоустойчивость обеспечивают партиции и реплики.
Партиции
Топики в Apache Kafka делятся на партиции, и это помогает повысить производительность системы. Партиции распределяются между узлами кластера, что позволяет распределить и нагрузку между узлами.
Давайте создадим топик с четырьмя партициями в кластере из трёх узлов:
1 2 3 4 5 |
$ bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --topic sandbox --create --partitions 4 |
Партиции созданы не на одном отдельно взятом узле кластера, а распределены между всеми узлами кластера, в чём мы можем убедиться при помощи утилиты kafka-topics.sh
:
1 2 3 4 5 6 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic sandbox Topic: sandboxopTopicId: tCfBTnJsTzeW86D3O9-CDAb PartitionCount: 4 ReplicationFactor: 1 Configs: Topic: sandbox Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: Topic: sandbox Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Elr: LastKnownElr: Topic: sandbox Partition: 2 Leader: 3 Replicas: 3 Isr: 3 Elr: LastKnownElr: Topic: sandbox Partition: 3 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: |
Схематично это выглядит так:
Максимальное количество получателей, которые могут параллельно получать сообщения из топика при подписке в рамках группы, определяется количеством партиций, так как на одну партицию может быть подписан только один получатель из группы.
При необходимости количество партиций можно увеличить при помощи утилиты kafka-topics.sh
:
1 2 3 4 5 |
$ bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --topic sandbox --alter --partitions 5 |
Распределение сообщений
Добавляемые сообщения распределяются между партициями топика. Если в сообщении не указан ключ, то сообщения будут более-менее равномерно распределяться между всеми партициями.
Если в сообщении указан ключ, то все сообщения с одним ключом будут попадать в одну партицию. Партиция выбирается при помощи несложной формулы: hash(message_key) % number_of_partitions
(напоминает HashMap, не так ли?). При этом партиции могут содержать как сообщения с ключом, так и без него.
Сообщения хранятся в том порядке, в котором они были добавлены в партицию, однако порядок в рамках всего топика не гарантируется.
Партиции и отправители
Отправители могут не указывать партицию при отправке сообщений, тогда брокер сам определяет описанным выше способом, в какую партицию топика будет добавлено сообщение. Если отправитель указывает партицию при отправке сообщения, то оно добавляется именно в указанную партицию, игнорируя механизм распределения сообщений между партициями. Одновременно множество отправителей могут отправлять сообщения в партиции топиков.
Если партиция реплицирована, то сообщение записывается в главную реплику, после чего остальные реплики синхронизируются.
Партиции и получатели
Когда получатели подписываются на топики в рамках группы получателей, то партиции распределяются между ними. Если партиций больше, чем получателей, то некоторые получатели могут получать сообщения из нескольких партиций сразу. Если получателей больше, чем партиций, то «лишние» получатели будут просто бездействовать.
Figure 1. Один получатель в группе подписан на все партиции
Figure 2. Партиции распределены между несколькими получателями
Figure 3. Если получателей больше, чем партиций, то часть получателей будет бездействовать
Если происходит изменения состава группы — добавляются или исчезают получатели, то партиции перераспределяются между получателями таким образом, чтобы все сообщения были доставлены группе.
При этом несколько групп получателей могут независимо друг от друга подписываться на одни и те же топики, они никак не влияют друг на друга.
Figure 4. Несколько групп получателей
Реплики
Для обеспечения сохранности данных в Apache Kafka реализован механизм репликации — копирования данных между узлами кластера. Настройки репликации очень гибкие и с одной стороны позволяют установить общие правила реплицирования для топика или даже кластера, а с другой — определить максимально точные правила реплицирования каждой партиции!
При создании нового топика мы можем указать количество копий всех его партиций при помощи --replication-factor
:
Создание топика с тремя партициями и двумя репликами для каждой партиции
1 2 3 4 5 6 |
$ bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \ --create \ --topic sandbox \ --partitions 3 \ --replication-factor 2 |
Если запросить информацию о топике, то мы увидим следующую картину:
1 2 3 4 5 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic sandbox Topic: sandbox-1 TopicId: 1kMPohSsS4ObQ6PkMQ1vQg PartitionCount: 3 ReplicationFactor: 2 Topic: sandbox-1 Partition: 0 Leader: 5 Replicas: 5,6 Topic: sandbox-1 Partition: 1 Leader: 6 Replicas: 6,4 Topic: sandbox-1 Partition: 2 Leader: 4 Replicas: 4,5 |
Партиция 0 находится на 5 и 6 узлах кластера, копия на 5 узле является главной, т.е. отправители будут отправлять сообщения на 5 узел кластера при добавлении сообщения в 0 партицию топика sandbox. Аналогично, 1 партиция находится на 6 и 4 узлах, а 2 — на 4 и 5. Таким образом мы добились реплицирования данных, имея по две копии каждой партиции, но с вменяемой избыточностью — партиции дублируются на двух узлах из трёх.
На схеме выше лидирующие реплики отмечены звёздочкой (*
), именно в эти реплики отправляются сообщения.
Для обеспечения отказоустойчивости и надёжности хранения данных в реальных условиях лучше иметь 5 узлов в кластере и по 3 реплики для каждого топика.
Если есть необходимость в ручном указании, как должны реплицироваться партиции между узлами кластера, можно при создании топика использовать аргумент --replica-assignment
. В качестве значения аргумента нужно передать список идентификаторов узлов, на которых должны находиться реплики партиций. Идентификаторы узлов разделяются двоеточием :
, а партиции — запятой ,
. Общий вид такой: узлы:для:0:партиции,узлы:для:1:партиции,узлы:для:2:партиции
Допустим, нужно создать топик с тремя партициями и следующими правилами репликации в кластере с узлами 4, 5 и 6:
- 1 партиция должна иметь две реплики на узлах 4 и 5, на 4 узле реплика должна быть главной
- 2 партиция должна иметь две реплики на узлах 5 и 6, на 5 узле реплика должна быть главной
- 3 партиция должна иметь две реплики на узлах 6 и 4, на 6 узле реплика должна быть главной
1 2 3 4 5 |
$ bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \ --create \ --topic sandbox \ --replica-assignment 4:5,5:6,6:4 |
Недопустимо указывать разное количество узлов для партиций одного топика.
В результате будет создан топик со следующими партициями и репликами:
1 2 3 4 5 |
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic sandbox Topic: sandbox-1 TopicId: 1kMPohSsS4ObQ6PkMQ1vQg PartitionCount: 3 ReplicationFactor: 2 Topic: sandbox-1 Partition: 0 Leader: 4 Replicas: 4,5 Topic: sandbox-1 Partition: 1 Leader: 5 Replicas: 5,6 Topic: sandbox-1 Partition: 2 Leader: 6 Replicas: 6,4 |
В случае необходимости можно изменить настройки репликации при помощи kafka-reassign-partitions.sh
. Допустим, необходимо освободить 5 узел, следовательно, реплики партиции 0 и 1 нужно перенести с 5 узла на 4 и 6.
Для этого нужно подготовить файл миграции:
migration.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
{ "partitions": [ { "topic": "sandbox", "partition": 0, "replicas": [4,6] }, { "topic": "sandbox", "partition": 1, "replicas": [4,6] } ], "version": 1 } |
Осталось выполнить команду:
1 2 3 4 |
$ bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file migration.json \ --execute |
При использовании этой же утилиты можно уменьшать или увеличивать количество реплик, но стоит помнить, что количество реплик у всех партиций в топике должно быть одинаковым.