Skip to content

Commit d16074d

Browse files
snicollwilkinsona
andcommitted
Bind and unbind Kafka metrics as consumers and producers come and go
Fixes gh-21008 Co-authored-by: Andy Wilkinson <awilkinson@pivotal.io>
1 parent 566864e commit d16074d

File tree

5 files changed

+125
-39
lines changed

5 files changed

+125
-39
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java

+24-9
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,22 @@
1616

1717
package org.springframework.boot.actuate.autoconfigure.metrics;
1818

19-
import java.util.Collections;
20-
2119
import io.micrometer.core.instrument.MeterRegistry;
2220
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
2321

2422
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
23+
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
2524
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2625
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
27-
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
28-
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
26+
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
27+
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
2928
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
3029
import org.springframework.context.annotation.Bean;
3130
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
32+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
33+
import org.springframework.kafka.core.MicrometerConsumerListener;
34+
import org.springframework.kafka.core.MicrometerProducerListener;
3235
import org.springframework.kafka.core.ProducerFactory;
3336

3437
/**
@@ -39,16 +42,28 @@
3942
* @since 2.1.0
4043
*/
4144
@Configuration(proxyBeanMethods = false)
42-
@AutoConfigureAfter({ MetricsAutoConfiguration.class, KafkaAutoConfiguration.class })
45+
@AutoConfigureBefore(KafkaAutoConfiguration.class)
46+
@AutoConfigureAfter(MetricsAutoConfiguration.class)
4347
@ConditionalOnClass({ KafkaClientMetrics.class, ProducerFactory.class })
4448
@ConditionalOnBean(MeterRegistry.class)
4549
public class KafkaMetricsAutoConfiguration {
4650

4751
@Bean
48-
@ConditionalOnMissingBean
49-
@ConditionalOnSingleCandidate(ProducerFactory.class)
50-
public KafkaClientMetrics kafkaClientMetrics(ProducerFactory<?, ?> producerFactory) {
51-
return new KafkaClientMetrics(producerFactory.createProducer(), Collections.emptyList());
52+
public DefaultKafkaProducerFactoryCustomizer kafkaProducerMetrics(MeterRegistry meterRegistry) {
53+
return (producerFactory) -> addListener(producerFactory, meterRegistry);
54+
}
55+
56+
@Bean
57+
public DefaultKafkaConsumerFactoryCustomizer kafkaConsumerMetrics(MeterRegistry meterRegistry) {
58+
return (consumerFactory) -> addListener(consumerFactory, meterRegistry);
59+
}
60+
61+
private <K, V> void addListener(DefaultKafkaConsumerFactory<K, V> factory, MeterRegistry meterRegistry) {
62+
factory.addListener(new MicrometerConsumerListener<K, V>(meterRegistry));
63+
}
64+
65+
private <K, V> void addListener(DefaultKafkaProducerFactory<K, V> factory, MeterRegistry meterRegistry) {
66+
factory.addListener(new MicrometerProducerListener<K, V>(meterRegistry));
5267
}
5368

5469
}

spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java

+20-27
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616

1717
package org.springframework.boot.actuate.autoconfigure.metrics;
1818

19-
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
2019
import org.junit.jupiter.api.Test;
2120

2221
import org.springframework.boot.actuate.autoconfigure.metrics.test.MetricsRun;
2322
import org.springframework.boot.autoconfigure.AutoConfigurations;
2423
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
2524
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
26-
import org.springframework.context.annotation.Bean;
27-
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
26+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
27+
import org.springframework.kafka.core.MicrometerConsumerListener;
28+
import org.springframework.kafka.core.MicrometerProducerListener;
2829

2930
import static org.assertj.core.api.Assertions.assertThat;
30-
import static org.mockito.Mockito.mock;
3131

3232
/**
3333
* Tests for {@link KafkaMetricsAutoConfiguration}.
@@ -37,35 +37,28 @@
3737
*/
3838
class KafkaMetricsAutoConfigurationTests {
3939

40-
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner().with(MetricsRun.simple())
40+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
4141
.withConfiguration(AutoConfigurations.of(KafkaMetricsAutoConfiguration.class));
4242

4343
@Test
44-
void whenThereIsNoProducerFactoryAutoConfigurationBacksOff() {
45-
this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(KafkaClientMetrics.class));
44+
void whenThereIsAMeterRegistryThenMetricsListenersAreAdded() {
45+
this.contextRunner.with(MetricsRun.simple())
46+
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)).run((context) -> {
47+
assertThat(((DefaultKafkaProducerFactory<?, ?>) context.getBean(DefaultKafkaProducerFactory.class))
48+
.getListeners()).hasSize(1).hasOnlyElementsOfTypes(MicrometerProducerListener.class);
49+
assertThat(((DefaultKafkaConsumerFactory<?, ?>) context.getBean(DefaultKafkaConsumerFactory.class))
50+
.getListeners()).hasSize(1).hasOnlyElementsOfTypes(MicrometerConsumerListener.class);
51+
});
4652
}
4753

