From 6abe4c951dcb81c16aed873d26009b99823b74bd Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 21 May 2014 13:37:03 +0200 Subject: [PATCH] Add HierarchyCircuitBreakerService Adds a breaker for request BigArrays, which are used for parent/child queries as well as some aggregations. Certain operations like Netty HTTP responses and transport responses increment the breaker, but will not trip. This also changes the output of the nodes' stats endpoint to show the parent breaker as well as the fielddata and request breakers. There are a number of new settings for breakers now: `indices.breaker.total.limit`: starting limit for all memory-use breaker, defaults to 70% `indices.breaker.fielddata.limit`: starting limit for fielddata breaker, defaults to 60% `indices.breaker.fielddata.overhead`: overhead for fielddata breaker estimations, defaults to 1.03 (the fielddata breaker settings also use the backwards-compatible setting `indices.fielddata.breaker.limit` and `indices.fielddata.breaker.overhead`) `indices.breaker.request.limit`: starting limit for request breaker, defaults to 40% `indices.breaker.request.overhead`: request breaker estimation overhead, defaults to 1.0 The breaker service infrastructure is now generic and opens the path to adding additional circuit breakers in the future. Fixes #6129 Conflicts: src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java src/main/java/org/elasticsearch/index/fielddata/ordinals/InternalGlobalOrdinalsBuilder.java src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java src/main/java/org/elasticsearch/node/internal/InternalNode.java src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java src/test/java/org/elasticsearch/index/codec/CodecTests.java src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java src/test/java/org/elasticsearch/index/query/IndexQueryParserFilterCachingTests.java src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java --- .../index-modules/fielddata.asciidoc | 61 +++- .../admin/cluster/node/stats/NodeStats.java | 10 +- .../client/transport/TransportClient.java | 2 + .../ClusterDynamicSettingsModule.java | 8 +- .../breaker/ChildMemoryCircuitBreaker.java | 224 ++++++++++++++ .../common/breaker/CircuitBreaker.java | 116 +++++++ .../breaker/CircuitBreakingException.java | 20 +- .../common/breaker/MemoryCircuitBreaker.java | 27 +- .../common/unit/MemorySizeValue.java | 2 +- .../common/util/AbstractArray.java | 2 +- .../elasticsearch/common/util/BigArrays.java | 75 +++-- .../index/fielddata/IndexFieldData.java | 2 +- .../fielddata/IndexFieldDataService.java | 4 +- .../fielddata/RamAccountingTermsEnum.java | 6 +- .../ordinals/GlobalOrdinalsBuilder.java | 5 +- .../plain/AbstractIndexOrdinalsFieldData.java | 2 +- .../plain/BytesBinaryDVIndexFieldData.java | 2 +- .../plain/DisabledIndexFieldData.java | 2 +- .../plain/DocValuesIndexFieldData.java | 2 +- .../plain/DoubleArrayIndexFieldData.java | 5 +- .../plain/FSTBytesIndexFieldData.java | 5 +- .../plain/FloatArrayIndexFieldData.java | 5 +- .../plain/GeoPointBinaryDVIndexFieldData.java | 2 +- .../GeoPointCompressedIndexFieldData.java | 5 +- .../GeoPointDoubleArrayIndexFieldData.java | 5 +- .../fielddata/plain/IndexIndexFieldData.java | 2 +- .../plain/NonEstimatingEstimator.java | 6 +- .../plain/PackedArrayIndexFieldData.java | 10 +- .../plain/PagedBytesIndexFieldData.java | 10 +- .../plain/ParentChildIndexFieldData.java | 12 +- .../SortedSetDVOrdinalsIndexFieldData.java | 2 +- .../elasticsearch/indices/IndicesModule.java | 5 +- .../breaker/AllCircuitBreakerStats.java | 102 +++++++ .../indices/breaker/BreakerSettings.java | 58 ++++ .../CircuitBreakerModule.java} | 29 +- .../breaker/CircuitBreakerService.java | 60 ++++ .../CircuitBreakerStats.java} | 60 ++-- .../HierarchyCircuitBreakerService.java | 235 ++++++++++++++ .../InternalCircuitBreakerService.java | 30 +- .../breaker/NoneCircuitBreakerService.java | 22 +- .../cache/IndicesFieldDataCacheListener.java | 5 +- .../node/internal/InternalNode.java | 5 +- .../node/service/NodeService.java | 2 +- .../percolator/PercolateContext.java | 2 +- .../search/internal/DefaultSearchContext.java | 3 +- .../fielddata/LongFieldDataBenchmark.java | 2 +- .../breaker/MemoryCircuitBreakerTests.java | 127 ++++++++ .../common/bytes/PagedBytesReferenceTest.java | 3 +- .../common/util/BigArraysTests.java | 51 ++-- .../index/query/TemplateQueryParserTest.java | 4 +- .../plugin/IndexQueryParserPlugin2Tests.java | 8 +- .../plugin/IndexQueryParserPluginTests.java | 8 +- .../breaker/CircuitBreakerServiceTests.java | 169 ---------- .../breaker/CircuitBreakerServiceTests.java | 288 ++++++++++++++++++ .../breaker/CircuitBreakerUnitTests.java | 66 ++++ .../RandomExceptionCircuitBreakerTests.java | 10 +- .../org/elasticsearch/test/TestCluster.java | 7 +- .../elasticsearch/test/TestSearchContext.java | 2 +- .../test/cache/recycler/MockBigArrays.java | 51 ++-- 59 files changed, 1640 insertions(+), 415 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java create mode 100644 src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java create mode 100644 src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java create mode 100644 src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java rename src/main/java/org/elasticsearch/indices/{fielddata/breaker/CircuitBreakerService.java => breaker/CircuitBreakerModule.java} (57%) create mode 100644 src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java rename src/main/java/org/elasticsearch/indices/{fielddata/breaker/FieldDataBreakerStats.java => breaker/CircuitBreakerStats.java} (61%) create mode 100644 src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java rename src/main/java/org/elasticsearch/indices/{fielddata => }/breaker/InternalCircuitBreakerService.java (83%) rename src/main/java/org/elasticsearch/indices/{fielddata => }/breaker/NoneCircuitBreakerService.java (65%) delete mode 100644 src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java create mode 100644 src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java create mode 100644 src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java rename src/test/java/org/elasticsearch/indices/{fielddata => memory}/breaker/RandomExceptionCircuitBreakerTests.java (96%) diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc index 2afd89ed19c11..d36fe47d837d2 100644 --- a/docs/reference/index-modules/fielddata.asciidoc +++ b/docs/reference/index-modules/fielddata.asciidoc @@ -24,28 +24,63 @@ field data after a certain time of inactivity. Defaults to `-1`. For example, can be set to `5m` for a 5 minute expiry. |======================================================================= +[float] +[[circuit-breaker]] +=== Circuit Breaker + +coming[1.4.0,Prior to 1.4.0 there was only a single circuit breaker for fielddata] + +Elasticsearch contains multiple circuit breakers used to prevent operations from +causing an OutOfMemoryError. Each breaker specifies a limit for how much memory +it can use. Additionally, there is a parent-level breaker that specifies the +total amount of memory that can be used across all breakers. + +The parent-level breaker can be configured with the following setting: + +`indices.breaker.total.limit`:: + Starting limit for overall parent breaker, defaults to 70% of JVM heap + +All circuit breaker settings can be changed dynamically using the cluster update +settings API. + [float] [[fielddata-circuit-breaker]] -=== Field data circuit breaker +==== Field data circuit breaker The field data circuit breaker allows Elasticsearch to estimate the amount of memory a field will required to be loaded into memory. It can then prevent the field data loading by raising an exception. By default the limit is configured to 60% of the maximum JVM heap. It can be configured with the following parameters: -[cols="<,<",options="header",] -|======================================================================= -|Setting |Description -|`indices.fielddata.breaker.limit` |Maximum size of estimated field data -to allow loading. Defaults to 60% of the maximum JVM heap. -|`indices.fielddata.breaker.overhead` |A constant that all field data -estimations are multiplied with to determine a final estimation. Defaults to -1.03 -|======================================================================= +`indices.breaker.fielddata.limit`:: + Limit for fielddata breaker, defaults to 60% of JVM heap + +`indices.breaker.fielddata.overhead`:: + A constant that all field data estimations are multiplied with to determine a + final estimation. Defaults to 1.03 + +`indices.fielddata.breaker.limit`:: + deprecated[1.4.0,Replaced by `indices.breaker.fielddata.limit`] + +`indices.fielddata.breaker.overhead`:: + deprecated[1.4.0,Replaced by `indices.breaker.fielddata.overhead`] + +[float] +[[request-circuit-breaker]] +==== Request circuit breaker + +coming[1.4.0] + +The request circuit breaker allows Elasticsearch to prevent per-request data +structures (for example, memory used for calculating aggregations during a +request) from exceeding a certain amount of memory. + +`indices.breaker.request.limit`:: + Limit for request breaker, defaults to 40% of JVM heap -Both the `indices.fielddata.breaker.limit` and -`indices.fielddata.breaker.overhead` can be changed dynamically using the -cluster update settings API. +`indices.breaker.request.overhead`:: + A constant that all request estimations are multiplied with to determine a + final estimation. Defaults to 1 [float] [[fielddata-monitoring]] diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 0910628188350..7ee06cb7002b5 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.NodeIndicesStats; -import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats; +import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.network.NetworkStats; @@ -75,7 +75,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { private HttpStats http; @Nullable - private FieldDataBreakerStats breaker; + private AllCircuitBreakerStats breaker; NodeStats() { } @@ -83,7 +83,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices, @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http, - @Nullable FieldDataBreakerStats breaker) { + @Nullable AllCircuitBreakerStats breaker) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -174,7 +174,7 @@ public HttpStats getHttp() { } @Nullable - public FieldDataBreakerStats getBreaker() { + public AllCircuitBreakerStats getBreaker() { return this.breaker; } @@ -215,7 +215,7 @@ public void readFrom(StreamInput in) throws IOException { if (in.readBoolean()) { http = HttpStats.readHttpStats(in); } - breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in); + breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in); } @Override diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index f3957f8b3b3b8..3201761e54aff 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.env.Environment; import org.elasticsearch.env.EnvironmentModule; +import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.PluginsModule; @@ -187,6 +188,7 @@ public TransportClient(Settings pSettings, boolean loadConfigSettings) throws El modules.add(new TransportModule(this.settings)); modules.add(new ActionModule(true)); modules.add(new ClientTransportModule()); + modules.add(new CircuitBreakerModule(this.settings)); injector = modules.createInjector(); diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 6a442a508e0d3..ab8a48bac1d2c 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -28,8 +28,9 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; -import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; +import org.elasticsearch.indices.breaker.InternalCircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -85,6 +86,11 @@ public ClusterDynamicSettingsModule() { clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME); clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); } public void addDynamicSettings(String... settings) { diff --git a/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java new file mode 100644 index 0000000000000..1a55e4c3c74f7 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -0,0 +1,224 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Breaker that will check a parent's when incrementing + */ +public class ChildMemoryCircuitBreaker implements CircuitBreaker { + + private final long memoryBytesLimit; + private final BreakerSettings settings; + private final double overheadConstant; + private final AtomicLong used; + private final AtomicLong trippedCount; + private final ESLogger logger; + private final HierarchyCircuitBreakerService parent; + private final Name name; + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. This breaker starts with 0 bytes used. + * @param settings settings to configure this breaker + * @param parent parent circuit breaker service to delegate tripped breakers to + * @param name the name of the breaker + */ + public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger, + HierarchyCircuitBreakerService parent, Name name) { + this(settings, null, logger, parent, name); + } + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. Uses the given oldBreaker to initialize + * the starting offset. + * @param settings settings to configure this breaker + * @param parent parent circuit breaker service to delegate tripped breakers to + * @param name the name of the breaker + * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) + */ + public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker, + ESLogger logger, HierarchyCircuitBreakerService parent, Name name) { + this.name = name; + this.settings = settings; + this.memoryBytesLimit = settings.getLimit(); + this.overheadConstant = settings.getOverhead(); + if (oldBreaker == null) { + this.used = new AtomicLong(0); + this.trippedCount = new AtomicLong(0); + } else { + this.used = oldBreaker.used; + this.trippedCount = oldBreaker.trippedCount; + } + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("creating ChildCircuitBreaker with settings {}", this.settings); + } + this.parent = parent; + } + + /** + * Method used to trip the breaker, delegates to the parent to determine + * whether to trip the breaker or not + */ + @Override + public void circuitBreak(String fieldName, long bytesNeeded) { + this.trippedCount.incrementAndGet(); + throw new CircuitBreakingException("[" + this.name + "] Data too large, data for [" + + fieldName + "] would be larger than limit of [" + + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]", + bytesNeeded, this.memoryBytesLimit); + } + + /** + * Add a number of bytes, tripping the circuit breaker if the aggregated + * estimates are above the limit. Automatically trips the breaker if the + * memory limit is set to 0. Will never trip the breaker if the limit is + * set < 0, but can still be used to aggregate estimations. + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + * @throws CircuitBreakingException + */ + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + // short-circuit on no data allowed, immediately throwing an exception + if (memoryBytesLimit == 0) { + circuitBreak(label, bytes); + } + + long newUsed; + // If there is no limit (-1), we can optimize a bit by using + // .addAndGet() instead of looping (because we don't have to check a + // limit), which makes the RamAccountingTermsEnum case faster. + if (this.memoryBytesLimit == -1) { + newUsed = this.used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", + this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); + } + } else { + // Otherwise, check the addition and commit the addition, looping if + // there are conflicts. May result in additional logging, but it's + // trace logging and shouldn't be counted on for additions. + long currentUsed; + do { + currentUsed = this.used.get(); + newUsed = currentUsed + bytes; + long newUsedWithOverhead = (long) (newUsed * overheadConstant); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", + this.name, + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); + } + if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { + logger.error("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking", + this.name, + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label, + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); + circuitBreak(label, newUsedWithOverhead); + } + // Attempt to set the new used value, but make sure it hasn't changed + // underneath us, if it has, keep trying until we are able to set it + } while (!this.used.compareAndSet(currentUsed, newUsed)); + } + + // Additionally, we need to check that we haven't exceeded the parent's limit + try { + parent.checkParentLimit(label); + } catch (CircuitBreakingException e) { + // If the parent breaker is tripped, this breaker has to be + // adjusted back down because the allocation is "blocked" but the + // breaker has already been incremented + this.used.addAndGet(-bytes); + throw e; + } + return newUsed; + } + + /** + * Add an exact number of bytes, not checking for tripping the + * circuit breaker. This bypasses the overheadConstant multiplication. + * + * Also does not check with the parent breaker to see if the parent limit + * has been exceeded. + * + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + */ + @Override + public long addWithoutBreaking(long bytes) { + long u = used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u); + } + assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; + return u; + } + + /** + * @return the number of aggregated "used" bytes so far + */ + @Override + public long getUsed() { + return this.used.get(); + } + + /** + * @return the number of bytes that can be added before the breaker trips + */ + @Override + public long getLimit() { + return this.memoryBytesLimit; + } + + /** + * @return the constant multiplier the breaker uses for aggregations + */ + @Override + public double getOverhead() { + return this.overheadConstant; + } + + /** + * @return the number of times the breaker has been tripped + */ + @Override + public long getTrippedCount() { + return this.trippedCount.get(); + } + + /** + * @return the name of the breaker + */ + public Name getName() { + return this.name; + } +} diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java new file mode 100644 index 0000000000000..541f6888a15dd --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Interface for an object that can be incremented, breaking after some + * configured limit has been reached. + */ +public interface CircuitBreaker { + + /** + * Enum used for specifying different types of circuit breakers + */ + public static enum Name { + PARENT(0), + FIELDDATA(1), + REQUEST(2); + + private int ordinal; + + Name(int ordinal) { + this.ordinal = ordinal; + } + + public int getSerializableValue() { + return this.ordinal; + } + + public static Name readFrom(StreamInput in) throws IOException { + int value = in.readVInt(); + switch (value) { + case 0: + return Name.PARENT; + case 1: + return Name.FIELDDATA; + case 2: + return Name.REQUEST; + default: + throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with ordinal: " + value); + } + } + + public static void writeTo(Name name, StreamOutput out) throws IOException { + out.writeVInt(name.getSerializableValue()); + } + } + + /** + * Trip the circuit breaker + * @param fieldName name of the field responsible for tripping the breaker + * @param bytesNeeded bytes asked for but unable to be allocated + */ + public void circuitBreak(String fieldName, long bytesNeeded); + + /** + * add bytes to the breaker and maybe trip + * @param bytes number of bytes to add + * @param label string label describing the bytes being added + * @return the number of "used" bytes for the circuit breaker + * @throws CircuitBreakingException + */ + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException; + + /** + * Adjust the circuit breaker without tripping + */ + public long addWithoutBreaking(long bytes); + + /** + * @return the currently used bytes the breaker is tracking + */ + public long getUsed(); + + /** + * @return maximum number of bytes the circuit breaker can track before tripping + */ + public long getLimit(); + + /** + * @return overhead of circuit breaker + */ + public double getOverhead(); + + /** + * @return the number of times the circuit breaker has been tripped + */ + public long getTrippedCount(); + + /** + * @return the name of the breaker + */ + public Name getName(); +} diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index c2c2bf26d10f1..0b88a9dad7c93 100644 --- a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -25,8 +25,26 @@ */ public class CircuitBreakingException extends ElasticsearchException { - // TODO: maybe add more neat metrics here? + private final long bytesWanted; + private final long byteLimit; + public CircuitBreakingException(String message) { super(message); + this.bytesWanted = 0; + this.byteLimit = 0; + } + + public CircuitBreakingException(String message, long bytesWanted, long byteLimit) { + super(message); + this.bytesWanted = bytesWanted; + this.byteLimit = byteLimit; + } + + public long getBytesWanted() { + return this.bytesWanted; + } + + public long getByteLimit() { + return this.byteLimit; } } diff --git a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java index 9e157c31d21ec..e22753a326a0f 100644 --- a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java +++ b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java @@ -27,7 +27,7 @@ * MemoryCircuitBreaker is a circuit breaker that breaks once a * configurable memory limit has been reached. */ -public class MemoryCircuitBreaker { +public class MemoryCircuitBreaker implements CircuitBreaker { private final long memoryBytesLimit; private final double overheadConstant; @@ -77,7 +77,7 @@ public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, Memory * Method used to trip the breaker * @throws CircuitBreakingException */ - public void circuitBreak(String fieldName) throws CircuitBreakingException { + public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreakingException { this.trippedCount.incrementAndGet(); throw new CircuitBreakingException("Data too large, data for field [" + fieldName + "] would be larger than limit of [" + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]"); @@ -92,10 +92,10 @@ public void circuitBreak(String fieldName) throws CircuitBreakingException { * @return number of "used" bytes so far * @throws CircuitBreakingException */ - public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws CircuitBreakingException { + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { // short-circuit on no data allowed, immediately throwing an exception if (memoryBytesLimit == 0) { - circuitBreak(fieldName); + circuitBreak(label, bytes); } long newUsed; @@ -106,7 +106,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws newUsed = this.used.addAndGet(bytes); if (logger.isTraceEnabled()) { logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", - new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed)); + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); } return newUsed; } @@ -121,15 +121,15 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws long newUsedWithOverhead = (long)(newUsed * overheadConstant); if (logger.isTraceEnabled()) { logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", - new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed), + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed), memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); } if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { logger.error("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking", - newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), fieldName, + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label, memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); - circuitBreak(fieldName); + circuitBreak(label, newUsedWithOverhead); } // Attempt to set the new used value, but make sure it hasn't changed // underneath us, if it has, keep trying until we are able to set it @@ -161,9 +161,9 @@ public long getUsed() { } /** - * @return the maximum number of bytes before the circuit breaker will trip + * @return the number of bytes that can be added before the breaker trips */ - public long getMaximum() { + public long getLimit() { return this.memoryBytesLimit; } @@ -180,4 +180,11 @@ public double getOverhead() { public long getTrippedCount() { return this.trippedCount.get(); } + + /** + * @return the name of the breaker + */ + public Name getName() { + return Name.FIELDDATA; + } } diff --git a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java index 81a293459f29b..2204fe8cac54e 100644 --- a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java @@ -32,7 +32,7 @@ public enum MemorySizeValue { * 42 (default assumed unit is byte) or 2mb, or percentages of the heap size: if * the heap is 1G, 10% will be parsed as 100mb. */ public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) { - if (sValue.endsWith("%")) { + if (sValue != null && sValue.endsWith("%")) { final String percentAsString = sValue.substring(0, sValue.length() - 1); try { final double percent = Double.parseDouble(percentAsString); diff --git a/src/main/java/org/elasticsearch/common/util/AbstractArray.java b/src/main/java/org/elasticsearch/common/util/AbstractArray.java index 7de3c1c3b7448..0c00a897333a5 100644 --- a/src/main/java/org/elasticsearch/common/util/AbstractArray.java +++ b/src/main/java/org/elasticsearch/common/util/AbstractArray.java @@ -33,7 +33,7 @@ abstract class AbstractArray implements BigArray { @Override public final void close() { - bigArrays.ramBytesUsed.addAndGet(-ramBytesUsed()); + bigArrays.adjustBreaker(-ramBytesUsed()); assert !released : "double release"; released = true; doClose(); diff --git a/src/main/java/org/elasticsearch/common/util/BigArrays.java b/src/main/java/org/elasticsearch/common/util/BigArrays.java index 5f362368e5ec3..a2f216361c419 100644 --- a/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -23,8 +23,10 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; @@ -32,6 +34,7 @@ import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; @@ -39,9 +42,8 @@ /** Utility class to work with arrays. */ public class BigArrays extends AbstractComponent { - // TODO: switch to a circuit breaker that is shared not only on big arrays level, and applies to other request level data structures public static final String MAX_SIZE_IN_BYTES_SETTING = "requests.memory.breaker.limit"; - public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, Long.MAX_VALUE); + public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, null); /** Page size in bytes: 16KB */ public static final int PAGE_SIZE_IN_BYTES = 1 << 14; @@ -363,39 +365,73 @@ public T set(long index, T value) { } final PageCacheRecycler recycler; - final AtomicLong ramBytesUsed; - final long maxSizeInBytes; + final CircuitBreakerService breakerService; + final boolean checkBreaker; @Inject - public BigArrays(Settings settings, PageCacheRecycler recycler) { - this(settings, recycler, settings.getAsMemory(MAX_SIZE_IN_BYTES_SETTING, Long.toString(Long.MAX_VALUE)).bytes()); + public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) { + // Checking the breaker is disabled if not specified + this(settings, recycler, breakerService, false); } - private BigArrays(Settings settings, PageCacheRecycler recycler, final long maxSizeInBytes) { + public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) { super(settings); - this.maxSizeInBytes = maxSizeInBytes; + this.checkBreaker = checkBreaker; this.recycler = recycler; - ramBytesUsed = new AtomicLong(); + this.breakerService = breakerService; } - private void validate(long delta) { - final long totalSizeInBytes = ramBytesUsed.addAndGet(delta); - if (totalSizeInBytes > maxSizeInBytes) { - throw new ElasticsearchIllegalStateException("Maximum number of bytes allocated exceeded: [" + totalSizeInBytes + "] (> " + maxSizeInBytes + ")"); + /** + * Adjust the circuit breaker with the given delta, if the delta is + * negative, or checkBreaker is false, the breaker will be adjusted + * without tripping + */ + void adjustBreaker(long delta) { + if (this.breakerService != null) { + CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.Name.REQUEST); + if (this.checkBreaker == true) { + // checking breaker means potentially tripping, but it doesn't + // have to if the delta is negative + if (delta > 0) { + try { + breaker.addEstimateBytesAndMaybeBreak(delta, ""); + } catch (CircuitBreakingException e) { + // since we've already created the data, we need to + // add it so closing the stream re-adjusts properly + breaker.addWithoutBreaking(delta); + // re-throw the original exception + throw e; + } + } else { + breaker.addWithoutBreaking(delta); + } + } else { + // even if we are not checking the breaker, we need to adjust + // its' totals, so add without breaking + breaker.addWithoutBreaking(delta); + } } } + /** + * Return a new instance of this BigArrays class with circuit breaking + * explicitly enabled, instead of only accounting enabled + */ + public BigArrays withCircuitBreaking() { + return new BigArrays(this.settings, this.recycler, this.breakerService, true); + } + private T resizeInPlace(T array, long newSize) { final long oldMemSize = array.ramBytesUsed(); array.resize(newSize); - validate(array.ramBytesUsed() - oldMemSize); + adjustBreaker(array.ramBytesUsed() - oldMemSize); return array; } private T validate(T array) { boolean success = false; try { - validate(array.ramBytesUsed()); + adjustBreaker(array.ramBytesUsed()); success = true; } finally { if (!success) { @@ -720,11 +756,4 @@ public ObjectArray grow(ObjectArray array, long minSize) { final long newSize = overSize(minSize, OBJECT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF); return resize(array, newSize); } - - /** - * Return an approximate number of bytes that have been allocated but not released yet. - */ - public long sizeInBytes() { - return ramBytesUsed.get(); - } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index 62faf5abce1e1..d8b925f735443 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 7e7ba5158b0bd..105d1676466cd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -38,11 +38,11 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.util.ArrayList; import java.util.Collection; diff --git a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java index 01a91879bd7f3..15aa961294c12 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java +++ b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java @@ -21,8 +21,8 @@ import org.apache.lucene.index.FilteredTermsEnum; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.index.fielddata.plain.AbstractIndexFieldData; +import org.elasticsearch.common.breaker.CircuitBreaker; import java.io.IOException; @@ -36,7 +36,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum { // Flush every 5mb private static final long FLUSH_BUFFER_SIZE = 1024 * 1024 * 5; - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final TermsEnum termsEnum; private final AbstractIndexFieldData.PerValueEstimator estimator; private final String fieldName; @@ -44,7 +44,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum { private long flushBuffer; - public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator, + public RamAccountingTermsEnum(TermsEnum termsEnum, CircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator, String fieldName) { super(termsEnum); this.breaker = breaker; diff --git a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java index 7c47c6ed15e5a..41f630371d216 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java +++ b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java @@ -23,11 +23,12 @@ import org.apache.lucene.index.RandomAccessOrds; import org.apache.lucene.index.XOrdinalMap; import org.apache.lucene.util.packed.PackedInts; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData; import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; @@ -52,7 +53,7 @@ public static IndexOrdinalsFieldData build(final IndexReader indexReader, IndexO } final XOrdinalMap ordinalMap = XOrdinalMap.build(null, subs, PackedInts.DEFAULT); final long memorySizeInBytes = ordinalMap.ramBytesUsed(); - breakerService.getBreaker().addWithoutBreaking(memorySizeInBytes); + breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(memorySizeInBytes); if (logger.isDebugEnabled()) { logger.debug( diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java index d3abf187ffd49..93547810e9dba 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java @@ -31,8 +31,8 @@ import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper.Names; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; import java.util.Map; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java index 1098f3c917a14..3864e72b62338 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java index 51da7ef7c1248..8ad8c5cfc5751 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java @@ -29,8 +29,8 @@ import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** * A field data implementation that forbids loading and will throw an {@link org.elasticsearch.ElasticsearchIllegalStateException} if you try to load diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java index 0605b0e909e36..ec3cb13659abd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java @@ -37,7 +37,7 @@ import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Map; import java.util.Set; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java index 327cba1420bca..3cf762db2db25 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.util.*; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; @@ -34,7 +35,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; /** @@ -70,7 +71,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AtomicDoubleFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java index 5aa8e371629ef..85cd1e2c14f5c 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.fst.FST.INPUT_TYPE; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.*; @@ -33,7 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -63,7 +64,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicOrdinalsFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicOrdinalsFieldData.empty(); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java index 45415e796547a..3c0030408fec6 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.util.*; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FloatArray; @@ -33,7 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; /** @@ -68,7 +69,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AtomicDoubleFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java index 4165c0d8cb7a0..7f775ab13a1b8 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java @@ -31,7 +31,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java index aedaac01bdbdd..c33c74c38ab60 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PagedMutable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.DistanceUnit; @@ -38,7 +39,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -82,7 +83,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicGeoPointFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java index fb2936c49d80a..e1684d65d6d06 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.RandomAccessOrds; import org.apache.lucene.index.Terms; import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -34,7 +35,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -64,7 +65,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicGeoPointFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java index 3850d7920c585..ab875c62ab412 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java @@ -27,7 +27,7 @@ import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; public class IndexIndexFieldData extends AbstractIndexOrdinalsFieldData { diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java index 5b35aa70eb1f8..481b2b3c84ca0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java @@ -22,7 +22,7 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import java.io.IOException; @@ -33,9 +33,9 @@ */ public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; - NonEstimatingEstimator(MemoryCircuitBreaker breaker) { + NonEstimatingEstimator(CircuitBreaker breaker) { this.breaker = breaker; } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java index 971b3a0f933a3..2cffa02a2cb4c 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java @@ -27,7 +27,7 @@ import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.*; @@ -38,7 +38,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; @@ -88,7 +88,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc final AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; - PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType(), getFieldNames().fullName()); + PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getNumericType(), getFieldNames().fullName()); if (terms == null) { data = AtomicLongFieldData.empty(reader.maxDoc()); estimator.adjustForNoTerms(data.ramBytesUsed()); @@ -355,11 +355,11 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu */ public class PackedArrayEstimator implements PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final NumericType type; private final String fieldName; - public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type, String fieldName) { + public PackedArrayEstimator(CircuitBreaker breaker, NumericType type, String fieldName) { this.breaker = breaker; this.type = type; this.fieldName = fieldName; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java index d6d3e051a5fca..fbfda4af76501 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java @@ -24,7 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PagedBytes; import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.*; @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; @@ -61,7 +61,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex AtomicReader reader = context.reader(); AtomicOrdinalsFieldData data = null; - PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(), getFieldNames().fullName()); + PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getFieldNames().fullName()); Terms terms = reader.terms(getFieldNames().indexName()); if (terms == null) { data = AbstractAtomicOrdinalsFieldData.empty(); @@ -125,11 +125,11 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex public class PagedBytesEstimator implements PerValueEstimator { private final AtomicReaderContext context; - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final String fieldName; private long estimatedBytes; - PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker, String fieldName) { + PagedBytesEstimator(AtomicReaderContext context, CircuitBreaker breaker, String fieldName) { this.breaker = breaker; this.context = context; this.fieldName = fieldName; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java index 1b3f30e0bc64d..5e52b5a438f75 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java @@ -32,7 +32,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,7 +47,7 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; @@ -101,7 +101,7 @@ public ParentChildAtomicFieldData loadDirect(AtomicReaderContext context) throws new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME), parentTypes ); - ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(), termsEnum); + ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), termsEnum); TermsEnum estimatedTermsEnum = estimator.beforeLoad(null); ObjectObjectOpenHashMap typeBuilders = ObjectObjectOpenHashMap.newInstance(); try { @@ -210,13 +210,13 @@ public IndexFieldData build(Index index, @IndexSettings Settings indexSetting */ public class ParentChildEstimator implements PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final TermsEnum filteredEnum; // The TermsEnum is passed in here instead of being generated in the // beforeLoad() function since it's filtered inside the previous // TermsEnum wrappers - public ParentChildEstimator(MemoryCircuitBreaker breaker, TermsEnum filteredEnum) { + public ParentChildEstimator(CircuitBreaker breaker, TermsEnum filteredEnum) { this.breaker = breaker; this.filteredEnum = filteredEnum; } @@ -337,7 +337,7 @@ public int getOrd(int docID) { } } - breakerService.getBreaker().addWithoutBreaking(ramBytesUsed); + breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed); if (logger.isDebugEnabled()) { logger.debug( "Global-ordinals[_parent] took {}", diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java index 3a1b85251a3d8..bf615b746a42f 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java @@ -29,8 +29,8 @@ import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper.Names; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData implements IndexOrdinalsFieldData { diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 76ac7e165d70e..dad12f9a295bb 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -29,10 +29,8 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -81,7 +79,6 @@ protected void configure() { bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); bind(UpdateHelper.class).asEagerSingleton(); - bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton(); bind(IndicesFieldDataCacheListener.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java new file mode 100644 index 0000000000000..4ba07a50d5909 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + * Stats class encapsulating all of the different circuit breaker stats + */ +public class AllCircuitBreakerStats implements Streamable, ToXContent { + + private CircuitBreakerStats[] allStats = new CircuitBreakerStats[0]; + + public AllCircuitBreakerStats() { + + } + + public AllCircuitBreakerStats(CircuitBreakerStats[] allStats) { + this.allStats = allStats; + } + + public CircuitBreakerStats[] getAllStats() { + return this.allStats; + } + + public CircuitBreakerStats getStats(CircuitBreaker.Name name) { + for (CircuitBreakerStats stats : allStats) { + if (stats.getName() == name) { + return stats; + } + } + return null; + } + + public static AllCircuitBreakerStats readOptionalAllCircuitBreakerStats(StreamInput in) throws IOException { + AllCircuitBreakerStats stats = in.readOptionalStreamable(new AllCircuitBreakerStats()); + return stats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + int statCount = in.readVInt(); + CircuitBreakerStats[] newStats = new CircuitBreakerStats[statCount]; + for (int i = 0; i < statCount; i++) { + CircuitBreakerStats stats = new CircuitBreakerStats(); + stats.readFrom(in); + newStats[i] = stats; + } + allStats = newStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(allStats.length); + for (CircuitBreakerStats stats : allStats) { + if (stats != null) { + stats.writeTo(out); + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.BREAKERS); + for (CircuitBreakerStats stats : allStats) { + if (stats != null) { + stats.toXContent(builder, params); + } + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString BREAKERS = new XContentBuilderString("breakers"); + } +} diff --git a/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java new file mode 100644 index 0000000000000..b76e4b45dee53 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + * Settings for a {@link CircuitBreaker} + */ +public class BreakerSettings { + + private final CircuitBreaker.Name name; + private final long limitBytes; + private final double overhead; + + public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) { + this.name = name; + this.limitBytes = limitBytes; + this.overhead = overhead; + } + + public CircuitBreaker.Name getName() { + return this.name; + } + + public long getLimit() { + return this.limitBytes; + } + + public double getOverhead() { + return this.overhead; + } + + @Override + public String toString() { + return "[" + this.name.toString() + + ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) + + ",overhead=" + this.overhead + "]"; + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java similarity index 57% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java index c53cc3e1ba202..6033015ce546a 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java @@ -17,24 +17,23 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; -/** - * Interface for Circuit Breaker services, which provide breakers to classes - * that load field data. - */ -public interface CircuitBreakerService { +public class CircuitBreakerModule extends AbstractModule { + + public static final String IMPL = "indices.breaker.breaker_impl"; - /** - * @return the breaker that can be used to register estimates against - */ - public MemoryCircuitBreaker getBreaker(); + private final Settings settings; + public CircuitBreakerModule(Settings settings) { + this.settings = settings; + } - /** - * @return stats about the breaker - */ - public FieldDataBreakerStats stats(); + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(settings.getAsClass(IMPL, HierarchyCircuitBreakerService.class)).asEagerSingleton(); + } } diff --git a/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java new file mode 100644 index 0000000000000..35e5ee80e5a2e --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; + +/** + * Interface for Circuit Breaker services, which provide breakers to classes + * that load field data. + */ +public abstract class CircuitBreakerService extends AbstractLifecycleComponent { + + protected CircuitBreakerService(Settings settings) { + super(settings); + } + + /** + * @return the breaker that can be used to register estimates against + */ + public abstract CircuitBreaker getBreaker(CircuitBreaker.Name type); + + /** + * @return stats about all breakers + */ + public abstract AllCircuitBreakerStats stats(); + + /** + * @return stats about a specific breaker + */ + public abstract CircuitBreakerStats stats(CircuitBreaker.Name name); + + protected void doStart() throws ElasticsearchException { + } + + protected void doStop() throws ElasticsearchException { + } + + protected void doClose() throws ElasticsearchException { + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java similarity index 61% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java index 555c0f09efbd4..67bdd358dcd7a 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java @@ -17,9 +17,10 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; import org.elasticsearch.Version; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -29,30 +30,37 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import java.io.IOException; +import java.util.Locale; /** * Class encapsulating stats about the circuit breaker */ -public class FieldDataBreakerStats implements Streamable, ToXContent { +public class CircuitBreakerStats implements Streamable, ToXContent { - private long maximum; + private CircuitBreaker.Name name; + private long limit; private long estimated; private long trippedCount; private double overhead; - FieldDataBreakerStats() { + CircuitBreakerStats() { } - public FieldDataBreakerStats(long maximum, long estimated, double overhead, long trippedCount) { - this.maximum = maximum; + public CircuitBreakerStats(CircuitBreaker.Name name, long limit, long estimated, double overhead, long trippedCount) { + this.name = name; + this.limit = limit; this.estimated = estimated; this.trippedCount = trippedCount; this.overhead = overhead; } - public long getMaximum() { - return this.maximum; + public CircuitBreaker.Name getName() { + return this.name; + } + + public long getLimit() { + return this.limit; } public long getEstimated() { @@ -67,14 +75,15 @@ public double getOverhead() { return this.overhead; } - public static FieldDataBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException { - FieldDataBreakerStats stats = in.readOptionalStreamable(new FieldDataBreakerStats()); + public static CircuitBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException { + CircuitBreakerStats stats = in.readOptionalStreamable(new CircuitBreakerStats()); return stats; } @Override public void readFrom(StreamInput in) throws IOException { - maximum = in.readLong(); + // limit is the maximum from the old circuit breaker stats for backwards compatibility + limit = in.readLong(); estimated = in.readLong(); overhead = in.readDouble(); if (in.getVersion().onOrAfter(Version.V_1_2_0)) { @@ -82,23 +91,31 @@ public void readFrom(StreamInput in) throws IOException { } else { this.trippedCount = -1; } + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + this.name = CircuitBreaker.Name.readFrom(in); + } else { + this.name = CircuitBreaker.Name.FIELDDATA; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(maximum); + out.writeLong(limit); out.writeLong(estimated); out.writeDouble(overhead); if (out.getVersion().onOrAfter(Version.V_1_2_0)) { out.writeLong(trippedCount); } + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + CircuitBreaker.Name.writeTo(name, out); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.BREAKER); - builder.field(Fields.MAX, maximum); - builder.field(Fields.MAX_HUMAN, new ByteSizeValue(maximum)); + builder.startObject(name.toString().toLowerCase(Locale.ROOT)); + builder.field(Fields.LIMIT, limit); + builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit)); builder.field(Fields.ESTIMATED, estimated); builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated)); builder.field(Fields.OVERHEAD, overhead); @@ -107,10 +124,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + @Override + public String toString() { + return "[" + this.name.toString() + + ",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) + + ",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) + + ",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]"; + } + static final class Fields { - static final XContentBuilderString BREAKER = new XContentBuilderString("fielddata_breaker"); - static final XContentBuilderString MAX = new XContentBuilderString("maximum_size_in_bytes"); - static final XContentBuilderString MAX_HUMAN = new XContentBuilderString("maximum_size"); + static final XContentBuilderString LIMIT = new XContentBuilderString("limit_size_in_bytes"); + static final XContentBuilderString LIMIT_HUMAN = new XContentBuilderString("limit_size"); static final XContentBuilderString ESTIMATED = new XContentBuilderString("estimated_size_in_bytes"); static final XContentBuilderString ESTIMATED_HUMAN = new XContentBuilderString("estimated_size"); static final XContentBuilderString OVERHEAD = new XContentBuilderString("overhead"); diff --git a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java new file mode 100644 index 0000000000000..67bfaf4688815 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -0,0 +1,235 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.node.settings.NodeSettingsService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.collect.Lists.newArrayList; + +/** + * CircuitBreakerService that attempts to redistribute space between breakers + * if tripped + */ +public class HierarchyCircuitBreakerService extends CircuitBreakerService { + + private volatile ImmutableMap breakers; + + public static final String TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.total.limit"; + public static final String DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT = "70%"; + + public static final String FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.fielddata.limit"; + public static final String FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.fielddata.overhead"; + public static final String DEFAULT_FIELDDATA_BREAKER_LIMIT = "60%"; + public static final double DEFAULT_FIELDDATA_OVERHEAD_CONSTANT = 1.03; + + public static final String REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.request.limit"; + public static final String REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.request.overhead"; + public static final String DEFAULT_REQUEST_BREAKER_LIMIT = "40%"; + + private volatile BreakerSettings parentSettings; + private volatile BreakerSettings fielddataSettings; + private volatile BreakerSettings requestSettings; + + // Tripped count for when redistribution was attempted but wasn't successful + private final AtomicLong parentTripCount = new AtomicLong(0); + + @Inject + public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) { + super(settings); + + // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING + // setting to keep backwards compatibility with 1.2, it can be safely + // removed when compatibility with 1.2 is no longer needed + String compatibilityFielddataLimitDefault = DEFAULT_FIELDDATA_BREAKER_LIMIT; + ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, null); + if (compatibilityFielddataLimit != null) { + compatibilityFielddataLimitDefault = compatibilityFielddataLimit.toString(); + } + + // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING + // setting to keep backwards compatibility with 1.2, it can be safely + // removed when compatibility with 1.2 is no longer needed + double compatibilityFielddataOverheadDefault = DEFAULT_FIELDDATA_OVERHEAD_CONSTANT; + Double compatibilityFielddataOverhead = settings.getAsDouble(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (compatibilityFielddataOverhead != null) { + compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead; + } + + this.fielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, + settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(), + settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault)); + + this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, + settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(), + settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)); + + // Validate the configured settings + validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings}); + + this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, + settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0); + if (logger.isTraceEnabled()) { + logger.trace("parent circuit breaker with settings {}", this.parentSettings); + } + + Map tempBreakers = new HashMap<>(); + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA)); + tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST)); + this.breakers = ImmutableMap.copyOf(tempBreakers); + + nodeSettingsService.addListener(new ApplySettings()); + } + + public class ApplySettings implements NodeSettingsService.Listener { + + @Override + public void onRefreshSettings(Settings settings) { + boolean changed = false; + + // Fielddata settings + BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings; + ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null); + Double newFielddataOverhead = settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (newFielddataMax != null || newFielddataOverhead != null) { + changed = true; + long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes(); + newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; + + newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead); + } + + // Request settings + BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings; + ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null); + Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (newRequestMax != null || newRequestOverhead != null) { + changed = true; + long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes(); + newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead; + + newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead); + } + + // Parent settings + BreakerSettings newParentSettings = HierarchyCircuitBreakerService.this.parentSettings; + long oldParentMax = HierarchyCircuitBreakerService.this.parentSettings.getLimit(); + ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null); + if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) { + changed = true; + newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0); + } + + if (changed) { + // change all the things + validateSettings(new BreakerSettings[] {newFielddataSettings, newRequestSettings}); + logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings); + HierarchyCircuitBreakerService.this.parentSettings = newParentSettings; + HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings; + HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings; + Map tempBreakers = new HashMap<>(); + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(newFielddataSettings, + (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA)); + tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(newRequestSettings, + (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST)); + HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers); + } + } + } + + /** + * Validate that child settings are valid + * @throws ElasticsearchIllegalStateException + */ + public static void validateSettings(BreakerSettings[] childrenSettings) throws ElasticsearchIllegalStateException { + for (BreakerSettings childSettings : childrenSettings) { + // If the child is disabled, ignore it + if (childSettings.getLimit() == -1) { + continue; + } + + if (childSettings.getOverhead() < 0) { + throw new ElasticsearchIllegalStateException("Child breaker overhead " + childSettings + " must be non-negative"); + } + } + } + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name name) { + return this.breakers.get(name); + } + + @Override + public AllCircuitBreakerStats stats() { + long parentEstimated = 0; + List allStats = newArrayList(); + // Gather the "estimated" count for the parent breaker by adding the + // estimations for each individual breaker + for (CircuitBreaker breaker : this.breakers.values()) { + allStats.add(stats(breaker.getName())); + parentEstimated += breaker.getUsed(); + } + // Manually add the parent breaker settings since they aren't part of the breaker map + allStats.add(new CircuitBreakerStats(CircuitBreaker.Name.PARENT, parentSettings.getLimit(), + parentEstimated, 1.0, parentTripCount.get())); + return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()])); + } + + @Override + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + CircuitBreaker breaker = this.breakers.get(name); + return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); + } + + /** + * Checks whether the parent breaker has been tripped + * @param label + * @throws CircuitBreakingException + */ + public void checkParentLimit(String label) throws CircuitBreakingException { + long totalUsed = 0; + for (CircuitBreaker breaker : this.breakers.values()) { + totalUsed += (breaker.getUsed() * breaker.getOverhead()); + } + + long parentLimit = this.parentSettings.getLimit(); + if (totalUsed > parentLimit) { + this.parentTripCount.incrementAndGet(); + throw new CircuitBreakingException("[PARENT] Data too large, data for [" + + label + "] would be larger than limit of [" + + parentLimit + "/" + new ByteSizeValue(parentLimit) + "]", + totalUsed, parentLimit); + } + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java similarity index 83% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java index 7eabc0c24d84f..c64779c96b468 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java @@ -16,11 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.MemoryCircuitBreaker; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -32,7 +31,7 @@ * that can be used to keep track of memory usage across the node, preventing * actions that could cause an {@link OutOfMemoryError} on the node. */ -public class InternalCircuitBreakerService extends AbstractLifecycleComponent implements CircuitBreakerService { +public class InternalCircuitBreakerService extends CircuitBreakerService { public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit"; public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead"; @@ -87,7 +86,8 @@ InternalCircuitBreakerService.this.maxBytes, new ByteSizeValue(InternalCircuitBr /** * @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage */ - public MemoryCircuitBreaker getBreaker() { + public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) { + // Return the only breaker, regardless of name return this.breaker; } @@ -104,19 +104,17 @@ public synchronized void resetBreaker() { } @Override - public FieldDataBreakerStats stats() { - return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); + public AllCircuitBreakerStats stats() { + return new AllCircuitBreakerStats(new CircuitBreakerStats[]{ + new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(), + breaker.getOverhead(), breaker.getTrippedCount()) + }); } @Override - protected void doStart() throws ElasticsearchException { - } - - @Override - protected void doStop() throws ElasticsearchException { - } - - @Override - protected void doClose() throws ElasticsearchException { + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + // There is only a single breaker, so always return it + return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(), + breaker.getOverhead(), breaker.getTrippedCount()); } } diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java similarity index 65% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java index f5fa7dac8d9f0..c8d76ee9bfd90 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java @@ -17,31 +17,41 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.ByteSizeValue; /** * Class that returns a breaker that never breaks */ -public class NoneCircuitBreakerService implements CircuitBreakerService { +public class NoneCircuitBreakerService extends CircuitBreakerService { private final ESLogger logger = Loggers.getLogger(NoneCircuitBreakerService.class); private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger); - public NoneCircuitBreakerService() {} + public NoneCircuitBreakerService() { + super(ImmutableSettings.EMPTY); + } + @Override - public MemoryCircuitBreaker getBreaker() { + public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) { return breaker; } @Override - public FieldDataBreakerStats stats() { - return new FieldDataBreakerStats(-1, -1, 0, 0); + public AllCircuitBreakerStats stats() { + return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.Name.FIELDDATA)}); + } + + @Override + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0); } } diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java index 30dcf64ac376e..5bbdc5fa8deba 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java @@ -20,11 +20,12 @@ package org.elasticsearch.indices.fielddata.cache; import org.apache.lucene.util.Accountable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.mapper.FieldMapper; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** * A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about @@ -48,7 +49,7 @@ public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, Ac @Override public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]"; - circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes); + circuitBreakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(-sizeInBytes); } } diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 8668383db03c1..c8a12fa221978 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -19,7 +19,6 @@ package org.elasticsearch.node.internal; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -39,7 +38,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; @@ -49,7 +47,6 @@ import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.CachedStreams; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -72,6 +69,7 @@ import org.elasticsearch.index.search.shape.ShapeModule; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -162,6 +160,7 @@ public InternalNode(Settings pSettings, boolean loadConfigSettings) throws Elast modules.add(new Version.Module(version)); modules.add(new CacheRecyclerModule(settings)); modules.add(new PageCacheRecyclerModule(settings)); + modules.add(new CircuitBreakerModule(settings)); modules.add(new BigArraysModule(settings)); modules.add(new PluginsModule(settings, pluginsService)); modules.add(new SettingsModule(settings)); diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index b498a013e686c..e08aae33aff04 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -33,7 +33,7 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.threadpool.ThreadPool; diff --git a/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/src/main/java/org/elasticsearch/percolator/PercolateContext.java index 44fcde3e1ea01..701e0648ffb2e 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -122,7 +122,7 @@ public PercolateContext(PercolateShardRequest request, SearchShardTarget searchS this.types = new String[]{request.documentType()}; this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + this.bigArrays = bigArrays.withCircuitBreaking(); this.querySearchResult = new QuerySearchResult(0, searchShardTarget); this.engineSearcher = indexShard.acquireSearcher("percolate"); this.searcher = new ContextIndexSearcher(this, engineSearcher); diff --git a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index ce7a93c145e33..94dd0b7d81cb4 100644 --- a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -193,7 +193,8 @@ public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarg this.scriptService = scriptService; this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + // SearchContexts use a BigArrays that can circuit break + this.bigArrays = bigArrays.withCircuitBreaking(); this.dfsResult = new DfsSearchResult(id, shardTarget); this.queryResult = new QuerySearchResult(id, shardTarget); this.fetchResult = new FetchSearchResult(id, shardTarget); diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java index 9e5ca40f8ea36..da57aa8867ae2 100644 --- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java @@ -37,7 +37,7 @@ import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.util.Random; diff --git a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java index cf65951049dd2..763b7b5173ee6 100644 --- a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java +++ b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java @@ -19,11 +19,17 @@ package org.elasticsearch.common.breaker; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -75,6 +81,127 @@ public void run() { assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L)); } + @Test + public void testThreadedUpdatesToChildBreaker() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicBoolean tripped = new AtomicBoolean(false); + final AtomicReference lastException = new AtomicReference<>(null); + + final AtomicReference breakerRef = new AtomicReference<>(null); + final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) { + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name type) { + return breakerRef.get(); + } + + @Override + public void checkParentLimit(String label) throws CircuitBreakingException { + // never trip + } + }; + final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0); + final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger, + (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST); + breakerRef.set(breaker); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + if (tripped.get()) { + assertThat("tripped too many times", true, equalTo(false)); + } else { + assertThat(tripped.compareAndSet(false, true), equalTo(true)); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true)); + assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L)); + } + + @Test + public void testThreadedUpdatesToChildBreakerWithParentLimit() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicInteger tripped = new AtomicInteger(0); + final AtomicReference lastException = new AtomicReference<>(null); + + final AtomicInteger parentTripped = new AtomicInteger(0); + final AtomicReference breakerRef = new AtomicReference<>(null); + final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) { + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name type) { + return breakerRef.get(); + } + + @Override + public void checkParentLimit(String label) throws CircuitBreakingException { + // Parent will trip right before regular breaker would trip + if (getBreaker(CircuitBreaker.Name.REQUEST).getUsed() > (BYTES_PER_THREAD * NUM_THREADS) - 2) { + parentTripped.incrementAndGet(); + throw new CircuitBreakingException("parent tripped"); + } + } + }; + final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0); + final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger, + (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST); + breakerRef.set(breaker); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + tripped.incrementAndGet(); + if (tripped.get() > 2) { + assertThat("tripped too many times: " + tripped.get(), true, equalTo(false)); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(0L)); + assertThat("parent breaker was tripped exactly twice", parentTripped.get(), equalTo(2)); + assertThat("total breaker was tripped exactly twice", tripped.get(), equalTo(2)); + } + @Test public void testConstantFactor() throws Exception { final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger); diff --git a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java index becce28b85f6a..0b5fe1d6743cc 100644 --- a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java +++ b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.hamcrest.Matchers; import org.jboss.netty.buffer.ChannelBuffer; @@ -49,7 +50,7 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase { @Before public void setUp() throws Exception { super.setUp(); - bigarrays = new BigArrays(ImmutableSettings.EMPTY, null); + bigarrays = new BigArrays(ImmutableSettings.EMPTY, null, new NoneCircuitBreakerService()); } @After diff --git a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 89c5fe816b32f..bcf7e4a5dc5a6 100644 --- a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -20,9 +20,13 @@ package org.elasticsearch.common.util; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.cache.recycler.MockBigArrays; import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; @@ -37,7 +41,7 @@ public class BigArraysTests extends ElasticsearchTestCase { public static BigArrays randombigArrays() { final PageCacheRecycler recycler = randomBoolean() ? null : new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool("BigArraysTests")); - return new MockBigArrays(ImmutableSettings.EMPTY, recycler); + return new MockBigArrays(ImmutableSettings.EMPTY, recycler, new NoneCircuitBreakerService()); } private BigArrays bigArrays; @@ -283,7 +287,7 @@ public void testByteArrayEquals() { // not equal: contents differ final ByteArray a3 = byteArrayWithBytes(new byte[]{1,2,3}); - final ByteArray a4 = byteArrayWithBytes(new byte[]{1,1,3}); + final ByteArray a4 = byteArrayWithBytes(new byte[]{1, 1, 3}); assertFalse(bigArrays.equals(a3, a4)); a3.close(); a4.close(); @@ -329,58 +333,51 @@ private ByteArray byteArrayWithBytes(byte[] bytes) { return bytearray; } - public void testByteAccounting() throws Exception { - for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, Long.MAX_VALUE).build(), null); - Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); - final int size = scaledRandomIntBetween(5, 1 << 16); - BigArray array = (BigArray) create.invoke(bigArrays, size); - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); - Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class); - int newSize = scaledRandomIntBetween(5, 1 << 16); - array = (BigArray) resize.invoke(bigArrays, array, newSize); - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); - array.close(); - assertEquals(0, bigArrays.sizeInBytes()); - } - } - public void testMaxSizeExceededOnNew() throws Exception { final int size = scaledRandomIntBetween(5, 1 << 22); for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, randomIntBetween(1, size)).build(), null); + HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService( + ImmutableSettings.builder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, size) + .build(), + new NodeSettingsService(ImmutableSettings.EMPTY)); + BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); try { create.invoke(bigArrays, size); fail("expected an exception on " + create); } catch (InvocationTargetException e) { - assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException); + assertTrue(e.getCause() instanceof CircuitBreakingException); } - assertEquals(0, bigArrays.sizeInBytes()); + assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); } } public void testMaxSizeExceededOnResize() throws Exception { for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { final long maxSize = randomIntBetween(1 << 10, 1 << 22); - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, maxSize).build(), null); + HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService( + ImmutableSettings.builder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, maxSize) + .build(), + new NodeSettingsService(ImmutableSettings.EMPTY)); + BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); final int size = scaledRandomIntBetween(1, 20); BigArray array = (BigArray) create.invoke(bigArrays, size); Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class); while (true) { long newSize = array.size() * 2; - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); try { array = (BigArray) resize.invoke(bigArrays, array, newSize); } catch (InvocationTargetException e) { - assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException); + assertTrue(e.getCause() instanceof CircuitBreakingException); break; } } - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); + assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); array.close(); - assertEquals(0, bigArrays.sizeInBytes()); + assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); } } diff --git a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java index 58458c89c1038..508f69f6718eb 100644 --- a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java +++ b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java @@ -42,8 +42,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java index 93643b9b0edb7..ef2c94f9f2a63 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java @@ -39,10 +39,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.query.IndicesQueriesModule; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; @@ -101,4 +99,4 @@ protected void configure() { injector.getInstance(ThreadPool.class).shutdownNow(); } -} \ No newline at end of file +} diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java index 0e3b322e9f995..b6265f9633d60 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java @@ -39,10 +39,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.query.IndicesQueriesModule; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; @@ -110,4 +108,4 @@ protected void configure() { injector.getInstance(ThreadPool.class).shutdownNow(); } -} \ No newline at end of file +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java deleted file mode 100644 index 76da699fc1668..0000000000000 --- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.fielddata.breaker; - -import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.junit.Test; - -import java.util.Arrays; - -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; - -/** - * Integration tests for InternalCircuitBreakerService - */ -@ClusterScope(scope = TEST, randomDynamicTemplates = false) -public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { - - private String randomRidiculouslySmallLimit() { - // 3 different ways to say 100 bytes - return randomFrom(Arrays.asList("100b", "100")); - //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes! - } - - @Test - @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testMemoryBreaker() { - assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); - final Client client = client(); - - try { - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("cb-test", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit()) - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for field [test] would be larger than limit of [100/100b]")); - - NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - FieldDataBreakerStats breakerStats = stat.getBreaker(); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - } - } - - @Test - @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testRamAccountingTermsEnum() { - final Client client = client(); - - try { - // Create an index where the mappings have a field data filter - assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + - "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); - - // Wait 10 seconds for green - client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); - - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("ramtest", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit()) - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for field [test] would be larger than limit of [100/100b]")); - - NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - FieldDataBreakerStats breakerStats = stat.getBreaker(); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - } - } -} diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java new file mode 100644 index 0000000000000..40aaf2899bdb7 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java @@ -0,0 +1,288 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.memory.breaker; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import java.util.Arrays; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.*; + +/** + * Integration tests for InternalCircuitBreakerService + */ +@ClusterScope(scope = TEST, randomDynamicTemplates = false) +public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { + + private String randomRidiculouslySmallLimit() { + // 3 different ways to say 100 bytes + return randomFrom(Arrays.asList("100b", "100")); + //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes! + } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testMemoryBreaker() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + final Client client = client(); + + try { + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRamAccountingTermsEnum() { + final Client client = client(); + + try { + // Create an index where the mappings have a field data filter + assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + + "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); + + // Wait 10 seconds for green + client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("ramtest", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + /** + * Test that a breaker correctly redistributes to a different breaker, in + * this case, the fielddata breaker borrows space from the request breaker + */ + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testParentChecking() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + try { + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + refresh(); + + // We need the request limit beforehand, just from a single node because the limit should always be the same + long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get() + .getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit(); + + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // Perform a search to load field data for the "test" field + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + } catch (Exception e) { + String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + + assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [10/10b]")); + + // Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't + resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // Perform a search to load field data for the "test" field + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + } catch (Exception e) { + String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + + } finally { + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRequestBreaker() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + try { + // Make request breaker limited to a small amount + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", id).map()).execute().actionGet(); + } + refresh(); + + // A cardinality aggregation uses BigArrays and thus the REQUEST breaker + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); + fail("aggregation should have tripped the breaker"); + } catch (Exception e) { + String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + } finally { + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + +} diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java new file mode 100644 index 0000000000000..cd47ffef87360 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.memory.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Unit tests for the circuit breaker + */ +public class CircuitBreakerUnitTests extends ElasticsearchTestCase { + + public static long pctBytes(String percentString) { + return ImmutableSettings.EMPTY.getAsMemory("", percentString).bytes(); + } + + @Test + public void testBreakerSettingsValidationWithValidSettings() { + // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20} + BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), 1.0); + BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0); + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + + // parent: {:limit 70}, fd: {:limit 40}, request: {:limit 30} + fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("40%"), 1.0); + request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("30%"), 1.0); + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + } + + @Test + public void testBreakerSettingsValidationNegativeOverhead() { + // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20} + BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), -0.1); + BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0); + try { + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + fail("settings are invalid but validate settings did not throw an exception"); + } catch (Exception e) { + assertThat("Incorrect message: " + e.getMessage(), + e.getMessage().contains("must be non-negative"), equalTo(true)); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java similarity index 96% rename from src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java rename to src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java index 2bd7f4ab0d244..cbf52f53da06a 100644 --- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.memory.breaker; import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.DirectoryReader; @@ -29,6 +29,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -58,7 +59,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException { for (NodeStats node : client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet().getNodes()) { - assertThat("Breaker is not set to 0", node.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } String mapping = XContentFactory.jsonBuilder() @@ -144,7 +145,7 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : resp.getNodes()) { - assertThat("Breaker is set to 0", stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } for (int i = 0; i < numSearches; i++) { @@ -180,7 +181,8 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : nodeStats.getNodes()) { - assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, + stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } } } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index ac51c728c935a..3dde4ad2d1eac 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.indices.IndexMissingException; @@ -194,8 +195,10 @@ public void ensureEstimatedStats() { NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : nodeStats.getNodes()) { - assertThat("Breaker not reset to 0 on node: " + stats.getNode(), - stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(), + stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); + assertThat("Request breaker not reset to 0 on node: " + stats.getNode(), + stats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getEstimated(), equalTo(0L)); } } } diff --git a/src/test/java/org/elasticsearch/test/TestSearchContext.java b/src/test/java/org/elasticsearch/test/TestSearchContext.java index 360b4f41ee24b..e4599610dcc3c 100644 --- a/src/test/java/org/elasticsearch/test/TestSearchContext.java +++ b/src/test/java/org/elasticsearch/test/TestSearchContext.java @@ -81,7 +81,7 @@ public class TestSearchContext extends SearchContext { public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) { this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + this.bigArrays = bigArrays.withCircuitBreaking(); this.indexService = indexService; this.filterCache = filterCache; this.indexFieldDataService = indexFieldDataService; diff --git a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java index 0bd19b0d0ab0c..44a85c86b391f 100644 --- a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java +++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.*; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import java.util.*; @@ -68,43 +69,23 @@ public boolean apply(Object input) { } } } - // arrays that are not fully released. - ArrayList badBigArrays = new ArrayList<>(); - synchronized (INSTANCES) { - for (final BigArrays bigArrays : INSTANCES) { - // BigArrays are used on the network layer and the cluster is shared across tests so nodes might still be talking to - // each other a bit after the test finished, wait a bit for things to stabilize if so - final boolean sizeIsZero = ElasticsearchTestCase.awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return bigArrays.sizeInBytes() == 0; - } - }); - if (!sizeIsZero) { - final long sizeInBytes = bigArrays.sizeInBytes(); - if (sizeInBytes != 0) { - badBigArrays.add(bigArrays); - } - } - } - } - - if (!badBigArrays.isEmpty()) { - INSTANCES.removeAll(badBigArrays); - StringBuilder msg = new StringBuilder("Found [").append(badBigArrays.size()).append("] big arrays which were not fully released. Here is a list of the first 20:"); - for (int i = 0; i < Math.min(20, badBigArrays.size()); i++) { - msg.append("\nbigArray instance with [").append(badBigArrays.get(i).sizeInBytes()).append("] bytes"); - } - throw new AssertionError(msg.toString()); - } - } private final Random random; + private final Settings settings; + private final PageCacheRecycler recycler; + private final CircuitBreakerService breakerService; @Inject - public MockBigArrays(Settings settings, PageCacheRecycler recycler) { - super(settings, recycler); + public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService) { + this(settings, recycler, breakerService, false); + } + + public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) { + super(settings, recycler, breakerService, checkBreaker); + this.settings = settings; + this.recycler = recycler; + this.breakerService = breakerService; long seed; try { seed = SeedUtils.parseSeed(RandomizedContext.current().getRunnerSeedAsString()); @@ -115,6 +96,12 @@ public MockBigArrays(Settings settings, PageCacheRecycler recycler) { INSTANCES.add(this); } + + @Override + public BigArrays withCircuitBreaking() { + return new MockBigArrays(this.settings, this.recycler, this.breakerService, true); + } + @Override public ByteArray newByteArray(long size, boolean clearOnResize) { final ByteArrayWrapper array = new ByteArrayWrapper(super.newByteArray(size, clearOnResize), clearOnResize);