Apache Pulsar 支持

Apache Pulsar 通过为 Spring for Apache Pulsar 项目提供自动配置获得支持。

org.springframework.pulsar:spring-pulsar 在 classpath 上时,Spring Boot 会自动配置并注册经典(命令式)的 Spring for Apache Pulsar 组件。 当 org.springframework.pulsar:spring-pulsar-reactive 在 classpath 上时,也会自动配置响应式组件。

分别提供了 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive 启动器,便于收集命令式和响应式使用所需的依赖。

连接 Pulsar

使用 Pulsar starter 时,Spring Boot 会自动配置并注册一个 PulsarClient bean。

默认情况下,应用会尝试连接到本地的 Pulsar 实例 pulsar://localhost:6650。 可以通过设置 spring.pulsar.client.service-url 属性为其他值来调整。

该值必须是有效的 Pulsar 协议 URL。

你可以通过指定任何以 spring.pulsar.client.* 为前缀的应用属性来配置客户端。

如需更细致的配置控制,可注册一个或多个 PulsarClientBuilderCustomizer bean。

认证

若需连接到需要认证的 Pulsar 集群,需要通过设置 pluginClassName 及插件所需参数来指定认证插件。 可将参数设置为参数名到参数值的映射。 以下示例展示了如何配置 AuthenticationOAuth2 插件。

  • Properties

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

你需要确保 spring.pulsar.client.authentication.param.* 下定义的名称与认证插件所需名称完全一致(通常为驼峰命名)。 Spring Boot 不会对这些条目进行任何形式的松散绑定。

例如,若要为 AuthenticationOAuth2 认证插件配置 issuer url,必须使用 spring.pulsar.client.authentication.param.issuerUrl。 如果使用 issuerurlissuer-url 等其他形式,设置不会应用到插件。

这种缺乏松散绑定也导致使用环境变量配置认证参数时存在问题,因为在转换过程中会丢失大小写敏感性。 如果你使用环境变量配置参数,请参考 Spring for Apache Pulsar 文档 相关步骤 以确保正常工作。

SSL

默认情况下,Pulsar 客户端与 Pulsar 服务之间以明文通信。 你可以参考 Spring for Apache Pulsar 文档 相关步骤 启用 TLS 加密。

关于客户端和认证的完整细节,请参见 Spring for Apache Pulsar 参考文档

响应式连接 Pulsar

当响应式自动配置被激活时,Spring Boot 会自动配置并注册一个 ReactivePulsarClient bean。

ReactivePulsarClient 会适配前述的 PulsarClient 实例。 因此,请参考上一节配置 PulsarClient,以供 ReactivePulsarClient 使用。

连接 Pulsar 管理端

Spring for Apache Pulsar 的 PulsarAdministration 客户端也会被自动配置。

默认情况下,应用会尝试连接到本地 Pulsar 实例 http://localhost:8080。 可以通过设置 spring.pulsar.admin.service-url 属性为 (http|https)://<host>:<port> 形式的其他值来调整。

如需更细致的配置控制,可注册一个或多个 PulsarAdminBuilderCustomizer bean。

认证

访问需要认证的 Pulsar 集群时,管理端客户端需要与常规 Pulsar 客户端相同的安全配置。 你可以将上述 认证配置 中的 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication

若需在启动时创建主题,可添加一个类型为 PulsarTopic 的 bean。 如果主题已存在,该 bean 会被忽略。

发送消息

Spring 的 PulsarTemplate 会被自动配置,你可以用它发送消息,如下例所示:

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖 PulsarProducerFactory 创建底层 Pulsar producer。 Spring Boot 自动配置也会提供该 producer 工厂,默认情况下会缓存其创建的 producer。 你可以通过指定任何以 spring.pulsar.producer.spring.pulsar.producer.cache. 为前缀的应用属性来配置 producer 工厂及缓存设置。

如需更细致的 producer 工厂配置控制,可注册一个或多个 ProducerBuilderCustomizer bean。 这些定制器会应用于所有创建的 producer。 你也可以在发送消息时传入 ProducerBuilderCustomizer,仅影响当前 producer。

如需更细致地控制发送的消息,可在发送消息时传入 TypedMessageBuilderCustomizer

响应式发送消息

当响应式自动配置被激活时,Spring 的 ReactivePulsarTemplate 会被自动配置,你可以用它发送消息,如下例所示:

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依赖 ReactivePulsarSenderFactory 实际创建底层 sender。 Spring Boot 自动配置也会提供该 sender 工厂,默认情况下会缓存其创建的 producer。 你可以通过指定任何以 spring.pulsar.producer.spring.pulsar.producer.cache. 为前缀的应用属性来配置 sender 工厂及缓存设置。

