Apache Kafka is supported by providing auto-configuration of the spring-kafka project.
Kafka configuration is controlled by external configuration properties in spring.kafka.*.
For example, you might declare the following section in application.yaml:
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
TIP: To create a topic on startup, add a bean of type NewTopic.
If the topic already exists, the bean is ignored.
See {spring-boot-autoconfigure-module-code}/kafka/KafkaProperties.java[KafkaProperties] for more supported options.
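The tip above about creating a topic on startup could be sketched as follows. This is a minimal illustration rather than code from the reference sources: the class name MyKafkaTopicConfiguration, the topic name someTopic, and the partition and replica counts are all assumptions chosen for the example.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; the name and topic settings are illustrative.
@Configuration(proxyBeanMethods = false)
class MyKafkaTopicConfiguration {

    // NewTopic beans are picked up on startup and the topic is created
    // if it does not exist yet.
    @Bean
    public NewTopic someTopic() {
        return TopicBuilder.name("someTopic")
                .partitions(1)
                .replicas(1)
                .build();
    }

}
```

A single partition and replica suit a local single-broker setup; other environments would likely need different values.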
Spring’s KafkaTemplate is auto-configured, and you can autowire it directly in your own beans, as shown in the following example:
link:{docs-java}/messaging/kafka/sending/MyBean.java[role=include]
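The included file is not reproduced in this text. A minimal sketch of such a bean, assuming String keys and values and a hypothetical topic named someTopic, might look like this:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Sketch only: the method name, topic name and payload are assumptions.
@Component
class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection of the auto-configured template.
    MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a record with no key to the (hypothetical) "someTopic" topic.
    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
```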
NOTE: If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a KafkaTransactionManager is automatically configured.
Also, if a RecordMessageConverter bean is defined, it is automatically associated with the auto-configured KafkaTemplate.
When the Apache Kafka infrastructure is present, any bean can be annotated with @KafkaListener to create a listener endpoint.
If no KafkaListenerContainerFactory has been defined, a default one is automatically configured with keys defined in spring.kafka.listener.*.
The following component creates a listener endpoint on the someTopic topic:
link:{docs-java}/messaging/kafka/receiving/MyBean.java[role=include]
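As the included file is not shown here, the following is a minimal sketch of such a listener. The class name MyListenerBean and the lastContent field are assumptions added for illustration, and the consumer group is assumed to come from spring.kafka.consumer.group-id.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener bean; names other than the annotation are illustrative.
@Component
class MyListenerBean {

    // Most recently received payload, kept only so the sketch has observable behavior.
    private volatile String lastContent;

    // Invoked by the listener container for each record received on "someTopic".
    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        this.lastContent = content;
    }

    public String getLastContent() {
        return this.lastContent;
    }

}
```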
If a KafkaTransactionManager bean is defined, it is automatically associated with the container factory.
Similarly, if a RecordFilterStrategy, CommonErrorHandler, AfterRollbackProcessor or ConsumerAwareRebalanceListener bean is defined, it is automatically associated with the default factory.
Depending on the listener type, a RecordMessageConverter or BatchMessageConverter bean is associated with the default factory.
If only a RecordMessageConverter bean is present for a batch listener, it is wrapped in a BatchMessageConverter.
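To illustrate how such beans are picked up by the default factory, here is a hedged sketch that declares a CommonErrorHandler and a RecordFilterStrategy. The class name, the retry settings and the filtering rule are assumptions for the example, not recommendations.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; both beans are illustrative choices.
@Configuration(proxyBeanMethods = false)
class MyKafkaListenerCustomization {

    // Retries a failed record up to two times, one second apart.
    @Bean
    public CommonErrorHandler errorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }

    // Returning true discards the record; here records with a null value are skipped.
    @Bean
    public RecordFilterStrategy<Object, Object> skipEmptyValues() {
        return (record) -> record.value() == null;
    }

}
```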
TIP: A custom ChainedKafkaTransactionManager must be marked @Primary as it usually references the auto-configured KafkaTransactionManager bean.
Spring for Apache Kafka provides a factory bean to create a StreamsBuilder object and manage the lifecycle of its streams.
Spring Boot auto-configures the required KafkaStreamsConfiguration bean as long as kafka-streams is on the classpath and Kafka Streams is enabled by the @EnableKafkaStreams annotation.
Enabling Kafka Streams means that the application id and bootstrap servers must be set.
The former can be configured using spring.kafka.streams.application-id, defaulting to spring.application.name if not set.
The latter can be set globally or overridden specifically for streams.
Several additional settings are available through dedicated properties; other arbitrary Kafka properties can be set using the spring.kafka.streams.properties namespace.
See also features.adoc for more information.
To use the factory bean, wire StreamsBuilder into your @Bean as shown in the following example:
link:{docs-java}/messaging/kafka/streams/MyKafkaStreamsConfiguration.java[role=include]
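The included file is not reproduced here. A minimal sketch of such a configuration, assuming Integer keys, String values and the hypothetical topic names ks1In and ks1Out, could look like this:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

// Sketch only: topic names, types and the uppercase transformation are assumptions.
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

    // The StreamsBuilder argument is supplied by the factory bean.
    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In",
                Consumed.with(Serdes.Integer(), Serdes.String()));
        // Uppercase each value and write the result to the output topic.
        stream.map(this::uppercaseValue)
                .to("ks1Out", Produced.with(Serdes.Integer(), Serdes.String()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
```

Explicit serdes are passed to the source and sink so that the sketch does not depend on default serde properties being configured.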
By default, the streams managed by the StreamsBuilder object it creates are started automatically.
You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property.
The properties supported by auto-configuration are shown in the “Integration Properties” section of the Appendix. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. See the Apache Kafka documentation for details.
The first few of these properties apply to all components (producers, consumers, admins, and streams) but can be specified at the component level if you wish to use different values. Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.
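The naming correspondence mentioned above (hyphenated or camelCase names mapping to dotted Kafka names) can be illustrated with a small sketch. The helper below is hypothetical and is not how Spring Boot binds properties; it only shows the pattern, and since the mapping holds "for the most part", some properties will not follow it.

```java
// Hypothetical helper, not part of Spring Boot: illustrates the naming pattern only.
final class KafkaPropertyNames {

    private KafkaPropertyNames() {
    }

    // Converts a hyphenated or camelCase name such as "max-poll-records" or
    // "maxPollRecords" into the dotted form "max.poll.records".
    static String toKafkaKey(String bootName) {
        StringBuilder out = new StringBuilder();
        for (char c : bootName.toCharArray()) {
            if (c == '-') {
                out.append('.');
            }
            else if (Character.isUpperCase(c)) {
                out.append('.').append(Character.toLowerCase(c));
            }
            else {
                out.append(c);
            }
        }
        return out.toString();
    }

}
```

For instance, under this pattern the bootstrap-servers key shown earlier corresponds to the Kafka property bootstrap.servers.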
Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class.
If you wish to configure a component (producer, consumer, admin, or streams) with additional properties that are not directly supported, use the following properties:
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"
This sets the common prop.one Kafka property to first (applies to producers, consumers, admins, and streams), the prop.two admin property to second, the prop.three consumer property to third, the prop.four producer property to fourth and the prop.five streams property to fifth.
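One way to picture the layering described above is as a map merge: each component starts from the common properties and then applies its own entries on top. The sketch below is a conceptual model under that assumption; the class and method names are hypothetical and it does not reproduce Spring Boot's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Boot: models how common and
// component-specific Kafka properties combine.
final class KafkaPropertyLayers {

    private KafkaPropertyLayers() {
    }

    // Start from the common properties, then let the component-specific
    // entries add to or replace them.
    static Map<String, Object> effective(Map<String, Object> common, Map<String, Object> componentSpecific) {
        Map<String, Object> merged = new LinkedHashMap<>(common);
        merged.putAll(componentSpecific);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> common = Map.of("prop.one", "first");
        Map<String, Object> consumer = Map.of("prop.three", "third");
        // prints {prop.one=first, prop.three=third}
        System.out.println(effective(common, consumer));
    }

}
```

In this model the consumer sees prop.one and prop.three but not prop.four, which belongs to the producer only.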
You can also configure the Spring Kafka JsonDeserializer as follows:
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"
Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
IMPORTANT: Properties set in this way override any configuration item that Spring Boot explicitly supports.
Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker.
To use this feature, annotate a test class with @EmbeddedKafka from the spring-kafka-test module.
For more information, please see the Spring for Apache Kafka {spring-kafka-docs}#embedded-kafka-annotation[reference manual].
To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka broker, you need to remap a system property for embedded broker addresses (populated by the EmbeddedKafkaBroker) into the Spring Boot configuration property for Apache Kafka.
There are several ways to do that:
- Provide a system property to map embedded broker addresses into configprop:spring.kafka.bootstrap-servers[] in the test class:
link:{docs-java}/messaging/kafka/embedded/property/MyTest.java[role=include]
- Configure a property name on the @EmbeddedKafka annotation:
link:{docs-java}/messaging/kafka/embedded/annotation/MyTest.java[role=include]
- Use a placeholder in configuration properties:
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"
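Since the included test classes are not shown here, the first two options might be sketched as follows. The class names and the topic name are assumptions, test methods are omitted, and the sketch presumes spring-boot-test and spring-kafka-test are on the test classpath.

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Option 1 (hypothetical class name): a system property tells the embedded broker
// to publish its addresses under the Spring Boot property name.
@SpringBootTest
@EmbeddedKafka
class MySystemPropertyTest {

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    // test methods would go here

}

// Option 2 (hypothetical class name): the same target property is named on the annotation.
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyAnnotationTest {

    // test methods would go here

}
```

The annotation attribute keeps the mapping local to one test class, whereas a system property remains set for the rest of the JVM's lifetime.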