JMS

ConnectionFactory 接口为与 JMS broker 交互提供了创建 Connection 的标准方法。 虽然 Spring 需要 ConnectionFactory 来使用 JMS,但通常你无需直接操作它,可以依赖更高级的消息抽象。 (详见 Spring Framework 参考文档的 相关章节。) Spring Boot 也会自动配置发送和接收消息所需的基础设施。

ActiveMQ "Classic" 支持

当 classpath 上存在 ActiveMQ "Classic" 时,Spring Boot 可配置 ConnectionFactory。 如果 broker 存在,且未通过配置指定 broker URL 且未禁用嵌入式 broker,则会自动启动并配置一个嵌入式 broker。

如果你使用 spring-boot-starter-activemq,则会自动提供连接 ActiveMQ "Classic" 实例所需的依赖,以及集成 JMS 的 Spring 基础设施。 向应用添加 org.apache.activemq:activemq-broker 可启用嵌入式 broker。

ActiveMQ "Classic" 的配置通过外部配置属性 spring.activemq.* 控制。

如果 classpath 上有 activemq-broker,ActiveMQ "Classic" 会自动配置为使用 VM transport,即在同一 JVM 实例中启动 broker。

你可以通过如下配置 spring.activemq.embedded.enabled 属性禁用嵌入式 broker:

  • Properties

  • YAML

spring.activemq.embedded.enabled=false
spring:
  activemq:
    embedded:
      enabled: false

如果你配置了 broker URL,也会禁用嵌入式 broker,例如:

  • Properties

  • YAML

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

如需完全控制嵌入式 broker,详见 ActiveMQ "Classic" 文档

默认情况下,CachingConnectionFactory 会用合理的设置包装原生 ConnectionFactory,这些设置可通过 spring.jms.* 外部配置属性控制:

  • Properties

  • YAML

spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

如需使用原生连接池,可添加 org.messaginghub:pooled-jms 依赖,并按如下方式配置 JmsPoolConnectionFactory

  • Properties

  • YAML

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
更多支持选项请参见 ActiveMQProperties。 如需更高级自定义,可注册任意数量实现 ActiveMQConnectionFactoryCustomizer 的 bean。

默认情况下,ActiveMQ "Classic" 会在目的地不存在时自动创建目的地,因此目的地会根据提供的名称解析。

ActiveMQ Artemis 支持

当检测到 classpath 上有 ActiveMQ Artemis 时,Spring Boot 可自动配置 ConnectionFactory。 如果 broker 存在,且未显式设置 mode 属性,则会自动启动并配置嵌入式 broker。 支持的模式有 embedded(明确要求嵌入式 broker,若 classpath 上无 broker 则报错)和 native(通过 netty 协议连接 broker)。 配置为后者时,Spring Boot 会配置一个连接本地 broker 的 ConnectionFactory,使用默认设置。

如果你使用 spring-boot-starter-artemis,则会自动提供连接现有 ActiveMQ Artemis 实例所需的依赖,以及集成 JMS 的 Spring 基础设施。 向应用添加 org.apache.activemq:artemis-jakarta-server 可启用嵌入式模式。

ActiveMQ Artemis 的配置通过外部配置属性 spring.artemis.* 控制。例如,你可以在 application.properties 中声明如下配置:

  • Properties

  • YAML

spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

嵌入 broker 时,你可以选择是否启用持久化,并列出应可用的目的地。 这些可以用逗号分隔的列表指定(以默认选项创建),也可以定义 JMSQueueConfigurationTopicConfiguration 类型的 bean 以实现高级队列和主题配置。

默认情况下,CachingConnectionFactory 会用合理的设置包装原生 ConnectionFactory,这些设置可通过 spring.jms.* 外部配置属性控制:

  • Properties

  • YAML

spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

如需使用原生连接池,可添加 org.messaginghub:pooled-jms 依赖,并按如下方式配置 JmsPoolConnectionFactory

  • Properties

  • YAML

spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

更多支持选项请参见 ArtemisProperties

无需 JNDI 查找,目的地会根据其名称解析,名称可来自 ActiveMQ Artemis 配置的 name 属性或配置中提供的名称。

使用 JNDI ConnectionFactory

如果你在应用服务器中运行应用,Spring Boot 会尝试通过 JNDI 查找 JMS ConnectionFactory。 默认会检查 java:/JmsXAjava:/XAConnectionFactory。 如需指定其他位置,可通过 spring.jms.jndi-name 属性配置,例如:

  • Properties

  • YAML

spring.jms.jndi-name=java:/MyConnectionFactory
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

发送消息

Spring 的 JmsTemplate 已自动配置,你可以像下面这样直接注入到自己的 bean 中:

  • Java

  • Kotlin

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final JmsTemplate jmsTemplate;

	public MyBean(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	// ...

	public void someMethod() {
		this.jmsTemplate.convertAndSend("hello");
	}

}
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

	// ...

	fun someMethod() {
		jmsTemplate.convertAndSend("hello")
	}

}
JmsMessagingTemplate 也可以以类似方式注入。 如果定义了 DestinationResolverMessageConverter bean,则会自动关联到自动配置的 JmsTemplate

接收消息

当 JMS 基础设施存在时,任何 bean 都可以通过注解 @JmsListener 创建监听端点。 如果未定义 JmsListenerContainerFactory,则会自动配置一个默认工厂。 如果定义了 DestinationResolverMessageConverterExceptionListener bean,则会自动关联到默认工厂。

大多数场景下,消息监听容器应配置为使用原生 ConnectionFactory,这样每个监听容器都有自己的连接,便于本地恢复。 自动配置会用 ConnectionFactoryUnwrapper 从自动配置的连接工厂中解包原生连接工厂。

自动配置仅解包 CachedConnectionFactory

默认情况下,默认工厂是事务性的。 如果运行环境中存在 JtaTransactionManager,则会默认关联到监听容器。 否则会启用 sessionTransacted 标志。 在后者场景下,你可以在监听方法(或其委托)上添加 @Transactional,将本地数据存储事务与接收消息的处理关联起来。 这样可确保本地事务完成后再确认接收消息。 这也包括在同一 JMS 会话上执行的响应消息发送。

以下组件会在 someQueue 目的地上创建一个监听端点:

  • Java

  • Kotlin

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@JmsListener(destination = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@JmsListener(destination = "someQueue")
	fun processMessage(content: String?) {
		// ...
	}

}
更多详情请参见 @EnableJms API 文档。

如果你需要创建更多 JmsListenerContainerFactory 实例,或想覆盖默认配置,Spring Boot 提供了 DefaultJmsListenerContainerFactoryConfigurer,可用来以与自动配置工厂相同的设置初始化 DefaultJmsListenerContainerFactory

例如,下面的示例暴露了一个使用特定 MessageConverter 的工厂:

  • Java

  • Kotlin

import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.boot.jms.ConnectionFactoryUnwrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory));
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

}
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

	@Bean
	fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer,
				  connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
		val factory = DefaultJmsListenerContainerFactory()
		configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

}
上述示例中的自定义通过 ConnectionFactoryUnwrapper,以与自动配置工厂相同的方式将原生连接工厂关联到消息监听容器。

然后你可以在任何带有 @JmsListener 注解的方法中使用该工厂,如下所示:

  • Java

  • Kotlin

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@JmsListener(destination = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@JmsListener(destination = "someQueue", containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}