4854
@Test
49-
void whenThereIsAProducerFactoryKafkaClientMetricsIsConfigured() {
50-
this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class))
51-
.run((context) -> assertThat(context).hasSingleBean(KafkaClientMetrics.class));
52-
}
53-
54-
@Test
55-
void allowsCustomKafkaClientMetricsToBeUsed() {
56-
this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class))
57-
.withUserConfiguration(CustomKafkaClientMetricsConfiguration.class).run((context) -> assertThat(context)
58-
.hasSingleBean(KafkaClientMetrics.class).hasBean("customKafkaClientMetrics"));
59-
}
60-
61-
@Configuration(proxyBeanMethods = false)
62-
static class CustomKafkaClientMetricsConfiguration {
63-
64-
@Bean
65-
KafkaClientMetrics customKafkaClientMetrics() {
66-
return mock(KafkaClientMetrics.class);
67-
}
68-
55+
void whenThereIsNoMeterRegistryThenListenerCustomizationBacksOff() {
56+
this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)).run((context) -> {
57+
assertThat(((DefaultKafkaProducerFactory<?, ?>) context.getBean(DefaultKafkaProducerFactory.class))
58+
.getListeners()).isEmpty();
59+
assertThat(((DefaultKafkaConsumerFactory<?, ?>) context.getBean(DefaultKafkaConsumerFactory.class))
60+
.getListeners()).isEmpty();
61+
});
6962
}
7063

7164
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
20+
21+
/**
22+
* Callback interface for customizing {@code DefaultKafkaConsumerFactory} beans.
23+
*
24+
* @author Stephane Nicoll
25+
* @since 2.3.0
26+
*/
27+
@FunctionalInterface
28+
public interface DefaultKafkaConsumerFactoryCustomizer {
29+
30+
/**
31+
* Customize the {@link DefaultKafkaConsumerFactory}.
32+
* @param consumerFactory the consumer factory to customize
33+
*/
34+
void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory);
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
20+
21+
/**
22+
* Callback interface for customizing {@code DefaultKafkaProducerFactory} beans.
23+
*
24+
* @author Stephane Nicoll
25+
* @since 2.3.0
26+
*/
27+
@FunctionalInterface
28+
public interface DefaultKafkaProducerFactoryCustomizer {
29+
30+
/**
31+
* Customize the {@link DefaultKafkaProducerFactory}.
32+
* @param producerFactory the producer factory to customize
33+
*/
34+
void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);
35+
36+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,25 @@ public ProducerListener<Object, Object> kafkaProducerListener() {
8181

8282
@Bean
8383
@ConditionalOnMissingBean(ConsumerFactory.class)
84-
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
85-
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
84+
public ConsumerFactory<?, ?> kafkaConsumerFactory(
85+
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
86+
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
87+
this.properties.buildConsumerProperties());
88+
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
89+
return factory;
8690
}
8791

8892
@Bean
8993
@ConditionalOnMissingBean(ProducerFactory.class)
90-
public ProducerFactory<?, ?> kafkaProducerFactory() {
94+
public ProducerFactory<?, ?> kafkaProducerFactory(
95+
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
9196
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
9297
this.properties.buildProducerProperties());
9398
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
9499
if (transactionIdPrefix != null) {
95100
factory.setTransactionIdPrefix(transactionIdPrefix);
96101
}
102+
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
97103
return factory;
98104
}
99105

0 commit comments

Comments
 (0)