diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java index d516b076..d59486b8 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java @@ -16,7 +16,6 @@ import com.linkedin.xinfra.monitor.services.ServiceFactory; import java.io.BufferedReader; import java.io.FileReader; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -60,11 +59,11 @@ public class XinfraMonitor { */ @SuppressWarnings({"rawtypes"}) - public XinfraMonitor(Map allClusterProps) throws Exception { + public XinfraMonitor(Map> allClusterProps) throws Exception { _apps = new ConcurrentHashMap<>(); _services = new ConcurrentHashMap<>(); - for (Map.Entry clusterProperty : allClusterProps.entrySet()) { + for (Map.Entry> clusterProperty : allClusterProps.entrySet()) { String clusterName = clusterProperty.getKey(); Map props = clusterProperty.getValue(); if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG)) @@ -94,15 +93,6 @@ public XinfraMonitor(Map allClusterProps) throws Exception { (config, now) -> _offlineRunnables.size()); } - private boolean constructorContainsClass(Constructor[] constructors, Class classObject) { - for (int n = 0; n < constructors[0].getParameterTypes().length; ++n) { - if (constructors[0].getParameterTypes()[n].equals(classObject)) { - return true; - } - } - return false; - } - public synchronized void start() throws Exception { if (!_isRunning.compareAndSet(false, true)) { return; @@ -165,7 +155,6 @@ public void awaitShutdown() { service.awaitShutdown(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } - @SuppressWarnings("rawtypes") public static void main(String[] args) throws Exception { if (args.length <= 0) { LOG.info("USAGE: java [options] " + XinfraMonitor.class.getName() + " config/xinfra-monitor.properties"); @@ -182,7 +171,7 @@ public static void main(String[] args) throws Exception { } @SuppressWarnings("unchecked") - Map props = new ObjectMapper().readValue(buffer.toString(), Map.class); + Map> props = new ObjectMapper().readValue(buffer.toString(), Map.class); XinfraMonitor xinfraMonitor = new XinfraMonitor(props); xinfraMonitor.start(); LOG.info("Xinfra Monitor has started."); diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java index f22c63c9..9db078b9 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java @@ -15,9 +15,7 @@ */ public class XinfraMonitorConstants { - public XinfraMonitorConstants() { - - } + private XinfraMonitorConstants() {} public static final String TAGS_NAME = "name"; diff --git a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java index a44b0827..ccac7e9d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java @@ -24,6 +24,8 @@ import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig; import com.linkedin.xinfra.monitor.services.configs.TopicManagementServiceConfig; import com.linkedin.xinfra.monitor.services.metrics.ClusterTopicManipulationMetrics; + +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -39,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.xinfra.monitor.common.Utils.delay; import static com.linkedin.xinfra.monitor.common.Utils.prettyPrint; /* @@ -94,14 +97,9 @@ public void start() throws Exception { _topicManagementService.start(); CompletableFuture topicPartitionResult = _topicManagementService.topicPartitionResult(); - try { /* Delay 2 second to reduce the chance that produce and consumer thread has race condition with TopicManagementService and MultiClusterTopicManagementService */ - long threadSleepMs = TimeUnit.SECONDS.toMillis(2); - Thread.sleep(threadSleepMs); - } catch (InterruptedException e) { - throw new Exception("Interrupted while sleeping the thread", e); - } + delay(Duration.ofSeconds(2)); CompletableFuture topicPartitionFuture = topicPartitionResult.thenRun(() -> { for (Service service : _allServices) { if (!service.isRunning()) { diff --git a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java index d920437d..ce2988ed 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java +++ b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java @@ -55,8 +55,6 @@ */ public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); - public static final int ZK_CONNECTION_TIMEOUT_MS = 30_000; - public static final int ZK_SESSION_TIMEOUT_MS = 30_000; private static final long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = 60000L; private static final int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = 3; private static final String LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG = "list.partition.reassignment.timeout.ms"; @@ -117,9 +115,7 @@ public static List replicaIdentifiers(Set brokers) { Collections.shuffle(brokerMetadataList); // Get broker ids for replica list - List replicaList = brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList()); - - return replicaList; + return brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList()); } /** diff --git a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java index e958d43c..17059b44 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java +++ b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java @@ -85,7 +85,7 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public OffsetAndMetadata committed(TopicPartition tp) { - return _consumer.committed(tp); + return _consumer.committed(Collections.singleton(tp)).get(tp); } @Override diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java index 56c3ddc2..549cbed3 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java @@ -67,7 +67,6 @@ public class ClusterTopicManipulationService implements Service { private final ClusterTopicManipulationMetrics _clusterTopicManipulationMetrics; private final TopicFactory _topicFactory; - private final String _zkConnect; public ClusterTopicManipulationService(String name, AdminClient adminClient, Map props) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, @@ -96,7 +95,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap(); _clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags); - _zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG); _topicFactory = (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 63caff1c..81448776 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -29,10 +29,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -117,9 +115,9 @@ public ConsumeService(String name, topicPartitionFuture.get(); } - private void consume() throws Exception { + private void consume() { /* Delay 1 second to reduce the chance that consumer creates topic before TopicManagementService */ - Thread.sleep(1000); + Utils.delay(Duration.ofSeconds(1)); Map nextIndexes = new HashMap<>(); @@ -132,7 +130,7 @@ record = _baseConsumer.receive(); LOG.warn(_name + "/ConsumeService failed to receive record", e); /* Avoid busy while loop */ //noinspection BusyWait - Thread.sleep(CONSUME_THREAD_SLEEP_MS); + Utils.delay(Duration.ofMillis(CONSUME_THREAD_SLEEP_MS)); continue; } @@ -152,16 +150,13 @@ record = _baseConsumer.receive(); int partition = record.partition(); /* Commit availability and commit latency service */ /* Call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) */ - OffsetCommitCallback commitCallback = new OffsetCommitCallback() { - @Override - public void onComplete(Map topicPartitionOffsetAndMetadataMap, Exception kafkaException) { - if (kafkaException != null) { - LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException); - _commitAvailabilityMetrics._failedCommitOffsets.record(); - } else { - _commitAvailabilityMetrics._offsetsCommitted.record(); - _commitLatencyMetrics.recordCommitComplete(); - } + OffsetCommitCallback commitCallback = (topicPartitionOffsetAndMetadataMap, kafkaException) -> { + if (kafkaException != null) { + LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException); + _commitAvailabilityMetrics._failedCommitOffsets.record(); + } else { + _commitAvailabilityMetrics._offsetsCommitted.record(); + _commitLatencyMetrics.recordCommitComplete(); } }; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java index 64a62bf1..88d305f1 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java @@ -84,6 +84,6 @@ private void reportMetrics() { builder.append("\n"); } } - LOG.info("{}\n{}", LOG_DIVIDER, builder.toString()); + LOG.info("{}\n{}", LOG_DIVIDER, builder); } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java index 24512d7d..6f99ce3a 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java @@ -52,17 +52,14 @@ public GraphiteMetricsReporterService(Map props, String name) @Override public synchronized void start() { - _executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - GraphiteMetricsReporterService.this.reportMetrics(); - } catch (Exception e) { - LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics", - e); - } + _executor.scheduleAtFixedRate(() -> { + try { + GraphiteMetricsReporterService.this.reportMetrics(); + } catch (Exception e) { + LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics", + e); } - }, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS + }, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS ); LOG.info("{}/GraphiteMetricsReporterService started", _name); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java index 18c31803..22d1dc34 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java @@ -134,12 +134,7 @@ public class OffsetCommitService implements Service { _consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, metadata, _time, retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs); - ThreadFactory threadFactory = new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - return new Thread(runnable, serviceName + SERVICE_SUFFIX); - } - }; + ThreadFactory threadFactory = runnable -> new Thread(runnable, serviceName + SERVICE_SUFFIX); _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); LOGGER.info("OffsetCommitService's ConsumerConfig - {}", Utils.prettyPrint(config.values())); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java index 87bd4f88..dc0352db 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java @@ -74,9 +74,7 @@ private Properties prepareConfigs(Map props) { Map customProps = (Map) props.get(CommonServiceConfig.CONSUMER_PROPS_CONFIG); if (customProps != null) { - for (Map.Entry entry : customProps.entrySet()) { - consumerProps.put(entry.getKey(), entry.getValue()); - } + consumerProps.putAll(customProps); } return consumerProps; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java index e84f1200..bf95a08a 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java @@ -52,8 +52,8 @@ public SignalFxMetricsReporterService(Map props, String name) th _executor = Executors.newSingleThreadScheduledExecutor(); _metricRegistry = new MetricRegistry(); - _metricMap = new HashMap(); - _dimensionsMap = new HashMap(); + _metricMap = new HashMap<>(); + _dimensionsMap = new HashMap<>(); if (props.containsKey(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION)) { _dimensionsMap = (Map) props.get(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java index b6a6e753..16127b48 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java @@ -13,7 +13,6 @@ import java.util.Map; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -59,27 +58,24 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map 0 ? offsetCommitSuccessRate / ( - offsetCommitSuccessRate + offsetCommitFailureRate) : 0; + Measurable measurable = (config, now) -> { + double offsetCommitSuccessRate = (double) metrics.metrics() + .get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags)) + .metricValue(); + double offsetCommitFailureRate = (double) metrics.metrics() + .get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags)) + .metricValue(); + + if (new Double(offsetCommitSuccessRate).isNaN()) { + offsetCommitSuccessRate = 0; + } + + if (new Double(offsetCommitFailureRate).isNaN()) { + offsetCommitFailureRate = 0; } + + return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / ( + offsetCommitSuccessRate + offsetCommitFailureRate) : 0; }; metrics.addMetric(new MetricName("offset-commit-availability-avg", METRIC_GROUP_NAME, diff --git a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java index 9867718d..ab7b2586 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java @@ -10,14 +10,19 @@ package com.linkedin.xinfra.monitor; +import com.linkedin.xinfra.monitor.common.Utils; import com.linkedin.xinfra.monitor.services.ServiceFactory; import com.linkedin.xinfra.monitor.services.Service; + +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + +import org.testng.Assert; import org.testng.annotations.Test; @@ -71,16 +76,16 @@ public void run() { t.start(); xinfraMonitor.start(); - Thread.sleep(100); + Utils.delay(Duration.ofMillis(100)); xinfraMonitor.stop(); t.join(500); - org.testng.Assert.assertFalse(t.isAlive()); - org.testng.Assert.assertEquals(error.get(), null); + Assert.assertFalse(t.isAlive()); + Assert.assertNull(error.get()); } private XinfraMonitor xinfraMonitor() throws Exception { FakeService.clearCounters(); - Map config = new HashMap<>(); + Map> config = new HashMap<>(); Map fakeServiceConfig = new HashMap<>(); fakeServiceConfig.put(XinfraMonitorConstants.CLASS_NAME_CONFIG, FakeService.class.getName()); @@ -106,7 +111,7 @@ public FakeServiceFactory(Map config, String serviceInstanceName) { @SuppressWarnings("unchecked") @Override - public Service createService() throws Exception { + public Service createService() { return new XinfraMonitorTest.FakeService(_config, _serviceInstanceName); @@ -120,7 +125,7 @@ static final class FakeService implements Service { private final AtomicBoolean _isRunning = new AtomicBoolean(); /** required */ - public FakeService(Map config, String serviceInstanceName) { + public FakeService(Map> config, String serviceInstanceName) { } diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 8d11fd04..370028f5 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -14,6 +14,8 @@ import com.linkedin.xinfra.monitor.consumer.BaseConsumerRecord; import com.linkedin.xinfra.monitor.consumer.KMBaseConsumer; import com.linkedin.xinfra.monitor.services.metrics.CommitLatencyMetrics; + +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -25,7 +27,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,12 +84,9 @@ public void commitAvailabilityTest() throws Exception { consumeService.startConsumeThreadForTesting(); Assert.assertTrue(consumeService.isRunning()); - /* in milliseconds */ - long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS); - - /* Thread.sleep safe to do here instead of ScheduledExecutorService + /* delay safe to do here instead of ScheduledExecutorService * We want to sleep current thread so that consumeService can start running for enough seconds. */ - Thread.sleep(threadStartDelay); + Utils.delay(Duration.ofSeconds(THREAD_START_DELAY_SECONDS)); Assert.assertNotNull(metrics.metrics().get(metrics.metricName("offsets-committed-total", METRIC_GROUP_NAME, tags)).metricValue()); Assert.assertNotNull(metrics.metrics().get(metrics.metricName("failed-commit-offsets-total", METRIC_GROUP_NAME, tags)).metricValue()); @@ -112,12 +110,9 @@ public void commitLatencyTest() throws Exception { consumeService.startConsumeThreadForTesting(); Assert.assertTrue(consumeService.isRunning()); - /* in milliseconds */ - long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS); - - /* Thread.sleep safe to do here instead of ScheduledExecutorService + /* delay safe to do here instead of ScheduledExecutorService * We want to sleep current thread so that consumeService can start running for enough seconds. */ - Thread.sleep(threadStartDelay); + Utils.delay(Duration.ofSeconds(THREAD_START_DELAY_SECONDS)); shutdownConsumeService(consumeService); } @@ -148,16 +143,13 @@ private ConsumeService consumeService() throws Exception { /* define return value */ Mockito.when(kmBaseConsumer.lastCommitted()).thenReturn(MOCK_LAST_COMMITTED_OFFSET); Mockito.when(kmBaseConsumer.committed(Mockito.any())).thenReturn(new OffsetAndMetadata(FIRST_OFFSET)); - Mockito.doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { - OffsetCommitCallback callback = invocationOnMock.getArgument(0); - Map committedOffsets = new HashMap<>(); - committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET)); - callback.onComplete(committedOffsets, null); + Mockito.doAnswer((Answer) invocationOnMock -> { + OffsetCommitCallback callback = invocationOnMock.getArgument(0); + Map committedOffsets = new HashMap<>(); + committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET)); + callback.onComplete(committedOffsets, null); - return null; - } + return null; }).when(kmBaseConsumer).commitAsync(Mockito.any(OffsetCommitCallback.class)); @@ -190,13 +182,13 @@ public void run() { thread.start(); consumeService.startConsumeThreadForTesting(); - Thread.sleep(100); + Utils.delay(Duration.ofMillis(100)); consumeService.stop(); thread.join(500); Assert.assertFalse(thread.isAlive()); - Assert.assertEquals(error.get(), null); + Assert.assertNull(error.get()); } @@ -207,8 +199,7 @@ public void run() { */ private Metrics consumeServiceMetrics(ConsumeService consumeService) { setup(); - Metrics metrics = consumeService.metrics(); - return metrics; + return consumeService.metrics(); } /**