JMS
ConnectionFactory
接口提供了一种创建 Connection
的标准方法,用于与 JMS broker 进行交互。
虽然 Spring 需要 ConnectionFactory
来使用 JMS,但你通常不需要直接使用它,而是可以依赖更高级别的消息抽象。
(有关详细信息,请参阅 Spring Framework 参考文档的 相关部分。)
Spring Boot 还会自动配置发送和接收消息所需的基础设施。
ActiveMQ "Classic" 支持
当 ActiveMQ "Classic" 在类路径上可用时,Spring Boot 可以配置 ConnectionFactory
。
如果存在 broker,则会自动启动和配置嵌入式 broker(前提是没有通过配置指定 broker URL 且未在配置中禁用嵌入式 broker)。
注意:如果你使用 spring-boot-starter-activemq
,则会提供连接到 ActiveMQ "Classic" 实例所需的依赖项,以及用于与 JMS 集成的 Spring 基础设施。
将 org.apache.activemq:activemq-broker
添加到你的应用程序中,你就可以使用嵌入式 broker。
ActiveMQ "Classic" 配置由 spring.activemq.*
中的外部配置属性控制。
如果 activemq-broker
在类路径上,ActiveMQ "Classic" 会自动配置为使用 VM transport,它会在同一个 JVM 实例中启动一个嵌入式 broker。
你可以通过配置 spring.activemq.embedded.enabled
属性来禁用嵌入式 broker,如下例所示:
-
Properties
-
YAML
spring.activemq.embedded.enabled=false
spring:
activemq:
embedded:
enabled: false
如果你配置了 broker URL,嵌入式 broker 也会被禁用,如下例所示:
-
Properties
-
YAML
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
如果你想完全控制嵌入式 broker,请参阅 ActiveMQ "Classic" 文档 获取更多信息。
默认情况下,CachingConnectionFactory
会包装原生 ConnectionFactory
,并提供合理的设置,你可以通过 spring.jms.*
中的外部配置属性来控制:
-
Properties
-
YAML
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果你更希望使用原生连接池,可以通过添加 org.messaginghub:pooled-jms
依赖并相应地配置 JmsPoolConnectionFactory
来实现,如下例所示:
-
Properties
-
YAML
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
提示:有关更多支持的选项,请参阅 ActiveMQProperties
。
你还可以注册任意数量的实现 ActiveMQConnectionFactoryCustomizer
的 bean 来进行更高级的自定义。
默认情况下,如果目标不存在,ActiveMQ "Classic" 会创建一个目标,以便根据提供的名称解析目标。
ActiveMQ Artemis 支持
当检测到 ActiveMQ Artemis 在类路径上可用时,Spring Boot 可以自动配置 ConnectionFactory
。
如果存在 broker,则会自动启动和配置嵌入式 broker(除非已明确设置 mode 属性)。
支持的模式有 embedded
(明确表示需要嵌入式 broker,如果类路径上没有 broker 则应出错)和 native
(使用 netty
传输协议连接到 broker)。
当配置后者时,Spring Boot 会配置一个 ConnectionFactory
,该工厂使用默认设置连接到本地机器上运行的 broker。
注意:如果你使用 spring-boot-starter-artemis
,则会提供连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及用于与 JMS 集成的 Spring 基础设施。
将 org.apache.activemq:artemis-jakarta-server
添加到你的应用程序中,你就可以使用嵌入式模式。
ActiveMQ Artemis 配置由 spring.artemis.*
中的外部配置属性控制。
例如,你可以在 application.properties
中声明以下部分:
-
Properties
-
YAML
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
在嵌入 broker 时,你可以选择是否启用持久性,并列出应该可用的目标。
这些可以指定为逗号分隔的列表,以使用默认选项创建它们,或者你可以定义类型为 JMSQueueConfiguration
或 TopicConfiguration
的 bean,分别用于高级队列和主题配置。
默认情况下,CachingConnectionFactory
会包装原生 ConnectionFactory
,并提供合理的设置,你可以通过 spring.jms.*
中的外部配置属性来控制:
-
Properties
-
YAML
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果你更希望使用原生连接池,可以通过添加 org.messaginghub:pooled-jms
依赖并相应地配置 JmsPoolConnectionFactory
来实现,如下例所示:
-
Properties
-
YAML
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
有关更多支持的选项,请参阅 ArtemisProperties
。
不涉及 JNDI 查找,目标会根据其名称进行解析,使用 ActiveMQ Artemis 配置中的 name
属性或通过配置提供的名称。
使用 JNDI ConnectionFactory
如果你的应用程序在应用服务器中运行,Spring Boot 会尝试使用 JNDI 定位 JMS ConnectionFactory
。
默认情况下,会检查 java:/JmsXA
和 java:/XAConnectionFactory
位置。
如果你需要指定替代位置,可以使用 spring.jms.jndi-name
属性,如下例所示:
-
Properties
-
YAML
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
发送消息
Spring 的 JmsTemplate
是自动配置的,你可以直接将其自动装配到你自己的 bean 中,如下例所示:
-
Java
-
Kotlin
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
public void someMethod() {
this.jmsTemplate.convertAndSend("hello");
}
}
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
// ...
fun someMethod() {
jmsTemplate.convertAndSend("hello")
}
}
注意:JmsMessagingTemplate
可以以类似的方式注入。
如果定义了 DestinationResolver
或 MessageConverter
bean,它会自动关联到自动配置的 JmsTemplate
。
接收消息
当 JMS 基础设施存在时,任何 bean 都可以用 @JmsListener
注解来创建监听器端点。
如果没有定义 JmsListenerContainerFactory
,则会自动配置一个默认的工厂。
如果定义了 DestinationResolver
、MessageConverter
或 ExceptionListener
bean,它们会自动关联到默认工厂。
在大多数情况下,消息监听器容器应该针对原生 ConnectionFactory
进行配置。
这样每个监听器容器都有自己的连接,这使其在本地恢复方面具有完全的责任。
自动配置使用 ConnectionFactoryUnwrapper
从自动配置的工厂中解包原生连接工厂。
注意:自动配置只解包 CachedConnectionFactory
。
默认情况下,默认工厂是事务性的。
如果你在存在 JtaTransactionManager
的基础设施中运行,它会默认关联到监听器容器。
如果没有,则启用 sessionTransacted
标志。
在后一种情况下,你可以通过在监听器方法(或其委托)上添加 @Transactional
来将本地数据存储事务关联到传入消息的处理。
这确保了在本地事务完成后确认传入的消息。
这还包括在同一 JMS 会话上执行的响应消息的发送。
以下组件在 someQueue
目标上创建监听器端点:
-
Java
-
Kotlin
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
提示:有关更多详细信息,请参阅 @EnableJms
API 文档。
如果你需要创建更多 JmsListenerContainerFactory
实例或者想要覆盖默认配置,Spring Boot 提供了 DefaultJmsListenerContainerFactoryConfigurer
,你可以用它来初始化 DefaultJmsListenerContainerFactory
,使用与自动配置相同的设置。
例如,以下示例暴露了另一个使用特定 MessageConverter
的工厂:
-
Java
-
Kotlin
import jakarta.jms.ConnectionFactory;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.boot.jms.ConnectionFactoryUnwrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory));
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
}
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer,
connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
factory.setMessageConverter(MyMessageConverter())
return factory
}
}
注意:在上面的示例中,自定义使用 ConnectionFactoryUnwrapper
将原生连接工厂关联到消息监听器容器,与自动配置的工厂相同。
然后你可以在任何 @JmsListener
注解的方法中使用该工厂,如下所示:
-
Java
-
Kotlin
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}