From ae3c1bb187197b7dac47c3e51d7ffcc9d80cf0ae Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 2 Aug 2025 18:28:32 +0200 Subject: [PATCH] Adjust Rest5Client building by using and exposing the callbacks provided by the Elasticsearch Java client library. Closes #3129 Signed-off-by: Peter-Josef Meisch --- .../ROOT/pages/elasticsearch/clients.adoc | 57 ++ .../client/elc/rest5_client/Rest5Clients.java | 494 ++++++++++-------- .../elasticsearch/client/RestClientsTest.java | 69 ++- 3 files changed, 380 insertions(+), 240 deletions(-) diff --git a/src/main/antora/modules/ROOT/pages/elasticsearch/clients.adoc b/src/main/antora/modules/ROOT/pages/elasticsearch/clients.adoc index b8250a9f92..86354312cc 100644 --- a/src/main/antora/modules/ROOT/pages/elasticsearch/clients.adoc +++ b/src/main/antora/modules/ROOT/pages/elasticsearch/clients.adoc @@ -389,6 +389,63 @@ ClientConfiguration.builder() ---- ==== +[[elasticsearch.clients.configurationcallbacks.connectionconfig]] +==== Configuration of the ConnectionConfig used by the low level Elasticsearch `Rest5Client`: + +This callback provides a `org.apache.hc.client5.http.config.ConnectionConfig` to configure the connection that is +used by the `Rest5Client`. + +==== +[source,java] +---- +ClientConfiguration.builder() + .connectedTo("localhost:9200", "localhost:9291") + .withClientConfigurer(Rest5Clients.ElasticsearchConnectionConfigurationCallback.from(connectionConfigBuilder -> { + // configure the connection + return connectionConfigBuilder; + })) + .build(); +---- +==== + +[[elasticsearch.clients.configurationcallbacks.connectioncmanager]] +==== Configuration of the ConnectionManager used by the low level Elasticsearch `Rest5Client`: + +This callback provides a `org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder` to configure the connection manager that is +used by the `Rest5Client`. + +==== +[source,java] +---- +ClientConfiguration.builder() + .connectedTo("localhost:9200", "localhost:9291") + .withClientConfigurer(Rest5Clients.ElasticsearchConnectionManagerCallback.from(connectionManagerBuilder -> { + // configure the connection manager + return connectionManagerBuilder; + })) + .build(); +---- +==== + +[[elasticsearch.clients.configurationcallbacks.requestconfig]] +==== Configuration of the RequestConfig used by the low level Elasticsearch `Rest5Client`: + +This callback provides a `org.apache.hc.client5.http.config.RequestConfig` to configure the RequestConfig that is +used by the `Rest5Client`. + +==== +[source,java] +---- +ClientConfiguration.builder() + .connectedTo("localhost:9200", "localhost:9291") + .withClientConfigurer(Rest5Clients.ElasticsearchRequestConfigCallback.from(requestConfigBuilder -> { + // configure the request config + return requestConfigBuilder; + })) + .build(); +---- +==== + [[elasticsearch.clients.logging]] == Client Logging diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/rest5_client/Rest5Clients.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/rest5_client/Rest5Clients.java index 4d40e99af4..885083a5f8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/rest5_client/Rest5Clients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/rest5_client/Rest5Clients.java @@ -13,20 +13,14 @@ import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.List; -import java.util.Locale; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import javax.net.ssl.SSLContext; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; -import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner; import org.apache.hc.core5.http.Header; @@ -48,227 +42,269 @@ */ public final class Rest5Clients { - // values copied from Rest5ClientBuilder - public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; - public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; - public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 0; // meaning infinite - public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; - public static final int DEFAULT_MAX_CONN_TOTAL = 30; - - private Rest5Clients() {} - - /** - * Creates a low level {@link Rest5Client} for the given configuration. - * - * @param clientConfiguration must not be {@literal null} - * @return the {@link Rest5Client} - */ - public static Rest5Client getRest5Client(ClientConfiguration clientConfiguration) { - return getRest5ClientBuilder(clientConfiguration).build(); - } - - private static Rest5ClientBuilder getRest5ClientBuilder(ClientConfiguration clientConfiguration) { - - HttpHost[] httpHosts = getHttpHosts(clientConfiguration); - Rest5ClientBuilder builder = Rest5Client.builder(httpHosts); - - if (clientConfiguration.getPathPrefix() != null) { - builder.setPathPrefix(clientConfiguration.getPathPrefix()); - } - - HttpHeaders headers = clientConfiguration.getDefaultHeaders(); - - if (!headers.isEmpty()) { - builder.setDefaultHeaders(toHeaderArray(headers)); - } - - // we need to provide our own HttpClient, as the Rest5ClientBuilder - // does not provide a callback for configuration the http client as the old RestClientBuilder. - var httpClient = createHttpClient(clientConfiguration); - builder.setHttpClient(httpClient); - - for (ClientConfiguration.ClientConfigurationCallback clientConfigurationCallback : clientConfiguration - .getClientConfigurers()) { - if (clientConfigurationCallback instanceof ElasticsearchRest5ClientConfigurationCallback configurationCallback) { - builder = configurationCallback.configure(builder); - } - } - - return builder; - } - - private static HttpHost @NonNull [] getHttpHosts(ClientConfiguration clientConfiguration) { - List hosts = clientConfiguration.getEndpoints(); - boolean useSsl = clientConfiguration.useSsl(); - return hosts.stream() - .map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort()) - .map(URI::create) - .map(HttpHost::create) - .toArray(HttpHost[]::new); - } - - private static Header[] toHeaderArray(HttpHeaders headers) { - return headers.entrySet().stream() // - .flatMap(entry -> entry.getValue().stream() // - .map(value -> new BasicHeader(entry.getKey(), value))) // - .toList().toArray(new Header[0]); - } - - // the basic logic to create the http client is copied from the Rest5ClientBuilder class, this is taken from the - // Elasticsearch code, as there is no public usable instance in that - private static CloseableHttpAsyncClient createHttpClient(ClientConfiguration clientConfiguration) { - - var requestConfigBuilder = RequestConfig.custom(); - var connectionConfigBuilder = ConnectionConfig.custom(); - - Duration connectTimeout = clientConfiguration.getConnectTimeout(); - - if (!connectTimeout.isNegative()) { - connectionConfigBuilder.setConnectTimeout( - Timeout.of(Math.toIntExact(connectTimeout.toMillis()), TimeUnit.MILLISECONDS)); - } - - Duration socketTimeout = clientConfiguration.getSocketTimeout(); - - if (!socketTimeout.isNegative()) { - var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS); - connectionConfigBuilder.setSocketTimeout(soTimeout); - requestConfigBuilder.setConnectionRequestTimeout(soTimeout); - } else { - connectionConfigBuilder.setSocketTimeout(Timeout.of(DEFAULT_SOCKET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - requestConfigBuilder - .setConnectionRequestTimeout(Timeout.of(DEFAULT_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - } - - try { - SSLContext sslContext = clientConfiguration.getCaFingerprint().isPresent() - ? TransportUtils.sslContextFromCaFingerprint(clientConfiguration.getCaFingerprint().get()) - : (clientConfiguration.getSslContext().isPresent() - ? clientConfiguration.getSslContext().get() - : SSLContext.getDefault()); - - ConnectionConfig connectionConfig = connectionConfigBuilder.build(); - - PoolingAsyncClientConnectionManager defaultConnectionManager = PoolingAsyncClientConnectionManagerBuilder.create() - .setDefaultConnectionConfig(connectionConfig) - .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE) - .setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL) - .setTlsStrategy(new BasicClientTlsStrategy(sslContext)) - .build(); - - var requestConfig = requestConfigBuilder.build(); - - var immutableRefToHttpClientBuilder = new Object() { - HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create() - .setDefaultRequestConfig(requestConfig) - .setConnectionManager(defaultConnectionManager) - .setUserAgent(VersionInfo.clientVersions()) - .setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy()) - .setThreadFactory(new RestClientThreadFactory()); - }; - - clientConfiguration.getProxy().ifPresent(proxy -> { - try { - var proxyRoutePlanner = new DefaultProxyRoutePlanner(HttpHost.create(proxy)); - immutableRefToHttpClientBuilder.httpClientBuilder.setRoutePlanner(proxyRoutePlanner); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - }); - - immutableRefToHttpClientBuilder.httpClientBuilder.addRequestInterceptorFirst((request, entity, context) -> { - clientConfiguration.getHeadersSupplier().get().forEach((header, values) -> { - // The accept and content-type headers are already put on the request, despite this being the first - // interceptor. - if ("Accept".equalsIgnoreCase(header) || " Content-Type".equalsIgnoreCase(header)) { - request.removeHeaders(header); - } - values.forEach(value -> request.addHeader(header, value)); - }); - }); - - for (ClientConfiguration.ClientConfigurationCallback clientConfigurer : clientConfiguration - .getClientConfigurers()) { - if (clientConfigurer instanceof ElasticsearchHttpClientConfigurationCallback httpClientConfigurer) { - immutableRefToHttpClientBuilder.httpClientBuilder = httpClientConfigurer.configure(immutableRefToHttpClientBuilder.httpClientBuilder); - } - } - - return immutableRefToHttpClientBuilder.httpClientBuilder.build(); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException("could not create the default ssl context", e); - } - } - - /* - * Copied from the Elasticsearch code as this class is not public there. - */ - private static class RestClientThreadFactory implements ThreadFactory { - private static final AtomicLong CLIENT_THREAD_POOL_ID_GENERATOR = new AtomicLong(); - private final long clientThreadPoolId; - private final AtomicLong clientThreadId; - - private RestClientThreadFactory() { - this.clientThreadPoolId = CLIENT_THREAD_POOL_ID_GENERATOR.getAndIncrement(); - this.clientThreadId = new AtomicLong(); - } - - public Thread newThread(Runnable runnable) { - return new Thread(runnable, String.format(Locale.ROOT, "elasticsearch-rest-client-%d-thread-%d", - this.clientThreadPoolId, this.clientThreadId.incrementAndGet())); - } - } - - /** - * {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure - * the Elasticsearch Rest5Client's Http client with a {@link HttpAsyncClientBuilder} - * - * @since 6.0 - */ - public interface ElasticsearchHttpClientConfigurationCallback - extends ClientConfiguration.ClientConfigurationCallback { - - static Rest5Clients.ElasticsearchHttpClientConfigurationCallback from( - Function httpClientBuilderCallback) { - - Assert.notNull(httpClientBuilderCallback, "httpClientBuilderCallback must not be null"); - - return httpClientBuilderCallback::apply; - } - } - - /** - * {@link ClientConfiguration.ClientConfigurationCallback} to configure the Rest5Client client with a - * {@link Rest5ClientBuilder} - * - * @since 6.0 - */ - public interface ElasticsearchRest5ClientConfigurationCallback - extends ClientConfiguration.ClientConfigurationCallback { - - static ElasticsearchRest5ClientConfigurationCallback from( - Function rest5ClientBuilderCallback) { - - Assert.notNull(rest5ClientBuilderCallback, "rest5ClientBuilderCallback must not be null"); - - return rest5ClientBuilderCallback::apply; - } - } - - public static Rest5ClientOptions.Builder getRest5ClientOptionsBuilder(@Nullable TransportOptions transportOptions) { - - if (transportOptions instanceof Rest5ClientOptions rest5ClientOptions) { - return rest5ClientOptions.toBuilder(); - } - - var builder = new Rest5ClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()); - - if (transportOptions != null) { - transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue())); - transportOptions.queryParameters().forEach(builder::setParameter); - builder.onWarnings(transportOptions.onWarnings()); - } - - return builder; - } + // values copied from Rest5ClientBuilder + public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; + public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 0; // meaning infinite + + private Rest5Clients() { + } + + /** + * Creates a low level {@link Rest5Client} for the given configuration. + * + * @param clientConfiguration must not be {@literal null} + * @return the {@link Rest5Client} + */ + public static Rest5Client getRest5Client(ClientConfiguration clientConfiguration) { + return getRest5ClientBuilder(clientConfiguration).build(); + } + + private static Rest5ClientBuilder getRest5ClientBuilder(ClientConfiguration clientConfiguration) { + + HttpHost[] httpHosts = getHttpHosts(clientConfiguration); + Rest5ClientBuilder builder = Rest5Client.builder(httpHosts); + + if (clientConfiguration.getPathPrefix() != null) { + builder.setPathPrefix(clientConfiguration.getPathPrefix()); + } + + HttpHeaders headers = clientConfiguration.getDefaultHeaders(); + + if (!headers.isEmpty()) { + builder.setDefaultHeaders(toHeaderArray(headers)); + } + + // RestClientBuilder configuration callbacks from the consumer + for (ClientConfiguration.ClientConfigurationCallback clientConfigurationCallback : clientConfiguration + .getClientConfigurers()) { + if (clientConfigurationCallback instanceof ElasticsearchRest5ClientConfigurationCallback configurationCallback) { + builder = configurationCallback.configure(builder); + } + } + + Duration connectTimeout = clientConfiguration.getConnectTimeout(); + Duration socketTimeout = clientConfiguration.getSocketTimeout(); + + builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + + httpAsyncClientBuilder.setUserAgent(VersionInfo.clientVersions()); + if (clientConfiguration.getProxy().isPresent()) { + var proxy = clientConfiguration.getProxy().get(); + try { + var proxyRoutePlanner = new DefaultProxyRoutePlanner(HttpHost.create(proxy)); + httpAsyncClientBuilder.setRoutePlanner(proxyRoutePlanner); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + httpAsyncClientBuilder.addRequestInterceptorFirst((request, entity, context) -> { + clientConfiguration.getHeadersSupplier().get().forEach((header, values) -> { + // The accept and content-type headers are already put on the request, despite this being the first + // interceptor. + if ("Accept".equalsIgnoreCase(header) || " Content-Type".equalsIgnoreCase(header)) { + request.removeHeaders(header); + } + values.forEach(value -> request.addHeader(header, value)); + }); + }); + + // add httpclient configurator callbacks provided by the configuration + for (ClientConfiguration.ClientConfigurationCallback clientConfigurer : clientConfiguration + .getClientConfigurers()) { + if (clientConfigurer instanceof ElasticsearchHttpClientConfigurationCallback httpClientConfigurer) { + httpAsyncClientBuilder = httpClientConfigurer.configure(httpAsyncClientBuilder); + } + } + }); + + builder.setConnectionConfigCallback(connectionConfigBuilder -> { + + if (!connectTimeout.isNegative()) { + connectionConfigBuilder.setConnectTimeout( + Timeout.of(Math.toIntExact(connectTimeout.toMillis()), TimeUnit.MILLISECONDS)); + } + if (!socketTimeout.isNegative()) { + var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS); + connectionConfigBuilder.setSocketTimeout(soTimeout); + } else { + connectionConfigBuilder.setSocketTimeout(Timeout.of(DEFAULT_SOCKET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } + + // add connectionConfig configurator callbacks provided by the configuration + for (ClientConfiguration.ClientConfigurationCallback clientConfigurer : clientConfiguration + .getClientConfigurers()) { + if (clientConfigurer instanceof ElasticsearchConnectionConfigurationCallback connectionConfigurationCallback) { + connectionConfigBuilder = connectionConfigurationCallback.configure(connectionConfigBuilder); + } + } + }); + + builder.setConnectionManagerCallback(poolingAsyncClientConnectionManagerBuilder -> { + + SSLContext sslContext = null; + try { + sslContext = clientConfiguration.getCaFingerprint().isPresent() + ? TransportUtils.sslContextFromCaFingerprint(clientConfiguration.getCaFingerprint().get()) + : (clientConfiguration.getSslContext().isPresent() + ? clientConfiguration.getSslContext().get() + : SSLContext.getDefault()); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("could not create the default ssl context", e); + } + poolingAsyncClientConnectionManagerBuilder.setTlsStrategy(new BasicClientTlsStrategy(sslContext)); + + // add connectionManager configurator callbacks provided by the configuration + for (ClientConfiguration.ClientConfigurationCallback clientConfigurer : clientConfiguration + .getClientConfigurers()) { + if (clientConfigurer instanceof ElasticsearchConnectionManagerCallback connectionManagerCallback) { + poolingAsyncClientConnectionManagerBuilder = connectionManagerCallback.configure(poolingAsyncClientConnectionManagerBuilder); + } + } + }); + + builder.setRequestConfigCallback(requestConfigBuilder -> { + + if (!socketTimeout.isNegative()) { + var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS); + requestConfigBuilder.setConnectionRequestTimeout(soTimeout); + } else { + requestConfigBuilder + .setConnectionRequestTimeout(Timeout.of(DEFAULT_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } + // add connectionConfig configurator callbacks provided by the configuration + for (ClientConfiguration.ClientConfigurationCallback clientConfigurer : clientConfiguration + .getClientConfigurers()) { + if (clientConfigurer instanceof ElasticsearchRequestConfigCallback requestConfigCallback) { + requestConfigBuilder = requestConfigCallback.configure(requestConfigBuilder); + } + } + }); + + return builder; + } + + private static HttpHost @NonNull [] getHttpHosts(ClientConfiguration clientConfiguration) { + List hosts = clientConfiguration.getEndpoints(); + boolean useSsl = clientConfiguration.useSsl(); + return hosts.stream() + .map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort()) + .map(URI::create) + .map(HttpHost::create) + .toArray(HttpHost[]::new); + } + + private static Header[] toHeaderArray(HttpHeaders headers) { + return headers.entrySet().stream() // + .flatMap(entry -> entry.getValue().stream() // + .map(value -> new BasicHeader(entry.getKey(), value))) // + .toList().toArray(new Header[0]); + } + + /** + * {@link ClientConfiguration.ClientConfigurationCallback} to configure the Rest5Client client with a + * {@link Rest5ClientBuilder} + * + * @since 6.0 + */ + public interface ElasticsearchRest5ClientConfigurationCallback + extends ClientConfiguration.ClientConfigurationCallback { + + static ElasticsearchRest5ClientConfigurationCallback from( + Function rest5ClientBuilderCallback) { + + Assert.notNull(rest5ClientBuilderCallback, "rest5ClientBuilderCallback must not be null"); + + return rest5ClientBuilderCallback::apply; + } + + } + + /** + * {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure + * the Elasticsearch Rest5Client's Http client with a {@link HttpAsyncClientBuilder} + * + * @since 6.0 + */ + public interface ElasticsearchHttpClientConfigurationCallback + extends ClientConfiguration.ClientConfigurationCallback { + + static Rest5Clients.ElasticsearchHttpClientConfigurationCallback from( + Function httpClientBuilderCallback) { + + Assert.notNull(httpClientBuilderCallback, "httpClientBuilderCallback must not be null"); + + return httpClientBuilderCallback::apply; + } + } + + /** + * {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure + * the Elasticsearch Rest5Client's connection with a {@link ConnectionConfig.Builder} + * + * @since 6.0 + */ + public interface ElasticsearchConnectionConfigurationCallback + extends ClientConfiguration.ClientConfigurationCallback { + + static ElasticsearchConnectionConfigurationCallback from( + Function connectionConfigBuilderCallback) { + + Assert.notNull(connectionConfigBuilderCallback, "connectionConfigBuilderCallback must not be null"); + + return connectionConfigBuilderCallback::apply; + } + } + + /** + * {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure + * the Elasticsearch Rest5Client's connection manager with a {@link PoolingAsyncClientConnectionManagerBuilder} + * + * @since 6.0 + */ + public interface ElasticsearchConnectionManagerCallback + extends ClientConfiguration.ClientConfigurationCallback { + + static ElasticsearchConnectionManagerCallback from( + Function connectionManagerBuilderCallback) { + + Assert.notNull(connectionManagerBuilderCallback, "connectionManagerBuilderCallback must not be null"); + + return connectionManagerBuilderCallback::apply; + } + } + + /** + * {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure + * the Elasticsearch Rest5Client's connection manager with a {@link RequestConfig.Builder} + * + * @since 6.0 + */ + public interface ElasticsearchRequestConfigCallback + extends ClientConfiguration.ClientConfigurationCallback { + + static ElasticsearchRequestConfigCallback from( + Function requestConfigBuilderCallback) { + + Assert.notNull(requestConfigBuilderCallback, "requestConfigBuilderCallback must not be null"); + + return requestConfigBuilderCallback::apply; + } + } + + public static Rest5ClientOptions.Builder getRest5ClientOptionsBuilder(@Nullable TransportOptions transportOptions) { + + if (transportOptions instanceof Rest5ClientOptions rest5ClientOptions) { + return rest5ClientOptions.toBuilder(); + } + + var builder = new Rest5ClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()); + + if (transportOptions != null) { + transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue())); + transportOptions.queryParameters().forEach(builder::setParameter); + builder.onWarnings(transportOptions.onWarnings()); + } + + return builder; + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java index 62c983a4bd..d8e3f9b1e2 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java @@ -47,7 +47,6 @@ import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; -import com.github.tomakehurst.wiremock.common.ConsoleNotifier; import com.github.tomakehurst.wiremock.matching.AnythingPattern; import com.github.tomakehurst.wiremock.matching.EqualToPattern; import com.github.tomakehurst.wiremock.stubbing.StubMapping; @@ -104,8 +103,11 @@ void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory client defaultHeaders.add("def2", "def2-1"); AtomicInteger supplierCount = new AtomicInteger(1); - AtomicInteger httpClientConfigurerCount = new AtomicInteger(0); AtomicInteger restClientConfigurerCount = new AtomicInteger(0); + AtomicInteger httpClientConfigurerCount = new AtomicInteger(0); + AtomicInteger connectionConfigurerCount = new AtomicInteger(0); + AtomicInteger connectionManagerConfigurerCount = new AtomicInteger(0); + AtomicInteger requestConfigurerCount = new AtomicInteger(0); ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder(); configurationBuilder // @@ -120,17 +122,31 @@ void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory client }); if (clientUnderTestFactory instanceof ELCRest5ClientUnderTestFactory) { + configurationBuilder.withClientConfigurer( + Rest5Clients.ElasticsearchRest5ClientConfigurationCallback.from(rest5ClientBuilder -> { + restClientConfigurerCount.incrementAndGet(); + return rest5ClientBuilder; + })); configurationBuilder.withClientConfigurer( Rest5Clients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> { httpClientConfigurerCount.incrementAndGet(); return httpClientBuilder; })); configurationBuilder.withClientConfigurer( - Rest5Clients.ElasticsearchRest5ClientConfigurationCallback.from(rest5ClientBuilder -> { - restClientConfigurerCount.incrementAndGet(); - return rest5ClientBuilder; + Rest5Clients.ElasticsearchConnectionConfigurationCallback.from(connectionConfigBuilder -> { + connectionConfigurerCount.incrementAndGet(); + return connectionConfigBuilder; + })); + configurationBuilder.withClientConfigurer( + Rest5Clients.ElasticsearchConnectionManagerCallback.from(connectionManagerBuilder -> { + connectionManagerConfigurerCount.incrementAndGet(); + return connectionManagerBuilder; + })); + configurationBuilder.withClientConfigurer( + Rest5Clients.ElasticsearchRequestConfigCallback.from(requestConfigBuilder -> { + requestConfigurerCount.incrementAndGet(); + return requestConfigBuilder; })); - } else if (clientUnderTestFactory instanceof ELCRestClientUnderTestFactory) { configurationBuilder.withClientConfigurer( RestClients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> { @@ -177,8 +193,12 @@ void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory client ; } + assertThat(restClientConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRestClientConfigurerCalls()); assertThat(httpClientConfigurerCount).hasValue(1); - assertThat(restClientConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRestClientConfigCalls()); + assertThat(connectionConfigurerCount).hasValue(clientUnderTestFactory.getExpectedConnectionConfigurerCalls()); + assertThat(connectionManagerConfigurerCount) + .hasValue(clientUnderTestFactory.getExpectedConnectionManagerConfigurerCalls()); + assertThat(requestConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRequestConfigurerCalls()); }); } @@ -404,11 +424,23 @@ public String toString() { protected abstract String getDisplayName(); - protected Integer getExpectedRestClientConfigCalls() { + protected Integer getExpectedRestClientConfigurerCalls() { return 0; } protected abstract int getElasticsearchMajorVersion(); + + public Integer getExpectedConnectionConfigurerCalls() { + return 0; + } + + public Integer getExpectedConnectionManagerConfigurerCalls() { + return 0; + } + + public Integer getExpectedRequestConfigurerCalls() { + return 0; + } } /** @@ -423,7 +455,22 @@ protected String getDisplayName() { } @Override - protected Integer getExpectedRestClientConfigCalls() { + protected Integer getExpectedRestClientConfigurerCalls() { + return 1; + } + + @Override + public Integer getExpectedConnectionConfigurerCalls() { + return 1; + } + + @Override + public Integer getExpectedConnectionManagerConfigurerCalls() { + return 1; + } + + @Override + public Integer getExpectedRequestConfigurerCalls() { return 1; } @@ -467,7 +514,7 @@ protected String getDisplayName() { } @Override - protected Integer getExpectedRestClientConfigCalls() { + protected Integer getExpectedRestClientConfigurerCalls() { return 1; } @@ -511,7 +558,7 @@ protected String getDisplayName() { } @Override - protected Integer getExpectedRestClientConfigCalls() { + protected Integer getExpectedRestClientConfigurerCalls() { return 1; }