Отправка SSE из JMS при помощи Spring WebFlux

Когда речь заходит о необходимости отправки сообщений клиенту от сервера по HTTP, варианты решения, как правило, ограничиваются использованием вебсокетов или событий отправляемых сервером (Server-sent Events, SSE). Если требуется отправить клиенту сообщение из JMS, то обычно реализация этой задачи сводится к использованию STOMP через вебсокеты, как это описано в официальной документации Spring Framework.

Но что делать, если в проекте для отправки сообщений пользователю уже используются SSE, а добавлять поддержу вебсокетов ради реализации этой функциональности нет желания? Теоретически всё достаточно просто: при появлении новой SSE-подписки нужно динамически создавать новый слушатель сообщений JMS, который при получении сообщения будет просто отправлять клиенту событие. Ну и при завершении подписки на события слушатель сообщений тоже должен завершать свою работу.

На практике всё оказалось несколько сложнее, поскольку в официальной документации Spring Framework не описано, как можно динамически создавать и использовать слушатели сообщений. Но некоторые зацепки официальная документация всё же даёт. Порядок действий при появлении нового подписчика на события достаточно прост:

  1. Создать эндпоинт для слушателя событий (экземпляр JmsListenerEndpoint)
  2. Создать слушатель событий (MessageListener), в котором получение сообщений будут приводить к публикации событий
  3. Создать контейнер (MessageListenerContainer) для слушателя и запустить его

Ниже приведён код в качестве примера. Он может быть неоптимален с точки зрения использования ресурсов, поскольку преследует целью только демонстрацию работоспособности концепции. Вы можете использовать его на свой страх и риск.

Реализация при помощи аннотированного контроллера

Первый способ реализовать передачу JMS-сообщений в виде потока SSE — при помощи аннотированного контроллера.

При реализации SSE в Spring WebFlux контроллер должен возвращать Flux, объект которого я создал при помощи статического метода create. В качестве аргумента этот метод принимает объект типа Consumer<FluxSink>, при помощи которого я реализовал связь между JMS и SSE. (#2)

Порядок действий в результате получился тот же, что описан выше:

  1. Первым делом я создал эндпоинт (#3) — объект типа SimpleJmsListenerEndpoint
  2. Для эндпоинта я создал новый слушатель сообщений (#4), в котором содержимое текстовых сообщений (javax.jms.TextMessage) я просто перенаправил в созданный Flux при помощи вызова метода FluxSink.next() (#5)
  3. Ну и наконец я создал контейнер слушателя сообщений для эндпоинта (#6) при помощи методаAbstractJmsListenerContainerFactory.createListenerContainer(), сконфигурировал его и запустил (#8)

Естественно нужно позаботиться об остановке подписки на сообщения в случае, если поток событий прерывается, для этого при отмене и завершении подписки на события нужно вызывать метод shutdown()контейнера. (#7).

В контроллер внедрён объект класса AbstractJmsListenerContainerFactory, который используется для создания нового контейнера. При использовании Spring Boot с исходными настройками контекста приложения в нём будет зарегистрирован объект типа DefaultJmsListenerContainerFactory. Если вы не используете Spring Boot, то вы должны самостоятельно зарегистрировать компонент класса DefaultJmsListenerContainerFactory или SimpleJmsListenerContainerFactory в контексте приложения.

И контейнер и эндпоинт могут быть сконфигурированы при помощи set-методов (#6).

Реализация при помощи функционального обработчика

Альтернативный способ — при помощи функционального обработчика.

У меня зарегистрирован в качестве компонента маршрутизатор:

И обработчик

Логика его работы идентична логике работы аннотированного контроллера, показанного выше.

Возможность реализации в Servlet API

Такая функциональность может быть реализована и с Servlet API, разница в коде будет минимальна. Однако, при использовании Servlet API возникает проблема, связанная с отсутствием функциональности, которая позволяла бы отследить окончание подписки на события. Для решения этой проблемы потребуется реализовать функциональность, аналогичную HEARTBEAT в STOMP.