AMQP
高级消息队列协议(AMQP)是一种平台无关的、面向消息中间件的线级协议。
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息解决方案开发。
Spring Boot 通过 RabbitMQ 提供了多种便捷方式来使用 AMQP,包括 spring-boot-starter-amqp
启动器。
RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 的配置通过外部配置属性 spring.rabbitmq.*
控制。
例如,你可以在 application.properties
中声明如下配置:
-
Properties
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,你也可以通过 addresses
属性配置同一个连接:
-
Properties
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定 addresses 时,host 和 port 属性会被忽略。
如果地址使用 amqps 协议,则会自动启用 SSL 支持。
|
更多受支持的基于属性的配置选项请参见 RabbitProperties
。
如需配置 Spring AMQP 所用 RabbitMQ ConnectionFactory
的底层细节,可定义 ConnectionFactoryCustomizer
bean。
如果上下文中存在 ConnectionNameStrategy
bean,则会自动用于为自动配置的 CachingConnectionFactory
创建的连接命名。
如需对 RabbitTemplate
进行全局、增量自定义,可使用 RabbitTemplateCustomizer
bean。
更多详情请参见 理解 RabbitMQ 所用的 AMQP 协议。 |
发送消息
Spring 的 AmqpTemplate
和 AmqpAdmin
已自动配置,你可以像下面这样直接注入到自己的 bean 中:
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 也可以以类似方式注入。
如果定义了 MessageConverter bean,则会自动关联到自动配置的 AmqpTemplate 。
|
如有需要,任何作为 bean 定义的 Queue
都会自动用于在 RabbitMQ 实例上声明对应队列。
如需重试操作,可以在 AmqpTemplate
上启用重试(例如在 broker 连接丢失时):
-
Properties
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
重试默认是关闭的。
你也可以通过声明 RabbitRetryTemplateCustomizer
bean,编程方式自定义 RetryTemplate
。
如果你需要创建更多 RabbitTemplate
实例,或想覆盖默认配置,Spring Boot 提供了 RabbitTemplateConfigurer
bean,可用来以与自动配置工厂相同的设置初始化 RabbitTemplate
。
发送消息到流
如需向特定流发送消息,只需指定流的名称,如下例所示:
-
Properties
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了 MessageConverter
、StreamMessageConverter
或 ProducerCustomizer
bean,则会自动关联到自动配置的 RabbitStreamTemplate
。
如果你需要创建更多 RabbitStreamTemplate
实例,或想覆盖默认配置,Spring Boot 提供了 RabbitStreamTemplateConfigurer
bean,可用来以与自动配置工厂相同的设置初始化 RabbitStreamTemplate
。
接收消息
当 Rabbit 基础设施存在时,任何 bean 都可以通过注解 @RabbitListener
创建监听端点。
如果未定义 RabbitListenerContainerFactory
,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory
,你可以通过 spring.rabbitmq.listener.type
属性切换为 direct 容器。
如果定义了 MessageConverter
或 MessageRecoverer
bean,则会自动关联到默认工厂。
以下示例组件会在 someQueue
队列上创建一个监听端点:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
更多详情请参见 @EnableRabbit 。
|
如果你需要创建更多 RabbitListenerContainerFactory
实例,或想覆盖默认配置,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurer
和 DirectRabbitListenerContainerFactoryConfigurer
,可用来以与自动配置工厂相同的设置初始化 SimpleRabbitListenerContainerFactory
和 DirectRabbitListenerContainerFactory
。
你选择哪种容器类型并不重要。 这两个 bean 都由自动配置暴露。 |
例如,下面的配置类暴露了一个使用特定 MessageConverter
的工厂:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后你可以在任何带有 @RabbitListener
注解的方法中使用该工厂,如下所示:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
你可以启用重试来处理监听器抛出异常的情况。
默认情况下,使用 RejectAndDontRequeueRecoverer
,但你可以自定义 MessageRecoverer
。
当重试耗尽时,消息会被拒绝并丢弃,或在 broker 配置允许时路由到死信交换。
默认情况下,重试是关闭的。
你也可以通过声明 RabbitRetryTemplateCustomizer
bean,编程方式自定义 RetryTemplate
。
默认情况下,如果重试关闭且监听器抛出异常,则会无限次重投递。
你可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false ,这样就不会再尝试重新投递;或抛出 AmqpRejectAndDontRequeueException ,表示消息应被拒绝。
后者是启用重试并达到最大投递次数时采用的机制。
|