Skip to content

Commit e3e9c94

Browse files
fix: bulk ingester might skip listener requests (#867)
* fix: bulk ingester might skip lister requests * minor: fix style * always waiting for listener to be done before closing --------- Co-authored-by: Laura Trotta <laura.trotta@elastic.co> Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
1 parent 0a255f1 commit e3e9c94

File tree

2 files changed

+54
-18
lines changed

2 files changed

+54
-18
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class BulkIngester<Context> implements AutoCloseable {
7676
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
7777
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
7878
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
79+
private AtomicInteger listenerInProgressCount = new AtomicInteger();
7980

8081
private static class RequestExecution<Context> {
8182
public final long id;
@@ -235,7 +236,7 @@ private boolean canAddOperation() {
235236
}
236237

237238
private boolean closedAndFlushed() {
238-
return isClosed && operations.isEmpty() && requestsInFlightCount == 0;
239+
return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
239240
}
240241

241242
//----- Ingester logic
@@ -311,25 +312,42 @@ public void flush() {
311312
if (exec != null) {
312313
// A request was actually sent
313314
exec.futureResponse.handle((resp, thr) -> {
314-
315-
sendRequestCondition.signalIfReadyAfter(() -> {
316-
requestsInFlightCount--;
317-
closeCondition.signalAllIfReady();
318-
});
319-
320315
if (resp != null) {
321316
// Success
322317
if (listener != null) {
323-
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
324-
exec.contexts, resp));
318+
listenerInProgressCount.incrementAndGet();
319+
scheduler.submit(() -> {
320+
try {
321+
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
322+
}
323+
finally {
324+
if(listenerInProgressCount.decrementAndGet() == 0){
325+
closeCondition.signalIfReady();
326+
}
327+
}
328+
});
325329
}
326330
} else {
327331
// Failure
328332
if (listener != null) {
329-
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
330-
exec.contexts, thr));
333+
listenerInProgressCount.incrementAndGet();
334+
scheduler.submit(() -> {
335+
try {
336+
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
337+
}
338+
finally {
339+
if(listenerInProgressCount.decrementAndGet() == 0){
340+
closeCondition.signalIfReady();
341+
}
342+
}
343+
});
331344
}
332345
}
346+
347+
sendRequestCondition.signalIfReadyAfter(() -> {
348+
requestsInFlightCount--;
349+
closeCondition.signalAllIfReady();
350+
});
333351
return null;
334352
});
335353
}

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,26 +90,44 @@ private void printStats(TestTransport transport) {
9090
@Test
9191
public void basicTestFlush() throws Exception {
9292
// Prime numbers, so that we have leftovers to flush before shutting down
93-
multiThreadTest(7, 3, 5, 101);
93+
multiThreadTest(7, 3, 5, 101, true);
94+
}
95+
96+
@Test
97+
public void basicTestFlushWithInternalScheduler() throws Exception {
98+
// Prime numbers, so that we have leftovers to flush before shutting down
99+
multiThreadTest(7, 3, 5, 101, false);
94100
}
95101

96102
@Test
97103
public void basicTestNoFlush() throws Exception {
98104
// Will have nothing to flush on close.
99-
multiThreadTest(10, 3, 5, 100);
105+
multiThreadTest(10, 3, 5, 100, true);
100106
}
101107

102-
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
108+
@Test
109+
public void basicTestNoFlushWithInternalScheduler() throws Exception {
110+
// Will have nothing to flush on close.
111+
multiThreadTest(10, 3, 5, 100, false);
112+
}
113+
114+
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations,
115+
boolean externalScheduler) throws Exception {
103116

104117
CountingListener listener = new CountingListener();
105118
TestTransport transport = new TestTransport();
106119
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
107-
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
120+
ScheduledExecutorService scheduler;
121+
if (externalScheduler) {
122+
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
108123
Thread t = Executors.defaultThreadFactory().newThread(r);
109-
t.setName("my-bulk-ingester-executor#" );
124+
t.setName("my-bulk-ingester-executor#");
110125
t.setDaemon(true);
111126
return t;
112-
});
127+
});
128+
} else {
129+
scheduler = null;
130+
}
113131

114132
BulkIngester<Void> ingester = BulkIngester.of(b -> b
115133
.client(client)
@@ -139,7 +157,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
139157

140158
ingester.close();
141159
transport.close();
142-
scheduler.shutdownNow();
160+
if (scheduler != null) scheduler.shutdownNow();
143161

144162
printStats(ingester);
145163
printStats(listener);

0 commit comments

Comments
 (0)