Skip to content

Commit 3fea0b6

Browse files
authored
Include failures in partial response (#124929)
This change includes failures when ESQL returns partial results. It also carries failures between cluster requests. Relates #122802
1 parent a824bb2 commit 3fea0b6

File tree

11 files changed

+202
-13
lines changed

11 files changed

+202
-13
lines changed

docs/changelog/124929.yaml

+5
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 124929
2+
summary: Include failures in partial response
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line number | Diff line number | Diff line change
@@ -195,6 +195,7 @@ static TransportVersion def(int id) {
195195
public static final TransportVersion INFERENCE_CONTEXT_8_X = def(8_841_0_08);
196196
public static final TransportVersion ML_INFERENCE_DEEPSEEK_8_19 = def(8_841_0_09);
197197
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = def(8_841_0_10);
198+
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(8_841_0_11);
198199

199200
/*
200201
* STOP! READ THIS FIRST! No, really,

test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java

+100
Original file line number | Diff line number | Diff line change
@@ -14,18 +14,22 @@
1414
import org.elasticsearch.client.Response;
1515
import org.elasticsearch.client.ResponseException;
1616
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1718
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1819
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1920
import org.elasticsearch.test.rest.ESRestTestCase;
2021
import org.junit.ClassRule;
2122

23+
import java.io.IOException;
2224
import java.util.HashSet;
2325
import java.util.List;
26+
import java.util.Locale;
2427
import java.util.Map;
2528
import java.util.Set;
2629

2730
import static org.hamcrest.Matchers.containsString;
2831
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.hasSize;
2933
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3034

3135
public class EsqlPartialResultsIT extends ESRestTestCase {
@@ -97,6 +101,7 @@ public Set<String> populateIndices() throws Exception {
97101
return okIds;
98102
}
99103

104+
@SuppressWarnings("unchecked")
100105
public void testPartialResult() throws Exception {
101106
Set<String> okIds = populateIndices();
102107
String query = """
@@ -113,11 +118,30 @@ public void testPartialResult() throws Exception {
113118
}
114119
Response resp = client().performRequest(request);
115120
Map<String, Object> results = entityAsMap(resp);
121+
logger.info("--> results {}", results);
116122
assertThat(results.get("is_partial"), equalTo(true));
117123
List<?> columns = (List<?>) results.get("columns");
118124
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
119125
List<?> values = (List<?>) results.get("values");
120126
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
127+
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
128+
results,
129+
"_clusters",
130+
"details",
131+
"(local)"
132+
);
133+
assertNotNull(localInfo);
134+
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
135+
assertThat(
136+
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
137+
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
138+
);
139+
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
140+
assertThat(failures, hasSize(1));
141+
assertThat(
142+
failures.get(0).get("reason"),
143+
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
144+
);
121145
}
122146
// allow_partial_results = false
123147
{
@@ -133,5 +157,81 @@ public void testPartialResult() throws Exception {
133157
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
134158
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
135159
}
160+
161+
}
162+
163+
@SuppressWarnings("unchecked")
164+
public void testFailureFromRemote() throws Exception {
165+
setupRemoteClusters();
166+
try {
167+
Set<String> okIds = populateIndices();
168+
String query = """
169+
{
170+
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
171+
}
172+
""";
173+
// allow_partial_results = true
174+
Request request = new Request("POST", "/_query");
175+
request.setJsonEntity(query);
176+
if (randomBoolean()) {
177+
request.addParameter("allow_partial_results", "true");
178+
}
179+
Response resp = client().performRequest(request);
180+
Map<String, Object> results = entityAsMap(resp);
181+
logger.info("--> results {}", results);
182+
assertThat(results.get("is_partial"), equalTo(true));
183+
List<?> columns = (List<?>) results.get("columns");
184+
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
185+
List<?> values = (List<?>) results.get("values");
186+
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
187+
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
188+
results,
189+
"_clusters",
190+
"details",
191+
"cluster_one"
192+
);
193+
assertNotNull(remoteCluster);
194+
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
195+
assertThat(
196+
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
197+
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
198+
);
199+
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
200+
assertThat(failures, hasSize(1));
201+
assertThat(
202+
failures.get(0).get("reason"),
203+
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
204+
);
205+
} finally {
206+
removeRemoteCluster();
207+
}
208+
}
209+
210+
private void setupRemoteClusters() throws IOException {
211+
String settings = String.format(Locale.ROOT, """
212+
{
213+
"persistent": {
214+
"cluster": {
215+
"remote": {
216+
"cluster_one": {
217+
"seeds": [ "%s" ],
218+
"skip_unavailable": false
219+
}
220+
}
221+
}
222+
}
223+
}
224+
""", cluster.getTransportEndpoints());
225+
Request request = new Request("PUT", "/_cluster/settings");
226+
request.setJsonEntity(settings);
227+
client().performRequest(request);
228+
}
229+
230+
private void removeRemoteCluster() throws IOException {
231+
Request settingsRequest = new Request("PUT", "/_cluster/settings");
232+
settingsRequest.setJsonEntity("""
233+
{"persistent": { "cluster.*": null}}
234+
""");
235+
client().performRequest(settingsRequest);
136236
}
137237
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

+19-5
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.search.ShardSearchFailure;
1213
import org.elasticsearch.client.internal.Client;
1314
import org.elasticsearch.common.breaker.CircuitBreaker;
1415
import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -37,11 +38,13 @@
3738
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3839
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
3940
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.empty;
4042
import static org.hamcrest.Matchers.equalTo;
4143
import static org.hamcrest.Matchers.greaterThan;
4244
import static org.hamcrest.Matchers.in;
4345
import static org.hamcrest.Matchers.is;
4446
import static org.hamcrest.Matchers.lessThanOrEqualTo;
47+
import static org.hamcrest.Matchers.not;
4548

4649
public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase {
4750

@@ -70,6 +73,14 @@ private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, C
7073
assertClusterPartial(resp, clusterAlias, cluster.okShards + cluster.failingShards, cluster.okShards);
7174
}
7275

76+
private void assertClusterFailure(EsqlQueryResponse resp, String clusterAlias, String reason) {
77+
EsqlExecutionInfo.Cluster info = resp.getExecutionInfo().getCluster(clusterAlias);
78+
assertThat(info.getFailures(), not(empty()));
79+
for (ShardSearchFailure f : info.getFailures()) {
80+
assertThat(f.reason(), containsString(reason));
81+
}
82+
}
83+
7384
private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, int totalShards, int okShards) {
7485
EsqlExecutionInfo.Cluster clusterInfo = resp.getExecutionInfo().getCluster(clusterAlias);
7586
assertThat(clusterInfo.getTotalShards(), equalTo(totalShards));
@@ -83,6 +94,7 @@ private void assertClusterSuccess(EsqlQueryResponse resp, String clusterAlias, i
8394
assertThat(clusterInfo.getSuccessfulShards(), equalTo(numShards));
8495
assertThat(clusterInfo.getFailedShards(), equalTo(0));
8596
assertThat(clusterInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
97+
assertThat(clusterInfo.getFailures(), empty());
8698
}
8799

88100
public void testPartialResults() throws Exception {
@@ -110,10 +122,12 @@ public void testPartialResults() throws Exception {
110122
assertTrue(returnedIds.add(id));
111123
assertThat(id, is(in(allIds)));
112124
}
113-
114125
assertClusterPartial(resp, LOCAL_CLUSTER, local);
115126
assertClusterPartial(resp, REMOTE_CLUSTER_1, remote1);
116127
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2);
128+
for (String cluster : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) {
129+
assertClusterFailure(resp, cluster, "Accessing failing field");
130+
}
117131
}
118132
}
119133

@@ -139,6 +153,7 @@ public void testOneRemoteClusterPartial() throws Exception {
139153
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
140154
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
141155
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0);
156+
assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field");
142157
}
143158
}
144159

@@ -191,9 +206,9 @@ public void sendResponse(Exception exception) {
191206
}
192207
assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds)));
193208
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
194-
195209
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
196210
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
211+
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
197212
}
198213
} finally {
199214
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -239,9 +254,9 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
239254
}
240255
assertThat(returnedIds, equalTo(local.okIds));
241256
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
242-
243257
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
244258
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
259+
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
245260
}
246261
} finally {
247262
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -286,8 +301,7 @@ public void testFailSearchShardsOnLocalCluster() throws Exception {
286301
assertThat(returnedIds, equalTo(remote1.okIds));
287302
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
288303
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
289-
290-
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
304+
assertClusterFailure(resp, LOCAL_CLUSTER, simulatedFailure.getMessage());
291305
}
292306
} finally {
293307
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

+8
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.plugins.Plugin;
1515
import org.elasticsearch.test.ESIntegTestCase;
1616
import org.elasticsearch.test.FailingFieldPlugin;
17+
import org.elasticsearch.transport.RemoteClusterService;
1718
import org.elasticsearch.xcontent.XContentBuilder;
1819
import org.elasticsearch.xcontent.json.JsonXContent;
1920
import org.elasticsearch.xpack.esql.EsqlTestUtils;
@@ -26,9 +27,12 @@
2627
import java.util.Set;
2728

2829
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
import static org.hamcrest.Matchers.containsString;
31+
import static org.hamcrest.Matchers.empty;
2932
import static org.hamcrest.Matchers.equalTo;
3033
import static org.hamcrest.Matchers.in;
3134
import static org.hamcrest.Matchers.lessThanOrEqualTo;
35+
import static org.hamcrest.Matchers.not;
3236

3337
/**
3438
* Make sure the failures on the data node come back as failures over the wire.
@@ -121,6 +125,10 @@ public void testPartialResults() throws Exception {
121125
assertThat(id, in(okIds));
122126
assertTrue(actualIds.add(id));
123127
}
128+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
129+
assertThat(localInfo.getFailures(), not(empty()));
130+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
131+
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
124132
}
125133
}
126134
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

+3-2
Original file line number | Diff line number | Diff line change
@@ -163,7 +163,8 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
163163
builder.setTook(executionInfo.tookSoFar());
164164
}
165165
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
166-
if (executionInfo.isStopped() || resp.failedShards > 0) {
166+
builder.setFailures(resp.failures);
167+
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
167168
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
168169
} else {
169170
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
@@ -251,7 +252,7 @@ void runComputeOnRemoteCluster(
251252
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
252253
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
253254
final ComputeResponse r = finalResponse.get();
254-
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
255+
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards, r.failures);
255256
}))) {
256257
var exchangeSource = new ExchangeSourceHandler(
257258
configuration.pragmas().exchangeBufferSize(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

+18-2
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.compute.operator.DriverProfile;
@@ -29,9 +30,10 @@ final class ComputeResponse extends TransportResponse {
2930
public final int successfulShards;
3031
public final int skippedShards;
3132
public final int failedShards;
33+
public final List<ShardSearchFailure> failures;
3234

3335
ComputeResponse(List<DriverProfile> profiles) {
34-
this(profiles, null, null, null, null, null);
36+
this(profiles, null, null, null, null, null, List.of());
3537
}
3638

3739
ComputeResponse(
@@ -40,14 +42,16 @@ final class ComputeResponse extends TransportResponse {
4042
Integer totalShards,
4143
Integer successfulShards,
4244
Integer skippedShards,
43-
Integer failedShards
45+
Integer failedShards,
46+
List<ShardSearchFailure> failures
4447
) {
4548
this.profiles = profiles;
4649
this.took = took;
4750
this.totalShards = totalShards == null ? 0 : totalShards.intValue();
4851
this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
4952
this.skippedShards = skippedShards == null ? 0 : skippedShards.intValue();
5053
this.failedShards = failedShards == null ? 0 : failedShards.intValue();
54+
this.failures = failures;
5155
}
5256

5357
ComputeResponse(StreamInput in) throws IOException {
@@ -74,6 +78,11 @@ final class ComputeResponse extends TransportResponse {
7478
this.skippedShards = 0;
7579
this.failedShards = 0;
7680
}
81+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
82+
this.failures = in.readCollectionAsImmutableList(ShardSearchFailure::readShardSearchFailure);
83+
} else {
84+
this.failures = List.of();
85+
}
7786
}
7887

7988
@Override
@@ -93,6 +102,9 @@ public void writeTo(StreamOutput out) throws IOException {
93102
out.writeVInt(skippedShards);
94103
out.writeVInt(failedShards);
95104
}
105+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
106+
out.writeCollection(failures, (o, v) -> v.writeTo(o));
107+
}
96108
}
97109

98110
public List<DriverProfile> getProfiles() {
@@ -118,4 +130,8 @@ public int getSkippedShards() {
118130
/** Returns the value of the {@code failedShards} field of this response. */
public int getFailedShards() {
119131
return failedShards;
120132
}
133+
134+
/** Returns the {@code failures} list of {@link ShardSearchFailure}s held by this response. */
public List<ShardSearchFailure> getFailures() {
135+
return failures;
136+
}
121137
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

+1
Original file line number | Diff line number | Diff line change
@@ -266,6 +266,7 @@ public void execute(
266266
.setSuccessfulShards(r.getSuccessfulShards())
267267
.setSkippedShards(r.getSkippedShards())
268268
.setFailedShards(r.getFailedShards())
269+
.setFailures(r.failures)
269270
.build()
270271
);
271272
dataNodesListener.onResponse(r.getProfiles());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

+7-1
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,13 @@ void startComputeOnDataNodes(
9999
ActionListener<ComputeResponse> outListener
100100
) {
101101
final boolean allowPartialResults = configuration.allowPartialResults();
102-
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
102+
DataNodeRequestSender sender = new DataNodeRequestSender(
103+
transportService,
104+
esqlExecutor,
105+
clusterAlias,
106+
parentTask,
107+
allowPartialResults
108+
) {
103109
@Override
104110
protected void sendRequest(
105111
DiscoveryNode node,

0 commit comments

Comments
 (0)