Skip to content

Commit a54add9

Browse files
Prevent :: selectors in remote cluster expressions when security is off (#125968) (#126023)
This PR prevents using `::` selectors in remote index expressions and adds a basic integration test with security off. Relates to #125252 (cherry picked from commit 83d7fe0) # Conflicts: # server/src/main/java/org/elasticsearch/action/ResolvedIndices.java
1 parent e7dd999 commit a54add9

File tree

2 files changed

+373
-0
lines changed

2 files changed

+373
-0
lines changed

server/src/main/java/org/elasticsearch/action/ResolvedIndices.java

+16
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
import org.elasticsearch.action.search.SearchContextId;
1313
import org.elasticsearch.action.support.IndicesOptions;
1414
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.DataStream;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
1617
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1718
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1819
import org.elasticsearch.core.Nullable;
1920
import org.elasticsearch.index.Index;
2021
import org.elasticsearch.index.IndexNotFoundException;
22+
import org.elasticsearch.indices.InvalidIndexNameException;
2123
import org.elasticsearch.search.builder.PointInTimeBuilder;
2224
import org.elasticsearch.transport.RemoteClusterAware;
2325
import org.elasticsearch.transport.RemoteClusterService;
@@ -176,6 +178,20 @@ public static ResolvedIndices resolveWithIndexNamesAndOptions(
176178
? Index.EMPTY_ARRAY
177179
: indexNameExpressionResolver.concreteIndices(clusterState, localIndices, startTimeInMillis);
178180

181+
// prevent using selectors with remote cluster patterns
182+
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
183+
for (final var indicesPerRemoteClusterAlias : remoteClusterIndices.entrySet()) {
184+
final String[] indices = indicesPerRemoteClusterAlias.getValue().indices();
185+
if (indices != null) {
186+
for (final String index : indices) {
187+
if (IndexNameExpressionResolver.hasSelectorSuffix(index)) {
188+
throw new InvalidIndexNameException(index, "Selectors are not yet supported on remote cluster patterns");
189+
}
190+
}
191+
}
192+
}
193+
}
194+
179195
return new ResolvedIndices(remoteClusterIndices, localIndices, resolveLocalIndexMetadata(concreteLocalIndices, clusterState, true));
180196
}
181197

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.remotecluster;
9+
10+
import org.apache.http.HttpHost;
11+
import org.elasticsearch.action.search.SearchResponse;
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.Response;
14+
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.client.RestClient;
16+
import org.elasticsearch.client.RestClientBuilder;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
19+
import org.elasticsearch.core.IOUtils;
20+
import org.elasticsearch.core.Tuple;
21+
import org.elasticsearch.search.SearchHit;
22+
import org.elasticsearch.search.SearchResponseUtils;
23+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
24+
import org.elasticsearch.test.cluster.FeatureFlag;
25+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
26+
import org.elasticsearch.test.rest.ESRestTestCase;
27+
import org.elasticsearch.test.rest.ObjectPath;
28+
import org.junit.AfterClass;
29+
import org.junit.BeforeClass;
30+
import org.junit.ClassRule;
31+
import org.junit.rules.RuleChain;
32+
import org.junit.rules.TestRule;
33+
34+
import java.io.IOException;
35+
import java.io.UncheckedIOException;
36+
import java.util.Arrays;
37+
import java.util.List;
38+
import java.util.Locale;
39+
import java.util.Map;
40+
import java.util.Set;
41+
import java.util.stream.Collectors;
42+
43+
import static org.hamcrest.Matchers.containsInAnyOrder;
44+
import static org.hamcrest.Matchers.containsString;
45+
import static org.hamcrest.Matchers.equalTo;
46+
import static org.hamcrest.Matchers.is;
47+
import static org.hamcrest.Matchers.nullValue;
48+
49+
public class RemoteClusterWithoutSecurityFailureStoreRestIT extends ESRestTestCase {
50+
51+
private static ElasticsearchCluster fulfillingCluster = ElasticsearchCluster.local()
52+
.distribution(DistributionType.DEFAULT)
53+
.name("fulfilling-cluster")
54+
.nodes(3)
55+
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
56+
.module("analysis-common")
57+
.setting("xpack.license.self_generated.type", "trial")
58+
.setting("xpack.security.enabled", "false")
59+
.build();
60+
61+
private static ElasticsearchCluster queryCluster = ElasticsearchCluster.local()
62+
.distribution(DistributionType.DEFAULT)
63+
.name("query-cluster")
64+
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
65+
.module("analysis-common")
66+
.setting("xpack.license.self_generated.type", "trial")
67+
.setting("xpack.security.enabled", "false")
68+
.build();
69+
70+
private static RestClient fulfillingClusterClient;
71+
72+
@BeforeClass
73+
public static void initFulfillingClusterClient() {
74+
if (fulfillingClusterClient != null) {
75+
return;
76+
}
77+
fulfillingClusterClient = buildRestClient(fulfillingCluster);
78+
}
79+
80+
@ClassRule
81+
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
82+
83+
static RestClient buildRestClient(ElasticsearchCluster targetCluster) {
84+
assert targetCluster != null;
85+
final int numberOfFcNodes = targetCluster.getHttpAddresses().split(",").length;
86+
final String url = targetCluster.getHttpAddress(randomIntBetween(0, numberOfFcNodes - 1));
87+
88+
final int portSeparator = url.lastIndexOf(':');
89+
final var httpHost = new HttpHost(url.substring(0, portSeparator), Integer.parseInt(url.substring(portSeparator + 1)), "http");
90+
RestClientBuilder builder = RestClient.builder(httpHost);
91+
try {
92+
doConfigureClient(builder, Settings.EMPTY);
93+
} catch (IOException e) {
94+
throw new UncheckedIOException(e);
95+
}
96+
builder.setStrictDeprecationMode(true);
97+
return builder.build();
98+
}
99+
100+
@AfterClass
101+
public static void closeFulfillingClusterClient() throws IOException {
102+
try {
103+
IOUtils.close(fulfillingClusterClient);
104+
} finally {
105+
fulfillingClusterClient = null;
106+
}
107+
}
108+
109+
@Override
110+
protected String getTestRestCluster() {
111+
return queryCluster.getHttpAddress(0);
112+
}
113+
114+
public void testCrossClusterSearchWithoutSecurity() throws Exception {
115+
final boolean isProxyMode = randomBoolean();
116+
final boolean skipUnavailable = false; // we want to get actual failures and not skip and get empty results
117+
final boolean ccsMinimizeRoundtrips = randomBoolean();
118+
119+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, isProxyMode, skipUnavailable);
120+
121+
// fulfilling cluster setup
122+
setupTestDataStreamOnFulfillingCluster();
123+
124+
// query cluster setup
125+
setupLocalDataOnQueryCluster();
126+
127+
// query remote cluster using :: selectors should not succeed, even with security off
128+
for (String indexName : Set.of(
129+
"test1::data",
130+
"test*::data",
131+
"*::data",
132+
"test1::failures",
133+
"test*::failures",
134+
"*::failures",
135+
"other1::failures",
136+
"non-existing::whatever"
137+
)) {
138+
final Request searchRequest = new Request(
139+
"GET",
140+
String.format(
141+
Locale.ROOT,
142+
"/%s:%s/_search?ccs_minimize_roundtrips=%s",
143+
randomFrom("my_remote_cluster", "*", "my_remote_*"),
144+
indexName,
145+
ccsMinimizeRoundtrips
146+
)
147+
);
148+
final ResponseException exception = expectThrows(ResponseException.class, () -> client().performRequest(searchRequest));
149+
assertSelectorsNotSupported(exception);
150+
}
151+
152+
final Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1");
153+
final String backingDataIndexName = backingIndices.v1();
154+
final String backingFailureIndexName = backingIndices.v2();
155+
156+
// searching without selectors should work
157+
{
158+
final boolean alsoSearchLocally = randomBoolean();
159+
final Request dataSearchRequest = new Request(
160+
"GET",
161+
String.format(
162+
Locale.ROOT,
163+
"/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
164+
alsoSearchLocally ? "local_index," : "",
165+
randomFrom("my_remote_cluster", "*", "my_remote_*"),
166+
randomFrom("test1", "test*", "*", backingDataIndexName),
167+
ccsMinimizeRoundtrips
168+
)
169+
);
170+
final String[] expectedIndices = alsoSearchLocally
171+
? new String[] { "local_index", backingDataIndexName }
172+
: new String[] { backingDataIndexName };
173+
assertSearchResponseContainsIndices(client().performRequest(dataSearchRequest), expectedIndices);
174+
}
175+
176+
// also, searching directly the backing failure index should work
177+
{
178+
Request failureIndexSearchRequest = new Request(
179+
"GET",
180+
String.format(
181+
Locale.ROOT,
182+
"/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
183+
backingFailureIndexName,
184+
ccsMinimizeRoundtrips
185+
)
186+
);
187+
assertSearchResponseContainsIndices(client().performRequest(failureIndexSearchRequest), backingFailureIndexName);
188+
}
189+
}
190+
191+
private static void setupLocalDataOnQueryCluster() throws IOException {
192+
final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
193+
indexDocRequest.setJsonEntity("{\"local_foo\": \"local_bar\"}");
194+
assertOK(client().performRequest(indexDocRequest));
195+
}
196+
197+
protected void setupTestDataStreamOnFulfillingCluster() throws IOException {
198+
// Create data stream and index some documents
199+
final Request createComponentTemplate = new Request("PUT", "/_component_template/component1");
200+
createComponentTemplate.setJsonEntity("""
201+
{
202+
"template": {
203+
"mappings": {
204+
"properties": {
205+
"@timestamp": {
206+
"type": "date"
207+
},
208+
"age": {
209+
"type": "integer"
210+
},
211+
"email": {
212+
"type": "keyword"
213+
},
214+
"name": {
215+
"type": "text"
216+
}
217+
}
218+
},
219+
"data_stream_options": {
220+
"failure_store": {
221+
"enabled": true
222+
}
223+
}
224+
}
225+
}""");
226+
assertOK(performRequestAgainstFulfillingCluster(createComponentTemplate));
227+
228+
final Request createTemplate = new Request("PUT", "/_index_template/template1");
229+
createTemplate.setJsonEntity("""
230+
{
231+
"index_patterns": ["test*"],
232+
"data_stream": {},
233+
"priority": 500,
234+
"composed_of": ["component1"]
235+
}""");
236+
assertOK(performRequestAgainstFulfillingCluster(createTemplate));
237+
238+
final Request createDoc1 = new Request("PUT", "/test1/_doc/1?refresh=true&op_type=create");
239+
createDoc1.setJsonEntity("""
240+
{
241+
"@timestamp": 1,
242+
"age" : 1,
243+
"name" : "jack",
244+
"email" : "jack@example.com"
245+
}""");
246+
assertOK(performRequestAgainstFulfillingCluster(createDoc1));
247+
248+
final Request createDoc2 = new Request("PUT", "/test1/_doc/2?refresh=true&op_type=create");
249+
createDoc2.setJsonEntity("""
250+
{
251+
"@timestamp": 2,
252+
"age" : "this should be an int",
253+
"name" : "jack",
254+
"email" : "jack@example.com"
255+
}""");
256+
assertOK(performRequestAgainstFulfillingCluster(createDoc2));
257+
}
258+
259+
private static void assertSelectorsNotSupported(ResponseException exception) {
260+
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(400));
261+
assertThat(exception.getMessage(), containsString("Selectors are not yet supported on remote cluster patterns"));
262+
}
263+
264+
private static void assertSearchResponseContainsIndices(Response response, String... expectedIndices) throws IOException {
265+
assertOK(response);
266+
final SearchResponse searchResponse = SearchResponseUtils.parseSearchResponse(responseAsParser(response));
267+
try {
268+
final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
269+
.map(SearchHit::getIndex)
270+
.collect(Collectors.toList());
271+
assertThat(actualIndices, containsInAnyOrder(expectedIndices));
272+
} finally {
273+
searchResponse.decRef();
274+
}
275+
}
276+
277+
private static Response performRequestAgainstFulfillingCluster(Request request) throws IOException {
278+
return fulfillingClusterClient.performRequest(request);
279+
}
280+
281+
private void configureRemoteCluster(
282+
String clusterAlias,
283+
ElasticsearchCluster targetFulfillingCluster,
284+
boolean isProxyMode,
285+
boolean skipUnavailable
286+
) throws Exception {
287+
// For configurable remote cluster security, this method assumes the cross cluster access API key is already configured in keystore
288+
putRemoteClusterSettings(clusterAlias, targetFulfillingCluster, isProxyMode, skipUnavailable);
289+
290+
// Ensure remote cluster is connected
291+
checkRemoteConnection(clusterAlias, targetFulfillingCluster, isProxyMode);
292+
}
293+
294+
private void putRemoteClusterSettings(
295+
String clusterAlias,
296+
ElasticsearchCluster targetFulfillingCluster,
297+
boolean isProxyMode,
298+
boolean skipUnavailable
299+
) throws IOException {
300+
final Settings.Builder builder = Settings.builder();
301+
final String remoteClusterEndpoint = targetFulfillingCluster.getTransportEndpoint(0);
302+
if (isProxyMode) {
303+
builder.put("cluster.remote." + clusterAlias + ".mode", "proxy")
304+
.put("cluster.remote." + clusterAlias + ".proxy_address", remoteClusterEndpoint)
305+
.putNull("cluster.remote." + clusterAlias + ".seeds");
306+
} else {
307+
builder.put("cluster.remote." + clusterAlias + ".mode", "sniff")
308+
.putList("cluster.remote." + clusterAlias + ".seeds", remoteClusterEndpoint)
309+
.putNull("cluster.remote." + clusterAlias + ".proxy_address");
310+
}
311+
builder.put("cluster.remote." + clusterAlias + ".skip_unavailable", skipUnavailable);
312+
updateClusterSettings(builder.build());
313+
}
314+
315+
private void checkRemoteConnection(String clusterAlias, ElasticsearchCluster targetFulfillingCluster, boolean isProxyMode)
316+
throws Exception {
317+
final Request remoteInfoRequest = new Request("GET", "/_remote/info");
318+
assertBusy(() -> {
319+
final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
320+
assertOK(remoteInfoResponse);
321+
final ObjectPath remoteInfoObjectPath = assertOKAndCreateObjectPath(remoteInfoResponse);
322+
assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".connected"), is(true));
323+
if (false == isProxyMode) {
324+
int numberOfFcNodes = (int) Arrays.stream(targetFulfillingCluster.getRemoteClusterServerEndpoints().split(","))
325+
.filter(endpoint -> endpoint.length() > 0)
326+
.count();
327+
if (numberOfFcNodes == 0) {
328+
// The cluster is an RCS 1.0 remote cluster
329+
numberOfFcNodes = targetFulfillingCluster.getTransportEndpoints().split(",").length;
330+
}
331+
assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".num_nodes_connected"), equalTo(numberOfFcNodes));
332+
}
333+
final String credentialsValue = remoteInfoObjectPath.evaluate(clusterAlias + ".cluster_credentials");
334+
assertThat(credentialsValue, nullValue());
335+
});
336+
}
337+
338+
@SuppressWarnings("unchecked")
339+
private Tuple<List<String>, List<String>> getDataAndFailureIndices(String dataStreamName) throws IOException {
340+
Request dataStream = new Request("GET", "/_data_stream/" + dataStreamName);
341+
Response response = performRequestAgainstFulfillingCluster(dataStream);
342+
Map<String, Object> dataStreams = entityAsMap(response);
343+
List<String> dataIndexNames = (List<String>) XContentMapValues.extractValue("data_streams.indices.index_name", dataStreams);
344+
List<String> failureIndexNames = (List<String>) XContentMapValues.extractValue(
345+
"data_streams.failure_store.indices.index_name",
346+
dataStreams
347+
);
348+
return new Tuple<>(dataIndexNames, failureIndexNames);
349+
}
350+
351+
private Tuple<String, String> getSingleDataAndFailureIndices(String dataStreamName) throws IOException {
352+
Tuple<List<String>, List<String>> indices = getDataAndFailureIndices(dataStreamName);
353+
assertThat(indices.v1().size(), equalTo(1));
354+
assertThat(indices.v2().size(), equalTo(1));
355+
return new Tuple<>(indices.v1().get(0), indices.v2().get(0));
356+
}
357+
}

0 commit comments

Comments
 (0)