Apache Kafka 支持

通过对 spring-kafka 项目的自动配置,Spring Boot 支持 Apache Kafka

Kafka 的配置通过 spring.kafka.* 外部配置属性控制。 例如,你可以在 application.properties 中声明如下配置:

  • Properties

  • YAML

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
如需在启动时创建 topic,可添加类型为 NewTopic 的 bean。 如果 topic 已存在,则该 bean 会被忽略。

更多支持选项请参见 KafkaProperties

发送消息

Spring 的 KafkaTemplate 已自动配置,你可以像下面这样直接注入到自己的 bean 中:

  • Java

  • Kotlin

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
如果定义了 spring.kafka.producer.transaction-id-prefix 属性,则会自动配置 KafkaTransactionManager。 同时,如果定义了 RecordMessageConverter bean,也会自动关联到自动配置的 KafkaTemplate

接收消息

当 Apache Kafka 基础设施存在时,任何 bean 都可以通过注解 @KafkaListener 创建监听端点。 如果未定义 KafkaListenerContainerFactory,则会自动配置一个默认工厂,相关配置项在 spring.kafka.listener.*

以下组件会在 someTopic topic 上创建一个监听端点:

  • Java

  • Kotlin

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

如果定义了 KafkaTransactionManager bean,则会自动关联到容器工厂。 同样,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener bean,也会自动关联到默认工厂。

根据监听器类型,RecordMessageConverterBatchMessageConverter bean 会关联到默认工厂。 如果仅存在 RecordMessageConverter bean 且为批量监听器,则会被包装为 BatchMessageConverter

自定义的 ChainedKafkaTransactionManager 通常引用自动配置的 KafkaTransactionManager bean,必须标记为 @Primary

Kafka Streams

Spring for Apache Kafka 提供了一个工厂 bean 用于创建 StreamsBuilder 对象并管理其流的生命周期。 只要 classpath 上有 kafka-streams 且通过 @EnableKafkaStreams 注解启用,Spring Boot 会自动配置所需的 KafkaStreamsConfiguration bean。

启用 Kafka Streams 意味着必须设置 application id 和 bootstrap servers。 前者可通过 spring.kafka.streams.application-id 配置,若未设置则默认为 spring.application.name。 后者可全局设置,也可仅为 streams 单独覆盖。

还有若干专用属性可用,其他任意 Kafka 属性可通过 spring.kafka.streams.properties 命名空间设置。 更多信息参见 其他 Kafka 属性

要使用工厂 bean,可将 StreamsBuilder 注入到 @Bean,如下所示:

  • Java

  • Kotlin

import java.util.Locale;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
		return KeyValue(key, value.uppercase())
	}

}

默认情况下,由 StreamsBuilder 管理的流会自动启动。 你可以通过 spring.kafka.streams.auto-startup 属性自定义此行为。

其他 Kafka 属性

自动配置支持的属性见附录 集成属性 部分。 大多数情况下,这些属性(无论是连字符还是驼峰命名)都直接映射到 Apache Kafka 的点分属性。 详情请参见 Apache Kafka 官方文档。

未包含客户端类型(producerconsumeradminstreams)的属性视为通用属性,适用于所有客户端。 如有需要,大多数通用属性可为一个或多个客户端类型单独覆盖。

Apache Kafka 将属性分为 HIGH、MEDIUM 和 LOW 三个重要级别。 Spring Boot 自动配置支持所有 HIGH 重要级别属性,部分 MEDIUM 和 LOW 属性,以及所有无默认值的属性。

Kafka 支持的属性中,只有一部分可通过 KafkaProperties 类直接配置。 如需为各客户端类型配置更多未直接支持的属性,可使用如下属性:

  • Properties

  • YAML

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将通用的 prop.one Kafka 属性设置为 first(适用于 producer、consumer、admin 和 streams),prop.two 设置为 admin 属性,prop.three 设置为 consumer 属性,prop.four 设置为 producer 属性,prop.five 设置为 streams 属性。

你还可以如下配置 Spring Kafka 的 JsonDeserializer

  • Properties

  • YAML

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,你可以禁用 JsonSerializer 默认发送类型信息到 header 的行为:

  • Properties

  • YAML

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
通过这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了便捷方式,可通过嵌入式 Apache Kafka broker 测试项目。 要使用此功能,在测试类上添加 spring-kafka-test 模块中的 @EmbeddedKafka 注解。 更多信息请参见 Spring for Apache Kafka 参考手册

要让 Spring Boot 自动配置与上述嵌入式 Apache Kafka broker 协同工作,需要将嵌入式 broker 地址(由 EmbeddedKafkaBroker 提供)映射到 Spring Boot 的 Apache Kafka 配置属性。 有几种方式可以实现:

  • 在测试类中通过系统属性将嵌入式 broker 地址映射到 spring.kafka.bootstrap-servers

  • Java

  • Kotlin

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
  • Java

  • Kotlin

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
  • 在配置属性中使用占位符:

  • Properties

  • YAML

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"