Когда речь заходит о необходимости отправки сообщений клиенту от сервера по HTTP, варианты решения, как правило, ограничиваются использованием вебсокетов или событий отправляемых сервером (Server-sent Events, SSE). Если требуется отправить клиенту сообщение из JMS, то обычно реализация этой задачи сводится к использованию STOMP через вебсокеты, как это описано в официальной документации Spring Framework.
Но что делать, если в проекте для отправки сообщений пользователю уже используются SSE, а добавлять поддержу вебсокетов ради реализации этой функциональности нет желания? Теоретически всё достаточно просто: при появлении новой SSE-подписки нужно динамически создавать новый слушатель сообщений JMS, который при получении сообщения будет просто отправлять клиенту событие. Ну и при завершении подписки на события слушатель сообщений тоже должен завершать свою работу.
На практике всё оказалось несколько сложнее, поскольку в официальной документации Spring Framework не описано, как можно динамически создавать и использовать слушатели сообщений. Но некоторые зацепки официальная документация всё же даёт. Порядок действий при появлении нового подписчика на события достаточно прост:
- Создать эндпоинт для слушателя событий (экземпляр
JmsListenerEndpoint
) - Создать слушатель событий (
MessageListener
), в котором получение сообщений будут приводить к публикации событий - Создать контейнер (
MessageListenerContainer
) для слушателя и запустить его
Ниже приведён код в качестве примера. Он может быть неоптимален с точки зрения использования ресурсов, поскольку преследует целью только демонстрацию работоспособности концепции. Вы можете использовать его на свой страх и риск.
Реализация при помощи аннотированного контроллера
Первый способ реализовать передачу JMS-сообщений в виде потока SSE — при помощи аннотированного контроллера.
При реализации SSE в Spring WebFlux контроллер должен возвращать Flux
, объект которого я создал при помощи статического метода create
. В качестве аргумента этот метод принимает объект типа Consumer<FluxSink>
, при помощи которого я реализовал связь между JMS и SSE. (#2)
Порядок действий в результате получился тот же, что описан выше:
- Первым делом я создал эндпоинт (#3) — объект типа
SimpleJmsListenerEndpoint
- Для эндпоинта я создал новый слушатель сообщений (#4), в котором содержимое текстовых сообщений (
javax.jms.TextMessage
) я просто перенаправил в созданныйFlux
при помощи вызова методаFluxSink.next()
(#5) - Ну и наконец я создал контейнер слушателя сообщений для эндпоинта (#6) при помощи метода
AbstractJmsListenerContainerFactory.createListenerContainer()
, сконфигурировал его и запустил (#8)
Естественно нужно позаботиться об остановке подписки на сообщения в случае, если поток событий прерывается, для этого при отмене и завершении подписки на события нужно вызывать метод shutdown()
контейнера. (#7).
В контроллер внедрён объект класса AbstractJmsListenerContainerFactory
, который используется для создания нового контейнера. При использовании Spring Boot с исходными настройками контекста приложения в нём будет зарегистрирован объект типа DefaultJmsListenerContainerFactory
. Если вы не используете Spring Boot, то вы должны самостоятельно зарегистрировать компонент класса DefaultJmsListenerContainerFactory
или SimpleJmsListenerContainerFactory
в контексте приложения.
И контейнер и эндпоинт могут быть сконфигурированы при помощи set
-методов (#6).
Реализация при помощи функционального обработчика
Альтернативный способ — при помощи функционального обработчика.
У меня зарегистрирован в качестве компонента маршрутизатор:
И обработчик
Логика его работы идентична логике работы аннотированного контроллера, показанного выше.
Возможность реализации в Servlet API
Такая функциональность может быть реализована и с Servlet API, разница в коде будет минимальна. Однако, при использовании Servlet API возникает проблема, связанная с отсутствием функциональности, которая позволяла бы отследить окончание подписки на события. Для решения этой проблемы потребуется реализовать функциональность, аналогичную HEARTBEAT в STOMP.