@@ -218,6 +218,163 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后
218
218
- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
219
219
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
220
220
221
+ # kafka的重试机制
222
+ 网上关于 Spring kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 Spring-kafka-2.9.3 源码重新梳理一下。
223
+
224
+ ## 消费失败会怎么样
225
+
226
+ 在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?
227
+
228
+ 生产者代码:
229
+
230
+ ``` Java
231
+ for (int i = 0 ; i < 10 ; i++ ) {
232
+ kafkaTemplate. send(KafkaConst . TEST_TOPIC , String . valueOf(i))
233
+ }
234
+ ```
235
+
236
+ 消费者消代码:
237
+
238
+ ``` Java
239
+ @KafkaListener (topics = {KafkaConst . TEST_TOPIC },groupId = " apple" )
240
+ private void customer(String message) throws InterruptedException {
241
+ log. info(" kafka customer:{}" ,message);
242
+ Integer n = Integer . parseInt(message);
243
+ if (n% 5 == 0 ){
244
+ throw new RuntimeException ();
245
+ }
246
+ }
247
+ ```
248
+
249
+ 在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。
250
+
251
+ ``` Java
252
+ 2023 - 08- 10 12 : 03 : 32.918 DEBUG 9700 -- - [ntainer#0 - 0 - C - 1 ] o.s.kafka.listener. DefaultErrorHandler : Skipping seek of: test- 0 @95
253
+ 2023 - 08- 10 12 : 03 : 32.918 TRACE 9700 -- - [ntainer#0 - 0 - C - 1 ] o.s.kafka.listener. DefaultErrorHandler : Seeking : test- 0 to: 96
254
+ 2023 - 08- 10 12 : 03 : 32.918 INFO 9700 -- - [ntainer#0 - 0 - C - 1 ] o.a.k.clients.consumer. KafkaConsumer : [Consumer clientId= consumer- apple- 1 , groupId= apple] Seeking to offset 96 for partition test- 0
255
+
256
+ ```
257
+
258
+ ## 默认会重试多少次?
259
+
260
+ 默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?
261
+ 10 次。看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑:
262
+
263
+ ``` Java
264
+ FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
265
+ this . retryListeners. forEach(rl - >
266
+ rl. failedDelivery(record, exception, failedRecord. getDeliveryAttempts(). get()));
267
+ long nextBackOff = failedRecord. getBackOffExecution(). nextBackOff();
268
+ if (nextBackOff != BackOffExecution . STOP ) {
269
+ this . backOffHandler. onNextBackOff(container, exception, nextBackOff);
270
+ return false ;
271
+ }
272
+ ```
273
+
274
+ 其中 BackOffExecution.STOP 的值为 -1,nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。
275
+
276
+ ``` Java
277
+ public long nextBackOff() {
278
+ this . currentAttempts++ ;
279
+ if (this . currentAttempts <= getMaxAttempts()) {
280
+ return getInterval();
281
+ }
282
+ else {
283
+ return STOP ;
284
+ }
285
+ }
286
+ ```
287
+
288
+ 那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是:
289
+
290
+ ``` Java
291
+ public DefaultErrorHandler() {
292
+ this (null , SeekUtils . DEFAULT_BACK_OFF );
293
+ }
294
+ ```
295
+
296
+ SeekUtils.DEFAULT_BACK_OFF 定义的是:
297
+
298
+ ``` Java
299
+ public static final int DEFAULT_MAX_FAILURES = 10 ;
300
+
301
+ public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff (0 , DEFAULT_MAX_FAILURES - 1 );
302
+ ```
303
+
304
+ DEFAULT_MAX_FAILURES 的值是10,currentAttempts从0到9,所以总共会执行10次,每次重试的时间间隔为0。
305
+
306
+ ## 如何自定义重试次数,以及时间间隔
307
+
308
+ 从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。
309
+
310
+ ``` Java
311
+ @Bean
312
+ public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String , String > consumerFactory) {
313
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory ();
314
+ // 自定义重试时间间隔以及次数
315
+ FixedBackOff fixedBackOff = new FixedBackOff (1000 , 5 );
316
+ factory. setCommonErrorHandler(new DefaultErrorHandler (fixedBackOff));
317
+ factory. setConsumerFactory(consumerFactory);
318
+ return factory;
319
+ }
320
+ ```
321
+
322
+ ## 如何在重试失败后进行告警
323
+
324
+ 自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。
325
+
326
+ ``` Java
327
+ @Slf4j
328
+ public class DelErrorHandler extends DefaultErrorHandler {
329
+
330
+ public DelErrorHandler (FixedBackOff backOff ) {
331
+ super (null ,backOff);
332
+ }
333
+
334
+ @Override
335
+ public void handleRemaining (Exception thrownException , List<ConsumerRecord<?, ?> > records , Consumer<?, ?> consumer , MessageListenerContainer container ) {
336
+ super . handleRemaining(thrownException, records, consumer, container);
337
+ log. info(" 重试多次失败" );
338
+ // 自定义操作
339
+ }
340
+ }
341
+ ```
342
+ DefaultErrorHandler 只是默认的一个错误处理器,Spring kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
343
+
344
+ ## 重试失败后的数据如何再次处理
345
+
346
+ 当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
347
+
348
+ 死信队列(Dead Letter Queue,简称DLQ)是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
349
+
350
+ ` @RetryableTopic ` 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
351
+
352
+ ``` Java
353
+ @RetryableTopic (
354
+ attempts = " 5" ,
355
+ backoff = @Backoff (delay = 100 , maxDelay = 1000 )
356
+ )
357
+ @KafkaListener (topics = {KafkaConst . TEST_TOPIC }, groupId = " apple" )
358
+ private void customer(String message) {
359
+ log. info(" kafka customer:{}" , message);
360
+ Integer n = Integer . parseInt(message);
361
+ if (n % 5 == 0 ) {
362
+ throw new RuntimeException ();
363
+ }
364
+ System . out. println(n);
365
+ }
366
+ ```
367
+
368
+ 这个例子在listen方法上使用@RetryableTopic 注解,配置了:
369
+ - 重试5次
370
+ - 重试间隔100毫秒,最大间隔1秒
371
+
372
+ 重试完毕后,如果仍然失败,则会进入消息队列,此时就存在两个队列:
373
+ - test 原队列
374
+ - test-dlt 原队列对应的死信队列
375
+
376
+ 对于死信队列的处理,既可以用 ` @DltHandler ` 处理,也可以使用 ` @KafkaListener ` 重新消费。
377
+
221
378
### Reference
222
379
223
380
- Kafka 官方文档:https://kafka.apache.org/documentation/
0 commit comments