Skip to content

Commit daaa30e

Browse files
committed
Make replyContainer as bean in Kafka tests
It looks like `replyContainer()` is not registered as a bean, so its lifecycle is somehow out of application context control * Mark `replyContainer()` method as a `@Bean` in the `KafkaDslTests` and `KafkaDslKotlinTests` to see if this fixes flaky state of the test suite * Upgrade to Kotlin `1.7.20` and fix `KafkaDslKotlinTests` according to its requirements
1 parent f24fbd9 commit daaa30e

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlinVersion = '1.7.10'
2+
ext.kotlinVersion = '1.7.20'
33
ext.isCI = System.getenv('GITHUB_ACTION') || System.getenv('bamboo_buildKey')
44
repositories {
55
mavenCentral()

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,8 @@ public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
445445
.get();
446446
}
447447

448-
private GenericMessageListenerContainer<Integer, String> replyContainer() {
448+
@Bean
449+
public GenericMessageListenerContainer<Integer, String> replyContainer() {
449450
ContainerProperties containerProperties = new ContainerProperties(TEST_TOPIC5);
450451
containerProperties.setGroupId("outGate");
451452
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class KafkaDslKotlinTests {
190190

191191
@Test
192192
fun testGateways() {
193-
assertThat(this.config.replyContainerLatch.await(30, TimeUnit.SECONDS))
193+
assertThat(this.config.replyContainerLatch.await(30, TimeUnit.SECONDS)).isTrue()
194194
assertThat(this.gate.exchange(TEST_TOPIC4, "foo")).isEqualTo("FOO")
195195
}
196196

@@ -292,7 +292,7 @@ class KafkaDslKotlinTests {
292292

293293
private fun kafkaMessageHandler(producerFactory: ProducerFactory<Int, String>, topic: String) =
294294
Kafka.outboundChannelAdapter(producerFactory)
295-
.messageKey { it.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
295+
.messageKey<Any> { it.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
296296
.headerMapper(mapper())
297297
.sync(true)
298298
.partitionId<Any> { 0 }
@@ -326,7 +326,8 @@ class KafkaDslKotlinTests {
326326
)
327327
}
328328

329-
private fun replyContainer(): GenericMessageListenerContainer<Int, String> {
329+
@Bean
330+
fun replyContainer(): GenericMessageListenerContainer<Int, String> {
330331
val containerProperties = ContainerProperties(TEST_TOPIC5)
331332
containerProperties.setGroupId("kotlinOutGate")
332333
containerProperties.setConsumerRebalanceListener(object : ConsumerRebalanceListener {

0 commit comments

Comments
 (0)