Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make DLM stats and DLM error store project-aware #124810

Merged
merged 3 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -594,7 +595,7 @@ public void testErrorRecordingOnRollover() throws Exception {
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);

for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
writeIndexRolloverError = lifecycleService.getErrorStore().getError(writeIndexName);
writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
if (writeIndexRolloverError != null) {
break;
}
Expand Down Expand Up @@ -671,7 +672,7 @@ public void testErrorRecordingOnRollover() throws Exception {
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);

for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(previousWriteInddex), nullValue());
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
}
});

Expand Down Expand Up @@ -768,7 +769,8 @@ public void testErrorRecordingOnRetention() throws Exception {
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);

for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
recordedRetentionExecutionError = lifecycleService.getErrorStore().getError(firstGenerationIndex);
recordedRetentionExecutionError = lifecycleService.getErrorStore()
.getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
if (recordedRetentionExecutionError != null && recordedRetentionExecutionError.retryCount() > 3) {
break;
}
Expand Down Expand Up @@ -832,7 +834,7 @@ public void testErrorRecordingOnRetention() throws Exception {
// error stores don't contain anything for the first generation index anymore
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(firstGenerationIndex), nullValue());
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.node.DslErrorInfo;
Expand All @@ -34,7 +35,7 @@
public class DataStreamLifecycleErrorStore {

public static final int MAX_ERROR_MESSAGE_LENGTH = 1000;
private final ConcurrentMap<String, ErrorEntry> indexNameToError = new ConcurrentHashMap<>();
private final ConcurrentMap<ProjectId, ConcurrentMap<String, ErrorEntry>> projectMap = new ConcurrentHashMap<>();
private final LongSupplier nowSupplier;

public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
Expand All @@ -48,12 +49,13 @@ public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
* Returns the previously recorded error for the provided index, or null otherwise.
*/
@Nullable
public ErrorEntry recordError(String indexName, Exception e) {
public ErrorEntry recordError(ProjectId projectId, String indexName, Exception e) {
String exceptionToString = Strings.toString((builder, params) -> {
ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
return builder;
});
String newError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
final var indexNameToError = projectMap.computeIfAbsent(projectId, k -> new ConcurrentHashMap<>());
ErrorEntry existingError = indexNameToError.get(indexName);
long recordedTimestamp = nowSupplier.getAsLong();
if (existingError == null) {
Expand All @@ -71,29 +73,41 @@ public ErrorEntry recordError(String indexName, Exception e) {
/**
* Clears the recorded error for the provided index (if any exists)
*/
public void clearRecordedError(String indexName) {
public void clearRecordedError(ProjectId projectId, String indexName) {
final var indexNameToError = projectMap.get(projectId);
if (indexNameToError == null) {
return;
}
indexNameToError.remove(indexName);
}

/**
* Clears all the errors recorded in the store.
*/
public void clearStore() {
indexNameToError.clear();
projectMap.clear();
}

/**
* Retrieves the recorded error for the provided index.
*/
@Nullable
public ErrorEntry getError(String indexName) {
public ErrorEntry getError(ProjectId projectId, String indexName) {
final var indexNameToError = projectMap.get(projectId);
if (indexNameToError == null) {
return null;
}
return indexNameToError.get(indexName);
}

/**
* Return an immutable view (a snapshot) of the tracked indices at the moment this method is called.
*/
public Set<String> getAllIndices() {
public Set<String> getAllIndices(ProjectId projectId) {
final var indexNameToError = projectMap.get(projectId);
if (indexNameToError == null) {
return Set.of();
}
return Set.copyOf(indexNameToError.keySet());
}

Expand All @@ -103,8 +117,9 @@ public Set<String> getAllIndices() {
* retries DSL attempted (descending order) and the number of entries will be limited according to the provided limit parameter.
* Returns empty list if no entries are present in the error store or none satisfy the predicate.
*/
public List<DslErrorInfo> getErrorsInfo(Predicate<ErrorEntry> errorEntryPredicate, int limit) {
if (indexNameToError.isEmpty()) {
public List<DslErrorInfo> getErrorsInfo(ProjectId projectId, Predicate<ErrorEntry> errorEntryPredicate, int limit) {
final var indexNameToError = projectMap.get(projectId);
if (indexNameToError == null || indexNameToError.isEmpty()) {
return List.of();
}
return indexNameToError.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,21 +784,22 @@ static List<Index> getTargetIndices(
*/
private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
Metadata metadata = clusterService.state().metadata();
for (String indexName : errorStore.getAllIndices()) {
final var projectId = metadata.getProject().id();
for (String indexName : errorStore.getAllIndices(projectId)) {
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(indexName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(indexName);
IndexAbstraction indexAbstraction = metadata.getProject(projectId).getIndicesLookup().get(indexName);

to keep deprecation in one place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already have local changes ready to make DataStreamLifecycleService project aware - overriding these changes. So I'll leave this as is for now.

DataStream parentDataStream = indexAbstraction != null ? indexAbstraction.getParentDataStream() : null;
if (indexAbstraction == null || parentDataStream == null) {
logger.trace(
"Clearing recorded error for index [{}] because the index doesn't exist or is not a data stream backing index anymore",
indexName
);
errorStore.clearRecordedError(indexName);
errorStore.clearRecordedError(projectId, indexName);
} else if (parentDataStream.getName().equals(dataStream.getName())) {
// we're only verifying the indices that pertain to this data stream
IndexMetadata indexMeta = metadata.getProject().index(indexName);
if (dataStream.isIndexManagedByDataStreamLifecycle(indexMeta.getIndex(), metadata.getProject()::index) == false) {
logger.trace("Clearing recorded error for index [{}] because the index is not managed by DSL anymore", indexName);
errorStore.clearRecordedError(indexName);
errorStore.clearRecordedError(projectId, indexName);
}
}
}
Expand Down Expand Up @@ -866,7 +867,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
if (latestDataStream.getWriteIndex().getName().equals(currentRunWriteIndex.getName())) {
// data stream has not been rolled over in the meantime so record the error against the write index we
// attempted the rollover
errorStore.recordError(currentRunWriteIndex.getName(), e);
errorStore.recordError(clusterService.state().metadata().getProject().id(), currentRunWriteIndex.getName(), e);
}
}
}
Expand Down Expand Up @@ -1074,7 +1075,7 @@ public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
// index was already deleted, treat this as a success
logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
errorStore.clearRecordedError(targetIndex);
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
listener.onResponse(null);
return;
}
Expand Down Expand Up @@ -1157,7 +1158,7 @@ public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
// index was already deleted, treat this as a success
logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
errorStore.clearRecordedError(targetIndex);
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
listener.onResponse(null);
return;
}
Expand Down Expand Up @@ -1193,7 +1194,7 @@ public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
logger.trace("Data stream lifecycle did not delete index [{}] as it was already deleted", targetIndex);
// index was already deleted, treat this as a success
errorStore.clearRecordedError(targetIndex);
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
listener.onResponse(null);
return;
}
Expand Down Expand Up @@ -1341,7 +1342,9 @@ static class ErrorRecordingActionListener implements ActionListener<Void> {
@Override
public void onResponse(Void unused) {
logger.trace("Clearing recorded error for index [{}] because the [{}] action was successful", targetIndex, actionName);
errorStore.clearRecordedError(targetIndex);
@FixForMultiProject(description = "Don't use default project ID")
final var projectId = Metadata.DEFAULT_PROJECT_ID;
Comment on lines +1345 to +1346
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR. But I wonder whether we could remove Metadata.DEFAULT_PROJECT_ID and replace it with a deprecated method, e.g.:

@Deprecated(forRemoval=true)
public static ProjectId defaultProjectId() {
    return ProjectId.DEFAULT;
}

We can then use the above method in all places where default project id is meant to be a placeholder. And use ProjectId.DEFAULT for places we actually want the default ID. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a great idea, we should have done that earlier :)

errorStore.clearRecordedError(projectId, targetIndex);
}

@Override
Expand All @@ -1364,8 +1367,10 @@ static void recordAndLogError(
String logMessage,
int signallingErrorRetryThreshold
) {
ErrorEntry previousError = errorStore.recordError(targetIndex, e);
ErrorEntry currentError = errorStore.getError(targetIndex);
@FixForMultiProject(description = "Don't use default project ID")
final var projectId = Metadata.DEFAULT_PROJECT_ID;
ErrorEntry previousError = errorStore.recordError(projectId, targetIndex, e);
ErrorEntry currentError = errorStore.getError(projectId, targetIndex);
if (previousError == null || (currentError != null && previousError.error().equals(currentError.error()) == false)) {
logger.error(logMessage, e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected void masterOperation(
rolloverInfo == null ? null : rolloverInfo.getTime(),
generationDate,
parentDataStream.getLifecycle(),
errorStore.getError(index)
errorStore.getError(state.projectId(), index)
);
explainIndices.add(explainIndexDataStreamLifecycle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
Expand All @@ -33,7 +34,7 @@
/**
* Exposes stats about the latest lifecycle run and the error store.
*/
public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadAction<
public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadProjectAction<
GetDataStreamLifecycleStatsAction.Request,
GetDataStreamLifecycleStatsAction.Response> {

Expand All @@ -45,7 +46,8 @@ public TransportGetDataStreamLifecycleStatsAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
DataStreamLifecycleService lifecycleService
DataStreamLifecycleService lifecycleService,
ProjectResolver projectResolver
) {
super(
GetDataStreamLifecycleStatsAction.NAME,
Expand All @@ -54,6 +56,7 @@ public TransportGetDataStreamLifecycleStatsAction(
threadPool,
actionFilters,
GetDataStreamLifecycleStatsAction.Request::new,
projectResolver,
GetDataStreamLifecycleStatsAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Expand All @@ -64,23 +67,22 @@ public TransportGetDataStreamLifecycleStatsAction(
protected void masterOperation(
Task task,
GetDataStreamLifecycleStatsAction.Request request,
ClusterState state,
ProjectState state,
ActionListener<GetDataStreamLifecycleStatsAction.Response> listener
) throws Exception {
listener.onResponse(collectStats(state));
listener.onResponse(collectStats(state.metadata()));
}

// Visible for testing
GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
Metadata metadata = state.metadata();
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices();
GetDataStreamLifecycleStatsAction.Response collectStats(ProjectMetadata project) {
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(project.id());
List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
for (DataStream dataStream : project.dataStreams().values()) {
if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
int total = 0;
int inError = 0;
for (Index index : dataStream.getIndices()) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, metadata.getProject()::index)) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, project::index)) {
total++;
if (indicesInErrorStore.contains(index.getName())) {
inError++;
Expand All @@ -102,7 +104,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
}

@Override
protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ClusterState state) {
protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ProjectState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.DslErrorInfo;
Expand Down Expand Up @@ -83,9 +85,12 @@ private void updateNumberOfErrorsToPublish(int newValue) {
* {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING}
*/
public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionListener) {
@FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID")
final var projectId = Metadata.DEFAULT_PROJECT_ID;
// fetching the entries that persist in the error store for more than the signalling retry interval
// note that we're reporting this view into the error store on every publishing iteration
List<DslErrorInfo> errorEntriesToSignal = errorStore.getErrorsInfo(
projectId,
entry -> entry.retryCount() >= signallingErrorRetryInterval,
maxNumberOfErrorsToPublish
);
Expand All @@ -97,7 +102,7 @@ public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionLi
UpdateHealthInfoCacheAction.INSTANCE,
new UpdateHealthInfoCacheAction.Request(
healthNodeId,
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices().size())
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices(projectId).size())
),
actionListener
);
Expand Down
Loading
Loading