Commit 10b1edd

Fix test concurrent remote connection updates
This test has a race condition. The action listener used to listen for connections has a guard against being executed twice. However, this listener can be executed twice.

After on success is invoked the test starts to tear down. At this point, the threads the test forked will terminate and the remote cluster connection will be closed. However, a thread forked to the management thread pool by the remote cluster connection can still be executing and try to continue connecting. This thread will be cancelled when the remote cluster connection is closed and this leads to the action listener being invoked again.

To address this, we explicitly check that the reason that on failure was invoked was cancellation, and we assert that the listener was already previously invoked. Interestingly, this issue has always been present yet a recent change (#28667) exposed errors that occur on tasks submitted to the thread pool and were silently being lost.

Relates #28695
1 parent d857033 commit 10b1edd

1 file changed: +22 -12 lines changed

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -557,7 +557,6 @@ public void onNodeDisconnected(DiscoveryNode node) {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/28695")
     public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
         try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
@@ -591,17 +590,28 @@ public void run() {
                 CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
                 for (int i = 0; i < numConnectionAttempts; i++) {
                     AtomicBoolean executed = new AtomicBoolean(false);
-                    ActionListener<Void> listener = ActionListener.wrap(x -> {
-                        assertTrue(executed.compareAndSet(false, true));
-                        latch.countDown();}, x -> {
-                        assertTrue(executed.compareAndSet(false, true));
-                        latch.countDown();
-                        if (x instanceof RejectedExecutionException) {
-                            // that's fine
-                        } else {
-                            throw new AssertionError(x);
-                        }
-                    });
+                    ActionListener<Void> listener = ActionListener.wrap(
+                            x -> {
+                                assertTrue(executed.compareAndSet(false, true));
+                                latch.countDown();},
+                            x -> {
+                                /*
+                                 * This can occur on a thread submitted to the thread pool while we are closing the
+                                 * remote cluster connection at the end of the test.
+                                 */
+                                if (x instanceof CancellableThreads.ExecutionCancelledException) {
+                                    // we should already be shutting down
+                                    assertTrue(executed.get());
+                                    return;
+                                }
+
+                                assertTrue(executed.compareAndSet(false, true));
+                                latch.countDown();
+
+                                if (!(x instanceof RejectedExecutionException)) {
+                                    throw new AssertionError(x);
+                                }
+                            });
                     connection.updateSeedNodes(seedNodes, listener);
                 }
                 latch.await();
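
The failure handling in the new listener can be tried outside Elasticsearch. The following is a minimal sketch that uses only JDK types, not the project's test code. `GuardedListenerSketch`, `successHandler`, `failureHandler`, and `CancelledException` are hypothetical names introduced here; `CancelledException` merely stands in for `CancellableThreads.ExecutionCancelledException`. It shows a success callback followed by a late cancellation: the cancellation is tolerated because the guard has already been set, and the latch is counted down only once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class GuardedListenerSketch {

    /** Hypothetical stand-in for CancellableThreads.ExecutionCancelledException. */
    static class CancelledException extends RuntimeException {
        CancelledException(String message) {
            super(message);
        }
    }

    /** Success path: must be the first and only completion of the listener. */
    static Runnable successHandler(AtomicBoolean executed, CountDownLatch latch) {
        return () -> {
            if (!executed.compareAndSet(false, true)) {
                throw new AssertionError("listener invoked twice");
            }
            latch.countDown();
        };
    }

    /** Failure path: mirrors the structure of the new failure lambda in the diff. */
    static Consumer<Exception> failureHandler(AtomicBoolean executed, CountDownLatch latch) {
        return e -> {
            if (e instanceof CancelledException) {
                // Late cancellation during teardown: the listener must already have fired.
                if (!executed.get()) {
                    throw new AssertionError("cancelled before the listener completed", e);
                }
                return; // do not touch the guard or the latch a second time
            }
            // Any other failure counts as the single completion of this listener.
            if (!executed.compareAndSet(false, true)) {
                throw new AssertionError("listener invoked twice", e);
            }
            latch.countDown();
            if (!(e instanceof RejectedExecutionException)) {
                throw new AssertionError(e);
            }
        };
    }

    public static void main(String[] args) {
        AtomicBoolean executed = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);

        // First invocation: the connection attempt succeeds.
        successHandler(executed, latch).run();
        // Second invocation: a straggler thread is cancelled while the connection closes.
        failureHandler(executed, latch).accept(new CancelledException("connection closed"));

        System.out.println("latch count: " + latch.getCount()); // prints "latch count: 0"
    }
}
```

Returning early on cancellation, rather than relaxing the `compareAndSet` guard, keeps the original double-invocation check in force for every other failure.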
