Skip to content

Ensure that RefreshListener do not access engine under refresh lock #124328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
bug
  • Loading branch information
tlrx committed Mar 11, 2025
commit a422586cb1cb5c536e9aca138a9b9d9e523fb72b
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.action.support.replication;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -456,6 +457,7 @@ static final class AsyncAfterWriteAction {
// read the values back from the engine as it could deadlock.
private final AtomicLong globalCheckpoint;
private final AtomicLong localCheckpoint;
private final AtomicReference<Exception> checkpointFailure = new AtomicReference<>(null);

AsyncAfterWriteAction(
final IndexShard indexShard,
Expand Down Expand Up @@ -503,7 +505,11 @@ private void maybeFinish() {
if (refreshFailure.get() != null) {
respond.onFailure(globalCheckpoint, localCheckpoint, refreshFailure.get());
} else {
respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get());
if (checkpointFailure.get() != null) {
respond.onFailure(globalCheckpoint, localCheckpoint, checkpointFailure.get());
} else {
respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get());
}
}
}
}
Expand All @@ -512,11 +518,14 @@ private void maybeFinish() {

private void updateCheckpoints() {
try {
this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max);
this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max);
if (checkpointFailure.get() != null) {
this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max);
this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max);
}
} catch (AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (Exception e) {
logger.warn("Failed to retrieve checkpoints", e);
assert false : e;
checkpointFailure.set(e);
}
}

Expand Down