JMS

ConnectionFactory 接口提供了一种创建 Connection 的标准方法,用于与 JMS broker 进行交互。 虽然 Spring 需要 ConnectionFactory 来使用 JMS,但你通常不需要直接使用它,而是可以依赖更高级别的消息抽象。 (有关详细信息,请参阅 Spring Framework 参考文档的 相关部分。) Spring Boot 还会自动配置发送和接收消息所需的基础设施。

ActiveMQ "Classic" 支持

ActiveMQ "Classic" 在类路径上可用时,Spring Boot 可以配置 ConnectionFactory。 如果存在 broker,则会自动启动和配置嵌入式 broker(前提是没有通过配置指定 broker URL 且未在配置中禁用嵌入式 broker)。

注意:如果你使用 spring-boot-starter-activemq,则会提供连接到 ActiveMQ "Classic" 实例所需的依赖项,以及用于与 JMS 集成的 Spring 基础设施。 将 org.apache.activemq:activemq-broker 添加到你的应用程序中,你就可以使用嵌入式 broker。

ActiveMQ "Classic" 配置由 spring.activemq.* 中的外部配置属性控制。

如果 activemq-broker 在类路径上,ActiveMQ "Classic" 会自动配置为使用 VM transport,它会在同一个 JVM 实例中启动一个嵌入式 broker。

你可以通过配置 spring.activemq.embedded.enabled 属性来禁用嵌入式 broker,如下例所示:

  • Properties

  • YAML

spring.activemq.embedded.enabled=false
spring:
  activemq:
    embedded:
      enabled: false

如果你配置了 broker URL,嵌入式 broker 也会被禁用,如下例所示:

  • Properties

  • YAML

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

如果你想完全控制嵌入式 broker,请参阅 ActiveMQ "Classic" 文档 获取更多信息。

默认情况下,CachingConnectionFactory 会包装原生 ConnectionFactory,并提供合理的设置,你可以通过 spring.jms.* 中的外部配置属性来控制:

  • Properties

  • YAML

spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

如果你更希望使用原生连接池,可以通过添加 org.messaginghub:pooled-jms 依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下例所示:

  • Properties

  • YAML

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50

提示:有关更多支持的选项,请参阅 ActiveMQProperties。 你还可以注册任意数量的实现 ActiveMQConnectionFactoryCustomizer 的 bean 来进行更高级的自定义。

默认情况下,如果目标不存在,ActiveMQ "Classic" 会创建一个目标,以便根据提供的名称解析目标。

ActiveMQ Artemis 支持

当检测到 ActiveMQ Artemis 在类路径上可用时,Spring Boot 可以自动配置 ConnectionFactory。 如果存在 broker,则会自动启动和配置嵌入式 broker(除非已明确设置 mode 属性)。 支持的模式有 embedded(明确表示需要嵌入式 broker,如果类路径上没有 broker 则应出错)和 native(使用 netty 传输协议连接到 broker)。 当配置后者时,Spring Boot 会配置一个 ConnectionFactory,该工厂使用默认设置连接到本地机器上运行的 broker。

注意:如果你使用 spring-boot-starter-artemis,则会提供连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及用于与 JMS 集成的 Spring 基础设施。 将 org.apache.activemq:artemis-jakarta-server 添加到你的应用程序中,你就可以使用嵌入式模式。

ActiveMQ Artemis 配置由 spring.artemis.* 中的外部配置属性控制。 例如,你可以在 application.properties 中声明以下部分:

  • Properties

  • YAML

spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

在嵌入 broker 时,你可以选择是否启用持久性,并列出应该可用的目标。 这些可以指定为逗号分隔的列表,以使用默认选项创建它们,或者你可以定义类型为 JMSQueueConfigurationTopicConfiguration 的 bean,分别用于高级队列和主题配置。

默认情况下,CachingConnectionFactory 会包装原生 ConnectionFactory,并提供合理的设置,你可以通过 spring.jms.* 中的外部配置属性来控制:

  • Properties

  • YAML

spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

如果你更希望使用原生连接池,可以通过添加 org.messaginghub:pooled-jms 依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下例所示:

  • Properties

  • YAML

spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

有关更多支持的选项,请参阅 ArtemisProperties

不涉及 JNDI 查找,目标会根据其名称进行解析,使用 ActiveMQ Artemis 配置中的 name 属性或通过配置提供的名称。

使用 JNDI ConnectionFactory

如果你的应用程序在应用服务器中运行,Spring Boot 会尝试使用 JNDI 定位 JMS ConnectionFactory。 默认情况下,会检查 java:/JmsXAjava:/XAConnectionFactory 位置。 如果你需要指定替代位置,可以使用 spring.jms.jndi-name 属性,如下例所示:

  • Properties

  • YAML

spring.jms.jndi-name=java:/MyConnectionFactory
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

发送消息

Spring 的 JmsTemplate 是自动配置的,你可以直接将其自动装配到你自己的 bean 中,如下例所示:

  • Java

  • Kotlin

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final JmsTemplate jmsTemplate;

	public MyBean(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	// ...

	public void someMethod() {
		this.jmsTemplate.convertAndSend("hello");
	}

}
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

	// ...

	fun someMethod() {
		jmsTemplate.convertAndSend("hello")
	}

}

注意:JmsMessagingTemplate 可以以类似的方式注入。 如果定义了 DestinationResolverMessageConverter bean,它会自动关联到自动配置的 JmsTemplate

接收消息

当 JMS 基础设施存在时,任何 bean 都可以用 @JmsListener 注解来创建监听器端点。 如果没有定义 JmsListenerContainerFactory,则会自动配置一个默认的工厂。 如果定义了 DestinationResolverMessageConverterExceptionListener bean,它们会自动关联到默认工厂。

在大多数情况下,消息监听器容器应该针对原生 ConnectionFactory 进行配置。 这样每个监听器容器都有自己的连接,这使其在本地恢复方面具有完全的责任。 自动配置使用 ConnectionFactoryUnwrapper 从自动配置的工厂中解包原生连接工厂。

注意:自动配置只解包 CachedConnectionFactory

默认情况下,默认工厂是事务性的。 如果你在存在 JtaTransactionManager 的基础设施中运行,它会默认关联到监听器容器。 如果没有,则启用 sessionTransacted 标志。 在后一种情况下,你可以通过在监听器方法(或其委托)上添加 @Transactional 来将本地数据存储事务关联到传入消息的处理。 这确保了在本地事务完成后确认传入的消息。 这还包括在同一 JMS 会话上执行的响应消息的发送。

以下组件在 someQueue 目标上创建监听器端点:

  • Java

  • Kotlin

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@JmsListener(destination = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@JmsListener(destination = "someQueue")
	fun processMessage(content: String?) {
		// ...
	}

}

提示:有关更多详细信息,请参阅 @EnableJms API 文档。

如果你需要创建更多 JmsListenerContainerFactory 实例或者想要覆盖默认配置,Spring Boot 提供了 DefaultJmsListenerContainerFactoryConfigurer,你可以用它来初始化 DefaultJmsListenerContainerFactory,使用与自动配置相同的设置。

例如,以下示例暴露了另一个使用特定 MessageConverter 的工厂:

  • Java

  • Kotlin

import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.boot.jms.ConnectionFactoryUnwrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory));
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

}
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

	@Bean
	fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer,
				  connectionFactory: ConnectionFactory): DefaultJmsListenerContainerFactory {
		val factory = DefaultJmsListenerContainerFactory()
		configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

}

注意:在上面的示例中,自定义使用 ConnectionFactoryUnwrapper 将原生连接工厂关联到消息监听器容器,与自动配置的工厂相同。

然后你可以在任何 @JmsListener 注解的方法中使用该工厂,如下所示:

  • Java

  • Kotlin

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@JmsListener(destination = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@JmsListener(destination = "someQueue", containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}