JMS
ConnectionFactory
接口为与 JMS broker 交互提供了创建 Connection
的标准方法。
虽然 Spring 需要 ConnectionFactory
来使用 JMS,但通常你无需直接操作它,可以依赖更高级的消息抽象。
(详见 Spring Framework 参考文档的 相关章节。)
Spring Boot 也会自动配置发送和接收消息所需的基础设施。
ActiveMQ "Classic" 支持
当 classpath 上存在 ActiveMQ "Classic" 时,Spring Boot 可配置 ConnectionFactory
。
如果 broker 存在,且未通过配置指定 broker URL 且未禁用嵌入式 broker,则会自动启动并配置一个嵌入式 broker。
如果你使用 spring-boot-starter-activemq ,则会自动提供连接 ActiveMQ "Classic" 实例所需的依赖,以及集成 JMS 的 Spring 基础设施。
向应用添加 org.apache.activemq:activemq-broker 可启用嵌入式 broker。
|
ActiveMQ "Classic" 的配置通过外部配置属性 spring.activemq.*
控制。
如果 classpath 上有 activemq-broker
,ActiveMQ "Classic" 会自动配置为使用 VM transport,即在同一 JVM 实例中启动 broker。
你可以通过如下配置 spring.activemq.embedded.enabled
属性禁用嵌入式 broker:
-
Properties
-
YAML
spring.activemq.embedded.enabled=false
spring:
activemq:
embedded:
enabled: false
如果你配置了 broker URL,也会禁用嵌入式 broker,例如:
-
Properties
-
YAML
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
如需完全控制嵌入式 broker,详见 ActiveMQ "Classic" 文档。
默认情况下,CachingConnectionFactory
会用合理的设置包装原生 ConnectionFactory
,这些设置可通过 spring.jms.*
外部配置属性控制:
-
Properties
-
YAML
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如需使用原生连接池,可添加 org.messaginghub:pooled-jms
依赖,并按如下方式配置 JmsPoolConnectionFactory
:
-
Properties
-
YAML
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
更多支持选项请参见 ActiveMQProperties 。
如需更高级自定义,可注册任意数量实现 ActiveMQConnectionFactoryCustomizer 的 bean。
|
默认情况下,ActiveMQ "Classic" 会在目的地不存在时自动创建目的地,因此目的地会根据提供的名称解析。
ActiveMQ Artemis 支持
当检测到 classpath 上有 ActiveMQ Artemis 时,Spring Boot 可自动配置 ConnectionFactory
。
如果 broker 存在,且未显式设置 mode 属性,则会自动启动并配置嵌入式 broker。
支持的模式有 embedded
(明确要求嵌入式 broker,若 classpath 上无 broker 则报错)和 native
(通过 netty
协议连接 broker)。
配置为后者时,Spring Boot 会配置一个连接本地 broker 的 ConnectionFactory
,使用默认设置。
如果你使用 spring-boot-starter-artemis ,则会自动提供连接现有 ActiveMQ Artemis 实例所需的依赖,以及集成 JMS 的 Spring 基础设施。
向应用添加 org.apache.activemq:artemis-jakarta-server 可启用嵌入式模式。
|
ActiveMQ Artemis 的配置通过外部配置属性 spring.artemis.*
控制。例如,你可以在 application.properties
中声明如下配置:
-
Properties
-
YAML
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
嵌入 broker 时,你可以选择是否启用持久化,并列出应可用的目的地。
这些可以用逗号分隔的列表指定(以默认选项创建),也可以定义 JMSQueueConfiguration
或 TopicConfiguration
类型的 bean 以实现高级队列和主题配置。
默认情况下,CachingConnectionFactory
会用合理的设置包装原生 ConnectionFactory
,这些设置可通过 spring.jms.*
外部配置属性控制:
-
Properties
-
YAML
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如需使用原生连接池,可添加 org.messaginghub:pooled-jms
依赖,并按如下方式配置 JmsPoolConnectionFactory
:
-
Properties
-
YAML
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
更多支持选项请参见 ArtemisProperties
。
无需 JNDI 查找,目的地会根据其名称解析,名称可来自 ActiveMQ Artemis 配置的 name
属性或配置中提供的名称。
使用 JNDI ConnectionFactory
如果你在应用服务器中运行应用,Spring Boot 会尝试通过 JNDI 查找 JMS ConnectionFactory
。
默认会检查 java:/JmsXA
和 java:/XAConnectionFactory
。
如需指定其他位置,可通过 spring.jms.jndi-name
属性配置,例如:
-
Properties
-
YAML
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
发送消息
Spring 的 JmsTemplate
已自动配置,你可以像下面这样直接注入到自己的 bean 中:
-
Java
-
Kotlin
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
public void someMethod() {
this.jmsTemplate.convertAndSend("hello");
}
}
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
// ...
fun someMethod() {
jmsTemplate.convertAndSend("hello")
}
}
JmsMessagingTemplate 也可以以类似方式注入。
如果定义了 DestinationResolver 或 MessageConverter bean,则会自动关联到自动配置的 JmsTemplate 。
|
接收消息
当 JMS 基础设施存在时,任何 bean 都可以通过注解 @JmsListener
创建监听端点。
如果未定义 JmsListenerContainerFactory
,则会自动配置一个默认工厂。
如果定义了 DestinationResolver
、MessageConverter
或 ExceptionListener
bean,则会自动关联到默认工厂。
大多数场景下,消息监听容器应配置为使用原生 ConnectionFactory
,这样每个监听容器都有自己的连接,便于本地恢复。
自动配置会用 ConnectionFactoryUnwrapper
从自动配置的连接工厂中解包原生连接工厂。
自动配置仅解包 CachedConnectionFactory 。
|
默认情况下,默认工厂是事务性的。
如果运行环境中存在 JtaTransactionManager
,则会默认关联到监听容器。
否则会启用 sessionTransacted
标志。
在后者场景下,你可以在监听方法(或其委托)上添加 @Transactional
,将本地数据存储事务与接收消息的处理关联起来。
这样可确保本地事务完成后再确认接收消息。
这也包括在同一 JMS 会话上执行的响应消息发送。
以下组件会在 someQueue
目的地上创建一个监听端点:
-
Java
-
Kotlin
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
更多详情请参见 @EnableJms API 文档。
|
如果你需要创建更多 JmsListenerContainerFactory
实例,或想覆盖默认配置,Spring Boot 提供了 DefaultJmsListenerContainerFactoryConfigurer
,可用来以与自动配置工厂相同的设置初始化 DefaultJmsListenerContainerFactory
。
例如,下面的示例暴露了一个使用特定 MessageConverter
的工厂:
-
Java
-
Kotlin
import jakarta.jms.ConnectionFactory;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.boot.jms.ConnectionFactoryUnwrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory));
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
}
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer,
connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
factory.setMessageConverter(MyMessageConverter())
return factory
}
}
上述示例中的自定义通过 ConnectionFactoryUnwrapper ,以与自动配置工厂相同的方式将原生连接工厂关联到消息监听容器。
|
然后你可以在任何带有 @JmsListener
注解的方法中使用该工厂,如下所示:
-
Java
-
Kotlin
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}