如需更细致的 sender 工厂配置控制,可注册一个或多个 ReactiveMessageSenderBuilderCustomizer bean。 这些定制器会应用于所有创建的 sender。 你也可以在发送消息时传入 ReactiveMessageSenderBuilderCustomizer,仅影响当前 sender。

如需更细致地控制发送的消息,可在发送消息时传入 MessageSpecBuilderCustomizer

接收消息

当 Apache Pulsar 基础设施存在时,任何 bean 都可以通过注解 @PulsarListener 创建监听端点。 下列组件会在 someTopic 主题上创建监听端点:

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置会提供 PulsarListener 所需的全部组件,如 PulsarListenerContainerFactory 及其用于构建底层 Pulsar consumer 的工厂。 你可以通过指定任何以 spring.pulsar.listener.spring.pulsar.consumer. 为前缀的应用属性来配置这些组件。

如需更细致的 consumer 工厂配置控制,可注册一个或多个 ConsumerBuilderCustomizer bean。 这些定制器会应用于工厂创建的所有 consumer,因此也会应用于所有 @PulsarListener 实例。 你也可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。

如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean。

响应式接收消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 bean 都可以通过注解 @ReactivePulsarListener 创建响应式监听端点。 下列组件会在 someTopic 主题上创建响应式监听端点:

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置会提供 ReactivePulsarListener 所需的全部组件,如 ReactivePulsarListenerContainerFactory 及其用于构建底层响应式 Pulsar consumer 的工厂。 你可以通过指定任何以 spring.pulsar.listener.spring.pulsar.consumer. 为前缀的应用属性来配置这些组件。

如需更细致的 consumer 工厂配置控制,可注册一个或多个 ReactiveMessageConsumerBuilderCustomizer bean。 这些定制器会应用于工厂创建的所有 consumer,因此也会应用于所有 @ReactivePulsarListener 实例。 你也可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。

如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> bean。

读取消息

Pulsar reader 接口使应用能够手动管理游标。 使用 reader 连接到主题时,需要指定 reader 连接时开始读取的消息。

当 Apache Pulsar 基础设施存在时,任何 bean 都可以通过注解 @PulsarReader 使用 reader 消费消息。 下列组件会创建一个从 someTopic 主题开头开始读取消息的 reader 端点:

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖 PulsarReaderFactory 创建底层 Pulsar reader。 Spring Boot 自动配置会提供该 reader 工厂,可通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性进行定制。

如需更细致的 reader 工厂配置控制,可注册一个或多个 ReaderBuilderCustomizer bean。 这些定制器会应用于工厂创建的所有 reader,因此也会应用于所有 @PulsarReader 实例。 你也可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个监听器。

如需更细致地控制实际的容器工厂配置,可注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> bean。

响应式读取消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,Spring 的 ReactivePulsarReaderFactory 会被提供,你可以用它以响应式方式读取消息。 下列组件会使用提供的工厂创建 reader,并从 someTopic 主题的5分钟前读取一条消息:

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置会提供该 reader 工厂,可通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性进行定制。

如需更细致的 reader 工厂配置控制,可在使用工厂创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。

如需更细致的 reader 工厂配置控制,可注册一个或多个 ReactiveMessageReaderBuilderCustomizer bean。 这些定制器会应用于所有创建的 reader。 你也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer,仅对所创建的 reader 应用定制。

关于上述任一组件的更多细节及其他可用特性,请参见 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。

响应式变体当前尚不支持事务。

spring.pulsar.transaction.enabled 属性设置为 true 将会:

@PulsarListenertransactional 属性可用于细粒度控制监听器何时使用事务。

如需更细致地控制 Spring for Apache Pulsar 的事务特性,可自定义 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory bean。 如果默认自动配置的 PulsarTransactionManager 不适用,也可以自定义 PulsarAwareTransactionManager bean。

其他 Pulsar 属性

自动配置支持的属性见附录 集成属性 部分。 注意,这些属性(无论是连字符还是驼峰命名)大多直接映射到 Apache Pulsar 的配置属性。 详情请参见 Apache Pulsar 官方文档。

Pulsar 支持的属性中,只有一部分可通过 PulsarProperties 类直接获取。 如需用额外属性调优自动配置组件,可使用上述各组件支持的定制器。