Apache Pulsar 支持

Apache Pulsar 通过提供 Spring for Apache Pulsar 项目的自动配置来支持。

org.springframework.pulsar:spring-pulsar 在 classpath 上时,Spring Boot 将自动配置并注册经典(命令式)Spring for Apache Pulsar 组件。 当 org.springframework.pulsar:spring-pulsar-reactive 在 classpath 上时,它将对响应式组件执行相同的操作。

spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive 启动器,分别用于方便地收集命令式和响应式使用的依赖项。

连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 PulsarClient bean。

默认情况下,应用程序尝试连接到 pulsar://localhost:6650 的本地 Pulsar 实例。 这可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整。

注意:该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何以 spring.pulsar.client.* 为前缀的应用程序属性来配置客户端。

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarClientBuilderCustomizer bean。

认证

要连接到需要认证的 Pulsar 集群,您需要通过设置 pluginClassName 和插件所需的任何参数来指定要使用的认证插件。 您可以将参数设置为参数名称到参数值的映射。 以下示例显示如何配置 AuthenticationOAuth2 插件。

  • Properties

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保在 spring.pulsar.client.authentication.param.* 下定义的名称与您的认证插件期望的名称完全匹配(通常是驼峰命名法)。 Spring Boot 不会对这些条目尝试任何形式的宽松绑定。

例如,如果您想为 AuthenticationOAuth2 认证插件配置发行者 URL,您必须使用 spring.pulsar.client.authentication.param.issuerUrl。 如果您使用其他形式,如 issuerurlissuer-url,该设置将不会应用于插件。

这种缺乏宽松绑定的情况也使得使用环境变量进行认证参数设置变得困难,因为在转换过程中会丢失大小写敏感性。 如果您使用环境变量作为参数,则需要按照 Spring for Apache Pulsar 参考文档中的 这些步骤 才能正常工作。

SSL

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。 您可以按照 Spring for Apache Pulsar 参考文档中的 这些步骤 来启用 TLS 加密。

有关客户端和认证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档

响应式连接到 Pulsar

当响应式自动配置被激活时,Spring Boot 将自动配置并注册一个 ReactivePulsarClient bean。

ReactivePulsarClient 适配了前面描述的 PulsarClient 的实例。 因此,请按照前面的部分来配置 PulsarClient,该客户端将被 ReactivePulsarClient 使用。

连接到 Pulsar 管理

Spring for Apache Pulsar 的 PulsarAdministration 客户端也会被自动配置。

默认情况下,应用程序尝试连接到 http://localhost:8080 的本地 Pulsar 实例。 这可以通过将 spring.pulsar.admin.service-url 属性设置为 (http|https)://<host>:<port> 形式的不同值来调整。

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarAdminBuilderCustomizer bean。

认证

当访问需要认证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。 您可以通过将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication 来使用上述 认证配置

提示:要在启动时创建主题,请添加一个类型为 PulsarTopic 的 bean。 如果主题已存在,该 bean 将被忽略。

发送消息

Spring 的 PulsarTemplate 会被自动配置,您可以使用它来发送消息,如下例所示:

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar 生产者。 Spring Boot 自动配置也提供了这个生产者工厂,默认情况下,它会缓存它创建的生产者。 您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 ProducerBuilderCustomizer bean。 这些自定义器将应用于所有创建的生产者。 您也可以在发送消息时传入 ProducerBuilderCustomizer 以仅影响当前生产者。

如果您需要对正在发送的消息进行更多控制,您可以在发送消息时传入 TypedMessageBuilderCustomizer

响应式发送消息

当响应式自动配置被激活时,Spring 的 ReactivePulsarTemplate 会被自动配置,您可以使用它来发送消息,如下例所示:

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的发送者。 Spring Boot 自动配置也提供了这个发送者工厂,默认情况下,它会缓存它创建的生产者。 您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置发送者工厂和缓存设置。

如果您需要对发送者工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer bean。 这些自定义器将应用于所有创建的发送者。 您也可以在发送消息时传入 ReactiveMessageSenderBuilderCustomizer 以仅影响当前发送者。

如果您需要对正在发送的消息进行更多控制,您可以在发送消息时传入 MessageSpecBuilderCustomizer

接收消息

当 Apache Pulsar 基础设施存在时,任何 bean 都可以用 @PulsarListener 注解来创建监听器端点。 以下组件在 someTopic 主题上创建了一个监听器端点:

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 和它用来构建底层 Pulsar 消费者的消费者工厂。 您可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂的配置进行更多控制,请考虑注册一个或多个 ConsumerBuilderCustomizer bean。 这些自定义器将应用于工厂创建的所有消费者,因此也应用于所有 @PulsarListener 实例。 您也可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来自定义单个监听器。

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean。

响应式接收消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 bean 都可以用 @ReactivePulsarListener 注解来创建响应式监听器端点。 以下组件在 someTopic 主题上创建了一个响应式监听器端点:

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了 ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 和它用来构建底层响应式 Pulsar 消费者的消费者工厂。 您可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂的配置进行更多控制,请考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer bean。 这些自定义器将应用于工厂创建的所有消费者,因此也应用于所有 @ReactivePulsarListener 实例。 您也可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来自定义单个监听器。

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> bean。

读取消息

Pulsar reader 接口使应用程序能够手动管理游标。 当您使用 reader 连接到主题时,您需要指定 reader 在连接到主题时从哪条消息开始读取。

当 Apache Pulsar 基础设施存在时,任何 bean 都可以用 @PulsarReader 注解来使用 reader 消费消息。 以下组件创建了一个从 someTopic 主题开始处开始读取消息的 reader 端点:

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar reader。 Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来自定义。

如果您需要对 reader 工厂的配置进行更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer bean。 这些自定义器将应用于工厂创建的所有 reader,因此也应用于所有 @PulsarReader 实例。 您也可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来自定义单个监听器。

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> bean。

响应式读取消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,Spring 的 ReactivePulsarReaderFactory 会被提供,您可以使用它来创建 reader 以便以响应式方式读取消息。 以下组件使用提供的工厂创建了一个 reader,并从 5 分钟前开始从 someTopic 主题读取单条消息:

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来自定义。

如果您需要对 reader 工厂配置进行更多控制,请考虑在使用工厂创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。

如果您需要对 reader 工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer bean。 这些自定义器将应用于所有创建的 reader。 您也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 以仅将自定义应用于创建的 reader。

提示:有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。

注意:当前在使用响应式变体时不支持事务。

spring.pulsar.transaction.enabled 属性设置为 true 将:

@PulsarListenertransactional 属性可用于微调何时在监听器中使用事务。

如果您需要对 Spring for Apache Pulsar 事务功能进行更多控制,您应该定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory bean。 如果默认的自动配置的 PulsarTransactionManager 不合适,您也可以定义 PulsarAwareTransactionManager bean。

其他 Pulsar 属性

自动配置支持的属性显示在附录的 集成属性 部分。 请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。

只有一部分 Pulsar 支持的属性可以通过 PulsarProperties 类直接使用。 如果您希望使用未直接支持的其他属性来调整自动配置的组件,您可以使用上述每个组件支持的自定义器。