Очереди сообщений — это очень большая тема, которую невозможно охватить парой статьей или роликов. Отчасти это связано с тем, что в настоящее время существует большое количество систем обмена сообщениями, реализующих схожие механизмы, но делающих это по-разному. В этой статье я хочу попытаться в общих чертах рассказать о том, что такое очереди сообщений, брокеры очередей сообщений, потоки (стримы), а также о том, зачем они нужны. В последующих статьях я постараюсь более предметно рассказать о конкретных реализациях очередей сообщений и их практическом применении.
Что такое очередь сообщений?
Если попытаться дать максимально точное определение термину «очередь сообщений», то получится следующее: очередь сообщений — это инструмент для реализации асинхронного межпроцессного или межпотокового взаимодействия. При этом в зависимости от ситуации очередь сообщений может быть использована как для реализации взаимодействия между потоками в рамках одной программы, так и между программами, которые выполняются на разных серверах, и даже расположенных в разных дата-центрах. Традиционно очереди сообщений изображаются на схемах в виде лежачего цилиндра.
Figure 1. Очередь сообщений
Название «очередь сообщений» дано данному инструменту неслучайно — при его реализации зачастую используется очередь — упорядоченная структура данных, реализующая дисциплину доступа «первый пришёл — первый вышел» (FIFO — first in, first out).
Figure 2. Очередь
Кроме непосредственно самой очереди сообщений существуют отправитель и получатель. Отправитель при необходимости добавляет новые сообщения в очередь, а получатель — получает сообщения из очереди при их наличии. Стоит отметить, что в классических очередях сообщений сообщение после успешной доставки удаляется. В некоторых реализациях в момент добавления сообщения отправителем в очередь допускается недоступность получателя, так как обмен сообщениями происходит асинхронно, и получатель получит новые сообщения после запуска.
Figure 3. Очередь сообщений с отправителем, получателем и сообщениями
Вы можете наблюдать много аналогов очередей сообщений в реальной жизни, например — доставка товаров в пункт выдачи заказов (ПВЗ) вашего любимого маркетплейса. Доставка заказов на дом — очень удобная услуга, но это так не всегда. Представьте себе — вы ожидаете дома курьера, откладывая свои дела, а курьер задерживается — он попал в пробку, у него спустило колесо или на складе упал стеллаж, и сотрудники склада не могут найти ваш заказ. Конечно, процесс ожидания заказа не будет синхронным, вы всё равно найдёте чем заняться (вы же не будете всё это время обновлять страницу заказа в приложении маркетплейса?), но вариант с доставкой товара в удобный для вас ПВЗ, вероятно, потребовал бы от вас меньше времени. При поступлении заказа в ПВЗ вы получили бы уведомление и по мере возможности забрали бы заказ, затратив минимум времени на ожидание.
В описанном выше примере пункт выдачи заказов и маркетплейс являются очередью сообщений, вы — получателем, продавец — отправителем.
Очереди сообщений без брокеров
Для реализации очередей сообщений в вашем приложении брокеры, о которых будет рассказано далее, могут и не понадобиться, например, когда вам нужно реализовать очередь сообщений в рамках одной программы для реализации каких-то внутренних механизмов. Для этого вы вполне можете написать какую-то собственную реализацию, используя стандартные для того или иного языка программирования (например, LinkedList в Java) средства.
Так же операционные системы предоставляют собственные реализации очередей сообщений, хотя операционные системы в данном случае можно считать брокерами.
Кроме этого есть готовые библиотеки, реализующие очереди сообщений без брокеров, например, проект ZeroMQ, «Zero» в названии которого красноречиво говорит о количестве брокеров.
Однако в случае с распределёнными системами реализация очередей сообщений в рамках отдельных сервисов усложняет сопровождение системы и порождает сложную схему межсервисных взаимодействий, к тому же возрастают требования к отказоустойчивости системы и гарантиями доставки сообщений, и в этих условиях брокеры очередей сообщений приходят на помощь.
Figure 4. Один из вариантов распределения очередей сообщений между компонентами
Figure 5. Ещё один из вариантов распределения очередей сообщений между компонентами
Брокеры очередей сообщений
Брокеры очередей сообщений часто используются в проектах с распределённой архитектурой, они содержат очереди сообщений, используемые компонентами системы и предоставляют инструменты для работы с ними.
Примеры брокеров очередей сообщений: Apache ActiveMQ (Classic и Artemis), IBM MQ, RedHat AMQ, NATS, RabbitMQ, Apache Qpid.
При использовании брокеров очередей сообщений сервисы начинают меньше зависеть друг от друга, и даже временная недоступность того или иного сервиса, возможно, не станет проблемой, так как очереди сообщений и сами сообщения хранятся на стороне брокера. Да, в этой схеме появляется новая точка отказа — брокер, но вопросы его надёжности решаются развёртыванием кластера брокеров. Впрочем, брокеры очередей сообщений сами по себе тоже имеют свои недостатки и нюансы.
К нюансам можно отнести, в целом, отсутствие универсальных протоколов взаимодействия с брокерами очередей сообщений. Да, такие протоколы на самом деле существуют: AMQP, MQTT, STOMP, но зачастую брокеры реализуют их с нюансами. Так, например, RabbitMQ получил поддержку AMQP 1.0 в 2024 году, спустя 12 лет после его появления! А до этого использовал AMQP версии 0.9.1. В то же время NATS использует собственный протокол и не поддерживает AMQP, хотя имеет поддержку MQTT 3.1.1. Иными словами, заменить один брокер сообщений на другой без дополнительных манипуляций не получится.
Теперь предлагаю поговорить о проблемах. Допустим, необходимо доставлять сообщения из одной очереди сообщений сразу нескольким получателям. Проблема заключается в том, что в классических очередях при наличии нескольких получателей сообщение получает лишь один из них, после чего сообщение удаляется из очереди. Решить эту проблему, конечно же, можно, причём сразу несколькими способами. Например, создав отдельную очередь для каждого получателя и реализовав компонент, который будет копировать сообщение из исходной очереди в очереди для конкретных получателей. Этот приём называется шаблоном интеграции корпоративных приложений (EIP — Enterprise Integration Pattern) «список получателей» (Recipient List), который среди прочих описан в книге «Шаблоны интеграции корпоративных приложений» за авторством Грегора Хопа и Бобби Вульфа. А вместо собственного компонента можно использовать готовые реализации EIP такие, как Apache Camel и Spring Integration.
Figure 6. Список получателей
Так как брокеры очередей сообщений не хранят прочитанные сообщения, может возникнуть и другая проблема — появление нового получателя, которому нужно получить события, возникшие до его появления. Возможно, кто-то предложит использовать всё тот же шаблон «список получателей», где одна из очередей будет выделена для постоянного хранения сообщений «на всякий случай», и копировать сообщения из этой резервной очереди в очередь нового получателя. Этот вариант вполне рабочий, пока в очереди не накопится какое-то критическое количество сообщений — в работе того же Apache ActiveMQ Classic появляются значительные просадки в производительности при появлении очередей с большим количеством сообщений, что ведёт к деградации производительности и других компонентов системы: отправителей и получателей.
Эти и другие проблемы решаются в другом классе программного обеспечения — в стриминговых системах.
Стриминговые системы
Современные информационные системы порождают невероятное количество событий: где-то логгируются события, происходящие в пользовательском интерфейсе, где-то используются события предметной области или вовсе реализована логика на основе событий (Event Sourcing), и все эти события надо не только доставлять до получателей, но и накапливать для анализа, аудита или последующей обработки.
Традиционные очереди сообщений с этими задачами справляются плохо, поэтому в какой-то момент появились стриминговые системы, где очередь сообщений заменена на схожую структуру данных — поток или стрим (stream) сообщений.
Примеры стриминговых систем: Apache Kafka, Apache Pulsar, NATS JetStream, RabbitMQ. Кроме этого, некоторые noSQL-БД предоставляют стриминговые API, например Redis.
В отличие от очереди, стрим предполагает только добавление новых элементов в его конец. Одно и то же сообщение может быть прочитано неограниченное количество раз, так как стрим предполагает удаление элементов только при истечении их срока хранения. А для того, чтобы получатель не получал уже прочитанные сообщения, во всех стриминговых системах реализовано некоторое подобие курсора или итератора, которое указывает на последнее прочитанное получателем сообщение в стриме.
Figure 7. Стрим с курсорами трёх получателей
Таким образом решается проблема с хранением сообщений и с доставкой одного сообщения нескольким получателям, в том числе и тем, которые будут созданы или запущены после возникновения сообщения.
Но тут возникает другая проблема, которой не было при использовании традиционных очередей сообщений — невозможность доставки сообщения исключительно одному получателю. В зависимости от реализации вы можете для нескольких получателей использовать один идентификатор, тогда курсор на стриме для них будет общий и сообщение будет получать лишь один из них (NATS JetStream). Или вы можете объединить несколько получателей в группу, и лишь один получатель из группы будет получать сообщение (Kafka).
Итого
Очередь сообщений — инструмент для реализации асинхронного межпроцессного взаимодействия. Основой очереди сообщений является очередь — упорядоченная структура данных, реализующая дисциплину доступа «первый пришёл — первый вышел» (FIFO — first in, first out). Отправитель добавляет новые сообщения в очередь, получатель вычитывает сообщения из очереди, после чего они удаляются. Реализовать очередь сообщений можно при помощи стандартных средств языков программирования (например, LinkedList в Java) или при помощи готовых решений (например, ZeroMQ). Очереди сообщений в большинстве реализаций ориентированы на передачу сообщений, а не на накопление или хранение.
Брокеры очередей сообщений представляют собой централизованные хранилища очередей сообщений, упрощая сопровождение распределённых систем, повышая их отказоустойчивость и предоставляя гарантии доставки сообщений. Примеры брокеров очередей сообщений: Apache ActiveMQ, NATS и RabbitMQ. Брокеры очередей сообщений могут использовать стандартные транспортные протоколы, такие как AMQP, MQTT или STOMP, но зачастую больше ориентированы на использование собственных протоколов.
В стриминговых системах очередь заменена потоком или стримом (stream), схожей структурой данных, которая предполагает удаление сообщений только по истечению срока их хранения. Благодаря этому сообщения из стрима могут получать все получатели, включая периодических получателей и получателей, появившихся после возникновения события. Примеры стриминговых систем: Apache Kafka, Apache Pulsar, NATS JetStream. Стриминговые системы запоминают, какое сообщение последним было прочитано получателем в стриме, и при следующем запросе сообщений возвращают только новые сообщения, поэтому получатель может получить одно и то же сообщение несколько раз только при необходимости. В отличие от очередей, сообщения в стриме могут храниться какое-то продолжительное время или даже бесконечно долго, предоставляя больше свободы для аналитики.
Маршрутизировать сообщения между очередями или стримами можно при помощи шаблонов интеграции корпоративных приложений (EIP — Enterprise Integration Patterins), реализуя их самостоятельно, либо используя для этого готовые решения вроде Apache Camel и Spring Integration.
Полезные ссылки
- ZeroMQ — очереди сообщений без брокеров
- Apache ActiveMQ — брокер очередей сообщений
- Apache Qpid — брокер очередей сообщений
- NATS — брокер очередей сообщений и стриминговый сервис
- RabbitMQ — брокер очередей сообщений и стриминговый сервис
- Apache Kafka — стриминговый сервис
- Apache Pulsar — стриминговый сервис
- Apache Camel — EIP-фреймворк
- Spring Integration — EIP-фреймворк