Apache Pulsar 支持
Apache Pulsar 通过为 Spring for Apache Pulsar 项目提供自动配置获得支持。
当 org.springframework.pulsar:spring-pulsar
在 classpath 上时,Spring Boot 会自动配置并注册经典(命令式)的 Spring for Apache Pulsar 组件。
当 org.springframework.pulsar:spring-pulsar-reactive
在 classpath 上时,也会自动配置响应式组件。
分别提供了 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
启动器,便于收集命令式和响应式使用所需的依赖。
连接 Pulsar
使用 Pulsar starter 时,Spring Boot 会自动配置并注册一个 PulsarClient
bean。
默认情况下,应用会尝试连接到本地的 Pulsar 实例 pulsar://localhost:6650
。
可以通过设置 spring.pulsar.client.service-url
属性为其他值来调整。
该值必须是有效的 Pulsar 协议 URL。 |
你可以通过指定任何以 spring.pulsar.client.*
为前缀的应用属性来配置客户端。
如需更细致的配置控制,可注册一个或多个 PulsarClientBuilderCustomizer
bean。
认证
若需连接到需要认证的 Pulsar 集群,需要通过设置 pluginClassName
及插件所需参数来指定认证插件。
可将参数设置为参数名到参数值的映射。
以下示例展示了如何配置 AuthenticationOAuth2
插件。
-
Properties
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
你需要确保 例如,若要为 这种缺乏松散绑定也导致使用环境变量配置认证参数时存在问题,因为在转换过程中会丢失大小写敏感性。 如果你使用环境变量配置参数,请参考 Spring for Apache Pulsar 文档 相关步骤 以确保正常工作。 |
响应式连接 Pulsar
当响应式自动配置被激活时,Spring Boot 会自动配置并注册一个 ReactivePulsarClient
bean。
ReactivePulsarClient
会适配前述的 PulsarClient
实例。
因此,请参考上一节配置 PulsarClient
,以供 ReactivePulsarClient
使用。
连接 Pulsar 管理端
Spring for Apache Pulsar 的 PulsarAdministration
客户端也会被自动配置。
默认情况下,应用会尝试连接到本地 Pulsar 实例 http://localhost:8080
。
可以通过设置 spring.pulsar.admin.service-url
属性为 (http|https)://<host>:<port>
形式的其他值来调整。
如需更细致的配置控制,可注册一个或多个 PulsarAdminBuilderCustomizer
bean。
认证
访问需要认证的 Pulsar 集群时,管理端客户端需要与常规 Pulsar 客户端相同的安全配置。
你可以将上述 认证配置 中的 spring.pulsar.client.authentication
替换为 spring.pulsar.admin.authentication
。
若需在启动时创建主题,可添加一个类型为 PulsarTopic 的 bean。
如果主题已存在,该 bean 会被忽略。
|
发送消息
Spring 的 PulsarTemplate
会被自动配置,你可以用它发送消息,如下例所示:
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate
依赖 PulsarProducerFactory
创建底层 Pulsar producer。
Spring Boot 自动配置也会提供该 producer 工厂,默认情况下会缓存其创建的 producer。
你可以通过指定任何以 spring.pulsar.producer.
和 spring.pulsar.producer.cache.
为前缀的应用属性来配置 producer 工厂及缓存设置。
如需更细致的 producer 工厂配置控制,可注册一个或多个 ProducerBuilderCustomizer
bean。
这些定制器会应用于所有创建的 producer。
你也可以在发送消息时传入 ProducerBuilderCustomizer
,仅影响当前 producer。
如需更细致地控制发送的消息,可在发送消息时传入 TypedMessageBuilderCustomizer
。
响应式发送消息
当响应式自动配置被激活时,Spring 的 ReactivePulsarTemplate
会被自动配置,你可以用它发送消息,如下例所示:
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate
依赖 ReactivePulsarSenderFactory
实际创建底层 sender。
Spring Boot 自动配置也会提供该 sender 工厂,默认情况下会缓存其创建的 producer。
你可以通过指定任何以 spring.pulsar.producer.
和 spring.pulsar.producer.cache.
为前缀的应用属性来配置 sender 工厂及缓存设置。
如需更细致的 sender 工厂配置控制,可注册一个或多个 ReactiveMessageSenderBuilderCustomizer
bean。
这些定制器会应用于所有创建的 sender。
你也可以在发送消息时传入 ReactiveMessageSenderBuilderCustomizer
,仅影响当前 sender。
如需更细致地控制发送的消息,可在发送消息时传入 MessageSpecBuilderCustomizer
。
接收消息
当 Apache Pulsar 基础设施存在时,任何 bean 都可以通过注解 @PulsarListener
创建监听端点。
下列组件会在 someTopic
主题上创建监听端点:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置会提供 PulsarListener
所需的全部组件,如 PulsarListenerContainerFactory
及其用于构建底层 Pulsar consumer 的工厂。
你可以通过指定任何以 spring.pulsar.listener.
和 spring.pulsar.consumer.
为前缀的应用属性来配置这些组件。
如需更细致的 consumer 工厂配置控制,可注册一个或多个 ConsumerBuilderCustomizer
bean。
这些定制器会应用于工厂创建的所有 consumer,因此也会应用于所有 @PulsarListener
实例。
你也可以通过设置 @PulsarListener
注解的 consumerCustomizer
属性来定制单个监听器。
如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>
bean。
响应式接收消息
当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 bean 都可以通过注解 @ReactivePulsarListener
创建响应式监听端点。
下列组件会在 someTopic
主题上创建响应式监听端点:
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自动配置会提供 ReactivePulsarListener
所需的全部组件,如 ReactivePulsarListenerContainerFactory
及其用于构建底层响应式 Pulsar consumer 的工厂。
你可以通过指定任何以 spring.pulsar.listener.
和 spring.pulsar.consumer.
为前缀的应用属性来配置这些组件。
如需更细致的 consumer 工厂配置控制,可注册一个或多个 ReactiveMessageConsumerBuilderCustomizer
bean。
这些定制器会应用于工厂创建的所有 consumer,因此也会应用于所有 @ReactivePulsarListener
实例。
你也可以通过设置 @ReactivePulsarListener
注解的 consumerCustomizer
属性来定制单个监听器。
如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>
bean。
读取消息
Pulsar reader 接口使应用能够手动管理游标。 使用 reader 连接到主题时,需要指定 reader 连接时开始读取的消息。
当 Apache Pulsar 基础设施存在时,任何 bean 都可以通过注解 @PulsarReader
使用 reader 消费消息。
下列组件会创建一个从 someTopic
主题开头开始读取消息的 reader 端点:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader
依赖 PulsarReaderFactory
创建底层 Pulsar reader。
Spring Boot 自动配置会提供该 reader 工厂,可通过设置任何以 spring.pulsar.reader.*
为前缀的应用属性进行定制。
如需更细致的 reader 工厂配置控制,可注册一个或多个 ReaderBuilderCustomizer
bean。
这些定制器会应用于工厂创建的所有 reader,因此也会应用于所有 @PulsarReader
实例。
你也可以通过设置 @PulsarReader
注解的 readerCustomizer
属性来定制单个监听器。
如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>>
bean。
响应式读取消息
当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,Spring 的 ReactivePulsarReaderFactory
会被提供,你可以用它以响应式方式读取消息。
下列组件会使用提供的工厂创建 reader,并从 someTopic
主题的5分钟前读取一条消息:
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自动配置会提供该 reader 工厂,可通过设置任何以 spring.pulsar.reader.*
为前缀的应用属性进行定制。
如需更细致的 reader 工厂配置控制,可在使用工厂创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer
实例。
如需更细致的 reader 工厂配置控制,可注册一个或多个 ReactiveMessageReaderBuilderCustomizer
bean。
这些定制器会应用于所有创建的 reader。
你也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer
,仅对所创建的 reader 应用定制。
关于上述任一组件的更多细节及其他可用特性,请参见 Spring for Apache Pulsar 参考文档。 |
事务支持
Spring for Apache Pulsar 在使用 PulsarTemplate
和 @PulsarListener
时支持事务。
响应式变体当前尚不支持事务。 |
将 spring.pulsar.transaction.enabled
属性设置为 true
将会:
-
配置一个
PulsarTransactionManager
bean -
为
PulsarTemplate
启用事务支持 -
为
@PulsarListener
方法启用事务支持
@PulsarListener
的 transactional
属性可用于细粒度控制监听器何时使用事务。
如需更细致地控制 Spring for Apache Pulsar 的事务特性,可自定义 PulsarTemplate
和/或 ConcurrentPulsarListenerContainerFactory
bean。
如果默认自动配置的 PulsarTransactionManager
不适用,也可以自定义 PulsarAwareTransactionManager
bean。
其他 Pulsar 属性
自动配置支持的属性见附录 集成属性 部分。 注意,这些属性(无论是连字符还是驼峰命名)大多直接映射到 Apache Pulsar 的配置属性。 详情请参见 Apache Pulsar 官方文档。
Pulsar 支持的属性中,只有一部分可通过 PulsarProperties
类直接获取。
如需用额外属性调优自动配置组件,可使用上述各组件支持的定制器。