AMQP

高级消息队列协议(AMQP)是一种平台无关的、面向消息中间件的线级协议。 Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息解决方案开发。 Spring Boot 通过 RabbitMQ 提供了多种便捷方式来使用 AMQP,包括 spring-boot-starter-amqp 启动器。

RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 的配置通过外部配置属性 spring.rabbitmq.* 控制。 例如,你可以在 application.properties 中声明如下配置:

  • Properties

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你也可以通过 addresses 属性配置同一个连接:

  • Properties

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定 addresses 时,hostport 属性会被忽略。 如果地址使用 amqps 协议,则会自动启用 SSL 支持。

更多受支持的基于属性的配置选项请参见 RabbitProperties。 如需配置 Spring AMQP 所用 RabbitMQ ConnectionFactory 的底层细节,可定义 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,则会自动用于为自动配置的 CachingConnectionFactory 创建的连接命名。

如需对 RabbitTemplate 进行全局、增量自定义,可使用 RabbitTemplateCustomizer bean。

更多详情请参见 理解 RabbitMQ 所用的 AMQP 协议

发送消息

Spring 的 AmqpTemplateAmqpAdmin 已自动配置,你可以像下面这样直接注入到自己的 bean 中:

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 也可以以类似方式注入。 如果定义了 MessageConverter bean,则会自动关联到自动配置的 AmqpTemplate

如有需要,任何作为 bean 定义的 Queue 都会自动用于在 RabbitMQ 实例上声明对应队列。

如需重试操作,可以在 AmqpTemplate 上启用重试(例如在 broker 连接丢失时):

  • Properties

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重试默认是关闭的。 你也可以通过声明 RabbitRetryTemplateCustomizer bean,编程方式自定义 RetryTemplate

如果你需要创建更多 RabbitTemplate 实例,或想覆盖默认配置,Spring Boot 提供了 RabbitTemplateConfigurer bean,可用来以与自动配置工厂相同的设置初始化 RabbitTemplate

发送消息到流

如需向特定流发送消息,只需指定流的名称,如下例所示:

  • Properties

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer bean,则会自动关联到自动配置的 RabbitStreamTemplate

如果你需要创建更多 RabbitStreamTemplate 实例,或想覆盖默认配置,Spring Boot 提供了 RabbitStreamTemplateConfigurer bean,可用来以与自动配置工厂相同的设置初始化 RabbitStreamTemplate

接收消息

当 Rabbit 基础设施存在时,任何 bean 都可以通过注解 @RabbitListener 创建监听端点。 如果未定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,你可以通过 spring.rabbitmq.listener.type 属性切换为 direct 容器。 如果定义了 MessageConverterMessageRecoverer bean,则会自动关联到默认工厂。

以下示例组件会在 someQueue 队列上创建一个监听端点:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
更多详情请参见 @EnableRabbit

如果你需要创建更多 RabbitListenerContainerFactory 实例,或想覆盖默认配置,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,可用来以与自动配置工厂相同的设置初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory

你选择哪种容器类型并不重要。 这两个 bean 都由自动配置暴露。

例如,下面的配置类暴露了一个使用特定 MessageConverter 的工厂:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然后你可以在任何带有 @RabbitListener 注解的方法中使用该工厂,如下所示:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以启用重试来处理监听器抛出异常的情况。 默认情况下,使用 RejectAndDontRequeueRecoverer,但你可以自定义 MessageRecoverer。 当重试耗尽时,消息会被拒绝并丢弃,或在 broker 配置允许时路由到死信交换。 默认情况下,重试是关闭的。 你也可以通过声明 RabbitRetryTemplateCustomizer bean,编程方式自定义 RetryTemplate

默认情况下,如果重试关闭且监听器抛出异常,则会无限次重投递。 你可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,这样就不会再尝试重新投递;或抛出 AmqpRejectAndDontRequeueException,表示消息应被拒绝。 后者是启用重试并达到最大投递次数时采用的机制。