diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc index 2afd89ed19c11..d36fe47d837d2 100644 --- a/docs/reference/index-modules/fielddata.asciidoc +++ b/docs/reference/index-modules/fielddata.asciidoc @@ -24,28 +24,63 @@ field data after a certain time of inactivity. Defaults to `-1`. For example, it can be set to `5m` for a 5 minute expiry. |======================================================================= +[float] +[[circuit-breaker]] +=== Circuit Breaker + +coming[1.4.0,Prior to 1.4.0 there was only a single circuit breaker for fielddata] + +Elasticsearch contains multiple circuit breakers used to prevent operations from +causing an OutOfMemoryError. Each breaker specifies a limit for how much memory +it can use. Additionally, there is a parent-level breaker that specifies the +total amount of memory that can be used across all breakers. + +The parent-level breaker can be configured with the following setting: + +`indices.breaker.total.limit`:: + Starting limit for overall parent breaker, defaults to 70% of JVM heap + +All circuit breaker settings can be changed dynamically using the cluster update +settings API. + [float] [[fielddata-circuit-breaker]] -=== Field data circuit breaker +==== Field data circuit breaker The field data circuit breaker allows Elasticsearch to estimate the amount of memory a field will require to be loaded into memory. It can then prevent the field data from being loaded by raising an exception. By default the limit is configured to 60% of the maximum JVM heap. It can be configured with the following parameters: -[cols="<,<",options="header",] -|======================================================================= -|Setting |Description -|`indices.fielddata.breaker.limit` |Maximum size of estimated field data -to allow loading. Defaults to 60% of the maximum JVM heap. -|`indices.fielddata.breaker.overhead` |A constant that all field data -estimations are multiplied with to determine a final estimation. Defaults to -1.03 -|======================================================================= +`indices.breaker.fielddata.limit`:: + Limit for fielddata breaker, defaults to 60% of JVM heap + +`indices.breaker.fielddata.overhead`:: + A constant that all field data estimations are multiplied with to determine a + final estimation. Defaults to 1.03 + +`indices.fielddata.breaker.limit`:: + deprecated[1.4.0,Replaced by `indices.breaker.fielddata.limit`] + +`indices.fielddata.breaker.overhead`:: + deprecated[1.4.0,Replaced by `indices.breaker.fielddata.overhead`] + +[float] +[[request-circuit-breaker]] +==== Request circuit breaker + +coming[1.4.0] + +The request circuit breaker allows Elasticsearch to prevent per-request data +structures (for example, memory used for calculating aggregations during a +request) from exceeding a certain amount of memory. + +`indices.breaker.request.limit`:: + Limit for request breaker, defaults to 40% of JVM heap -Both the `indices.fielddata.breaker.limit` and -`indices.fielddata.breaker.overhead` can be changed dynamically using the -cluster update settings API. +`indices.breaker.request.overhead`:: + A constant that all request estimations are multiplied with to determine a + final estimation.
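Since each of the limits documented above is a dynamic setting, here is a minimal sketch of what adjusting them at runtime through the Java client could look like (the `client` handle and the example percentages are assumptions for illustration, not part of this change):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class BreakerSettingsUpdateSketch {

    // Pushes new transient breaker limits through the cluster update settings API.
    public static void relaxBreakerLimits(Client client) {
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(ImmutableSettings.settingsBuilder()
                        .put("indices.breaker.fielddata.limit", "70%") // fielddata breaker
                        .put("indices.breaker.request.limit", "50%")   // request breaker
                        .put("indices.breaker.total.limit", "80%")     // parent breaker
                        .build())
                .execute().actionGet();
    }
}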
Defaults to 1 [float] [[fielddata-monitoring]] diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 0910628188350..7ee06cb7002b5 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.NodeIndicesStats; -import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats; +import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.network.NetworkStats; @@ -75,7 +75,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { private HttpStats http; @Nullable - private FieldDataBreakerStats breaker; + private AllCircuitBreakerStats breaker; NodeStats() { } @@ -83,7 +83,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices, @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http, - @Nullable FieldDataBreakerStats breaker) { + @Nullable AllCircuitBreakerStats breaker) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -174,7 +174,7 @@ public HttpStats getHttp() { } @Nullable - public FieldDataBreakerStats getBreaker() { + public AllCircuitBreakerStats getBreaker() { return this.breaker; } @@ -215,7 +215,7 @@ public void readFrom(StreamInput in) throws IOException { if (in.readBoolean()) { http = HttpStats.readHttpStats(in); } - breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in); + breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in); } @Override diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index f3957f8b3b3b8..3201761e54aff 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.env.Environment; import org.elasticsearch.env.EnvironmentModule; +import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.PluginsModule; @@ -187,6 +188,7 @@ public TransportClient(Settings pSettings, boolean loadConfigSettings) throws El modules.add(new TransportModule(this.settings)); modules.add(new ActionModule(true)); modules.add(new ClientTransportModule()); + modules.add(new CircuitBreakerModule(this.settings)); injector = modules.createInjector(); diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 6a442a508e0d3..ab8a48bac1d2c 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ 
b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -28,8 +28,9 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; -import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; +import org.elasticsearch.indices.breaker.InternalCircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -85,6 +86,11 @@ public ClusterDynamicSettingsModule() { clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME); clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE); + clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); } public void addDynamicSettings(String... settings) { diff --git a/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java new file mode 100644 index 0000000000000..1a55e4c3c74f7 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -0,0 +1,224 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Breaker that will check a parent's limit when incrementing + */ +public class ChildMemoryCircuitBreaker implements CircuitBreaker { + + private final long memoryBytesLimit; + private final BreakerSettings settings; + private final double overheadConstant; + private final AtomicLong used; + private final AtomicLong trippedCount; + private final ESLogger logger; + private final HierarchyCircuitBreakerService parent; + private final Name name; + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. This breaker starts with 0 bytes used. + * @param settings settings to configure this breaker + * @param parent parent circuit breaker service to delegate tripped breakers to + * @param name the name of the breaker + */ + public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger, + HierarchyCircuitBreakerService parent, Name name) { + this(settings, null, logger, parent, name); + } + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. Uses the given oldBreaker to initialize + * the starting offset. + * @param settings settings to configure this breaker + * @param parent parent circuit breaker service to delegate tripped breakers to + * @param name the name of the breaker + * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) + */ + public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker, + ESLogger logger, HierarchyCircuitBreakerService parent, Name name) { + this.name = name; + this.settings = settings; + this.memoryBytesLimit = settings.getLimit(); + this.overheadConstant = settings.getOverhead(); + if (oldBreaker == null) { + this.used = new AtomicLong(0); + this.trippedCount = new AtomicLong(0); + } else { + this.used = oldBreaker.used; + this.trippedCount = oldBreaker.trippedCount; + } + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("creating ChildCircuitBreaker with settings {}", this.settings); + } + this.parent = parent; + } + + /** + * Method used to trip the breaker, delegates to the parent to determine + * whether to trip the breaker or not + */ + @Override + public void circuitBreak(String fieldName, long bytesNeeded) { + this.trippedCount.incrementAndGet(); + throw new CircuitBreakingException("[" + this.name + "] Data too large, data for [" + + fieldName + "] would be larger than limit of [" + + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]", + bytesNeeded, this.memoryBytesLimit); + } + + /** + * Add a number of bytes, tripping the circuit breaker if the aggregated + * estimates are above the limit. Automatically trips the breaker if the + * memory limit is set to 0. Will never trip the breaker if the limit is + * set < 0, but can still be used to aggregate estimations.
+ * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + * @throws CircuitBreakingException + */ + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + // short-circuit on no data allowed, immediately throwing an exception + if (memoryBytesLimit == 0) { + circuitBreak(label, bytes); + } + + long newUsed; + // If there is no limit (-1), we can optimize a bit by using + // .addAndGet() instead of looping (because we don't have to check a + // limit), which makes the RamAccountingTermsEnum case faster. + if (this.memoryBytesLimit == -1) { + newUsed = this.used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", + this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); + } + } else { + // Otherwise, check the addition and commit the addition, looping if + // there are conflicts. May result in additional logging, but it's + // trace logging and shouldn't be counted on for additions. + long currentUsed; + do { + currentUsed = this.used.get(); + newUsed = currentUsed + bytes; + long newUsedWithOverhead = (long) (newUsed * overheadConstant); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", + this.name, + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); + } + if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { + logger.error("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking", + this.name, + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label, + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); + circuitBreak(label, newUsedWithOverhead); + } + // Attempt to set the new used value, but make sure it hasn't changed + // underneath us, if it has, keep trying until we are able to set it + } while (!this.used.compareAndSet(currentUsed, newUsed)); + } + + // Additionally, we need to check that we haven't exceeded the parent's limit + try { + parent.checkParentLimit(label); + } catch (CircuitBreakingException e) { + // If the parent breaker is tripped, this breaker has to be + // adjusted back down because the allocation is "blocked" but the + // breaker has already been incremented + this.used.addAndGet(-bytes); + throw e; + } + return newUsed; + } + + /** + * Add an exact number of bytes, not checking for tripping the + * circuit breaker. This bypasses the overheadConstant multiplication. + * + * Also does not check with the parent breaker to see if the parent limit + * has been exceeded. 
+ * + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + */ + @Override + public long addWithoutBreaking(long bytes) { + long u = used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u); + } + assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; + return u; + } + + /** + * @return the number of aggregated "used" bytes so far + */ + @Override + public long getUsed() { + return this.used.get(); + } + + /** + * @return the number of bytes that can be added before the breaker trips + */ + @Override + public long getLimit() { + return this.memoryBytesLimit; + } + + /** + * @return the constant multiplier the breaker uses for aggregations + */ + @Override + public double getOverhead() { + return this.overheadConstant; + } + + /** + * @return the number of times the breaker has been tripped + */ + @Override + public long getTrippedCount() { + return this.trippedCount.get(); + } + + /** + * @return the name of the breaker + */ + public Name getName() { + return this.name; + } +} diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java new file mode 100644 index 0000000000000..541f6888a15dd --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Interface for an object that can be incremented, breaking after some + * configured limit has been reached. 
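The add/release pair implemented above suggests how callers are expected to drive a breaker: reserve an estimate, do the work, then hand back only what was actually committed. A minimal sketch, assuming a breaker handed out by the breaker service (the helper name and the "my_field" label are illustrative, not part of this change):

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;

public class BreakerUsageSketch {

    // Reserves an estimate on the breaker, runs the loading work, and releases
    // the reservation afterwards so the accounting stays balanced.
    public static void loadWithAccounting(CircuitBreaker breaker, long estimatedBytes) {
        long reserved = 0;
        try {
            // May throw CircuitBreakingException; this breaker does not commit the
            // bytes when it trips, so there is nothing to release on that path.
            breaker.addEstimateBytesAndMaybeBreak(estimatedBytes, "my_field");
            reserved = estimatedBytes;
            // ... build the in-memory structure here ...
        } catch (CircuitBreakingException e) {
            // The exception carries the requested bytes and the configured limit.
            System.err.println("wanted " + e.getBytesWanted() + " bytes, limit is " + e.getByteLimit());
            throw e;
        } finally {
            if (reserved > 0) {
                breaker.addWithoutBreaking(-reserved); // give the estimate back; never trips
            }
        }
    }
}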
+ */ +public interface CircuitBreaker { + + /** + * Enum used for specifying different types of circuit breakers + */ + public static enum Name { + PARENT(0), + FIELDDATA(1), + REQUEST(2); + + private int ordinal; + + Name(int ordinal) { + this.ordinal = ordinal; + } + + public int getSerializableValue() { + return this.ordinal; + } + + public static Name readFrom(StreamInput in) throws IOException { + int value = in.readVInt(); + switch (value) { + case 0: + return Name.PARENT; + case 1: + return Name.FIELDDATA; + case 2: + return Name.REQUEST; + default: + throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with ordinal: " + value); + } + } + + public static void writeTo(Name name, StreamOutput out) throws IOException { + out.writeVInt(name.getSerializableValue()); + } + } + + /** + * Trip the circuit breaker + * @param fieldName name of the field responsible for tripping the breaker + * @param bytesNeeded bytes asked for but unable to be allocated + */ + public void circuitBreak(String fieldName, long bytesNeeded); + + /** + * add bytes to the breaker and maybe trip + * @param bytes number of bytes to add + * @param label string label describing the bytes being added + * @return the number of "used" bytes for the circuit breaker + * @throws CircuitBreakingException + */ + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException; + + /** + * Adjust the circuit breaker without tripping + */ + public long addWithoutBreaking(long bytes); + + /** + * @return the currently used bytes the breaker is tracking + */ + public long getUsed(); + + /** + * @return maximum number of bytes the circuit breaker can track before tripping + */ + public long getLimit(); + + /** + * @return overhead of circuit breaker + */ + public double getOverhead(); + + /** + * @return the number of times the circuit breaker has been tripped + */ + public long getTrippedCount(); + + /** + * @return the name of the breaker + */ + public Name getName(); +} diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index c2c2bf26d10f1..0b88a9dad7c93 100644 --- a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -25,8 +25,26 @@ */ public class CircuitBreakingException extends ElasticsearchException { - // TODO: maybe add more neat metrics here? + private final long bytesWanted; + private final long byteLimit; + public CircuitBreakingException(String message) { super(message); + this.bytesWanted = 0; + this.byteLimit = 0; + } + + public CircuitBreakingException(String message, long bytesWanted, long byteLimit) { + super(message); + this.bytesWanted = bytesWanted; + this.byteLimit = byteLimit; + } + + public long getBytesWanted() { + return this.bytesWanted; + } + + public long getByteLimit() { + return this.byteLimit; } } diff --git a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java index 9e157c31d21ec..e22753a326a0f 100644 --- a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java +++ b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java @@ -27,7 +27,7 @@ * MemoryCircuitBreaker is a circuit breaker that breaks once a * configurable memory limit has been reached. 
*/ -public class MemoryCircuitBreaker { +public class MemoryCircuitBreaker implements CircuitBreaker { private final long memoryBytesLimit; private final double overheadConstant; @@ -77,7 +77,7 @@ public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, Memory * Method used to trip the breaker * @throws CircuitBreakingException */ - public void circuitBreak(String fieldName) throws CircuitBreakingException { + public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreakingException { this.trippedCount.incrementAndGet(); throw new CircuitBreakingException("Data too large, data for field [" + fieldName + "] would be larger than limit of [" + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]"); @@ -92,10 +92,10 @@ public void circuitBreak(String fieldName) throws CircuitBreakingException { * @return number of "used" bytes so far * @throws CircuitBreakingException */ - public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws CircuitBreakingException { + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { // short-circuit on no data allowed, immediately throwing an exception if (memoryBytesLimit == 0) { - circuitBreak(fieldName); + circuitBreak(label, bytes); } long newUsed; @@ -106,7 +106,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws newUsed = this.used.addAndGet(bytes); if (logger.isTraceEnabled()) { logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", - new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed)); + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); } return newUsed; } @@ -121,15 +121,15 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws long newUsedWithOverhead = (long)(newUsed * overheadConstant); if (logger.isTraceEnabled()) { logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", - new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed), + new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed), memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); } if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { logger.error("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking", - newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), fieldName, + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label, memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); - circuitBreak(fieldName); + circuitBreak(label, newUsedWithOverhead); } // Attempt to set the new used value, but make sure it hasn't changed // underneath us, if it has, keep trying until we are able to set it @@ -161,9 +161,9 @@ public long getUsed() { } /** - * @return the maximum number of bytes before the circuit breaker will trip + * @return the number of bytes that can be added before the breaker trips */ - public long getMaximum() { + public long getLimit() { return this.memoryBytesLimit; } @@ -180,4 +180,11 @@ public double getOverhead() { public long getTrippedCount() { return this.trippedCount.get(); } + + /** + * @return the name of the breaker + */ + public Name getName() { + return Name.FIELDDATA; + } } diff --git a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java index 81a293459f29b..2204fe8cac54e 
100644 --- a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java @@ -32,7 +32,7 @@ public enum MemorySizeValue { * 42 (default assumed unit is byte) or 2mb, or percentages of the heap size: if * the heap is 1G, 10% will be parsed as 100mb. */ public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) { - if (sValue.endsWith("%")) { + if (sValue != null && sValue.endsWith("%")) { final String percentAsString = sValue.substring(0, sValue.length() - 1); try { final double percent = Double.parseDouble(percentAsString); diff --git a/src/main/java/org/elasticsearch/common/util/AbstractArray.java b/src/main/java/org/elasticsearch/common/util/AbstractArray.java index 7de3c1c3b7448..0c00a897333a5 100644 --- a/src/main/java/org/elasticsearch/common/util/AbstractArray.java +++ b/src/main/java/org/elasticsearch/common/util/AbstractArray.java @@ -33,7 +33,7 @@ abstract class AbstractArray implements BigArray { @Override public final void close() { - bigArrays.ramBytesUsed.addAndGet(-ramBytesUsed()); + bigArrays.adjustBreaker(-ramBytesUsed()); assert !released : "double release"; released = true; doClose(); diff --git a/src/main/java/org/elasticsearch/common/util/BigArrays.java b/src/main/java/org/elasticsearch/common/util/BigArrays.java index 5f362368e5ec3..a2f216361c419 100644 --- a/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -23,8 +23,10 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; @@ -32,6 +34,7 @@ import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; @@ -39,9 +42,8 @@ /** Utility class to work with arrays. 
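The BigArrays changes just below route big-array allocations through the request breaker, with releases flowing back via AbstractArray.close(). A rough sketch of the caller-side pattern this enables, assuming the existing newLongArray/size/get accessors (the method and variable names are illustrative):

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

public class RequestScopedArraySketch {

    // Allocates a request-scoped array against the request breaker and always
    // releases it, letting close() hand the reserved bytes back to the breaker.
    public static long sumCounts(BigArrays bigArrays, long numBuckets) {
        BigArrays breaking = bigArrays.withCircuitBreaking(); // allocations may now trip the request breaker
        LongArray counts = breaking.newLongArray(numBuckets, true); // accounted allocation
        try {
            long total = 0;
            for (long i = 0; i < counts.size(); i++) {
                total += counts.get(i);
            }
            return total;
        } finally {
            counts.close(); // adjusts the breaker back down
        }
    }
}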
*/ public class BigArrays extends AbstractComponent { - // TODO: switch to a circuit breaker that is shared not only on big arrays level, and applies to other request level data structures public static final String MAX_SIZE_IN_BYTES_SETTING = "requests.memory.breaker.limit"; - public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, Long.MAX_VALUE); + public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, null); /** Page size in bytes: 16KB */ public static final int PAGE_SIZE_IN_BYTES = 1 << 14; @@ -363,39 +365,73 @@ public T set(long index, T value) { } final PageCacheRecycler recycler; - final AtomicLong ramBytesUsed; - final long maxSizeInBytes; + final CircuitBreakerService breakerService; + final boolean checkBreaker; @Inject - public BigArrays(Settings settings, PageCacheRecycler recycler) { - this(settings, recycler, settings.getAsMemory(MAX_SIZE_IN_BYTES_SETTING, Long.toString(Long.MAX_VALUE)).bytes()); + public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) { + // Checking the breaker is disabled if not specified + this(settings, recycler, breakerService, false); } - private BigArrays(Settings settings, PageCacheRecycler recycler, final long maxSizeInBytes) { + public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) { super(settings); - this.maxSizeInBytes = maxSizeInBytes; + this.checkBreaker = checkBreaker; this.recycler = recycler; - ramBytesUsed = new AtomicLong(); + this.breakerService = breakerService; } - private void validate(long delta) { - final long totalSizeInBytes = ramBytesUsed.addAndGet(delta); - if (totalSizeInBytes > maxSizeInBytes) { - throw new ElasticsearchIllegalStateException("Maximum number of bytes allocated exceeded: [" + totalSizeInBytes + "] (> " + maxSizeInBytes + ")"); + /** + * Adjust the circuit breaker with the given delta, if the delta is + * negative, or checkBreaker is false, the breaker will be adjusted + * without tripping + */ + void adjustBreaker(long delta) { + if (this.breakerService != null) { + CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.Name.REQUEST); + if (this.checkBreaker == true) { + // checking breaker means potentially tripping, but it doesn't + // have to if the delta is negative + if (delta > 0) { + try { + breaker.addEstimateBytesAndMaybeBreak(delta, ""); + } catch (CircuitBreakingException e) { + // since we've already created the data, we need to + // add it so closing the stream re-adjusts properly + breaker.addWithoutBreaking(delta); + // re-throw the original exception + throw e; + } + } else { + breaker.addWithoutBreaking(delta); + } + } else { + // even if we are not checking the breaker, we need to adjust + // its' totals, so add without breaking + breaker.addWithoutBreaking(delta); + } } } + /** + * Return a new instance of this BigArrays class with circuit breaking + * explicitly enabled, instead of only accounting enabled + */ + public BigArrays withCircuitBreaking() { + return new BigArrays(this.settings, this.recycler, this.breakerService, true); + } + private T resizeInPlace(T array, long newSize) { final long oldMemSize = array.ramBytesUsed(); array.resize(newSize); - validate(array.ramBytesUsed() - oldMemSize); + adjustBreaker(array.ramBytesUsed() - oldMemSize); return array; } private T validate(T array) { boolean success = false; try { - 
validate(array.ramBytesUsed()); + adjustBreaker(array.ramBytesUsed()); success = true; } finally { if (!success) { @@ -720,11 +756,4 @@ public ObjectArray grow(ObjectArray array, long minSize) { final long newSize = overSize(minSize, OBJECT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF); return resize(array, newSize); } - - /** - * Return an approximate number of bytes that have been allocated but not released yet. - */ - public long sizeInBytes() { - return ramBytesUsed.get(); - } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index 62faf5abce1e1..d8b925f735443 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 7e7ba5158b0bd..105d1676466cd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -38,11 +38,11 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.util.ArrayList; import java.util.Collection; diff --git a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java index 01a91879bd7f3..15aa961294c12 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java +++ b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java @@ -21,8 +21,8 @@ import org.apache.lucene.index.FilteredTermsEnum; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.index.fielddata.plain.AbstractIndexFieldData; +import org.elasticsearch.common.breaker.CircuitBreaker; import java.io.IOException; @@ -36,7 +36,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum { // Flush every 5mb private static final long FLUSH_BUFFER_SIZE = 1024 * 1024 * 5; - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final TermsEnum termsEnum; private final AbstractIndexFieldData.PerValueEstimator estimator; private final String fieldName; @@ -44,7 +44,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum { 
private long flushBuffer; - public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator, + public RamAccountingTermsEnum(TermsEnum termsEnum, CircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator, String fieldName) { super(termsEnum); this.breaker = breaker; diff --git a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java index 7c47c6ed15e5a..41f630371d216 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java +++ b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java @@ -23,11 +23,12 @@ import org.apache.lucene.index.RandomAccessOrds; import org.apache.lucene.index.XOrdinalMap; import org.apache.lucene.util.packed.PackedInts; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData; import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; @@ -52,7 +53,7 @@ public static IndexOrdinalsFieldData build(final IndexReader indexReader, IndexO } final XOrdinalMap ordinalMap = XOrdinalMap.build(null, subs, PackedInts.DEFAULT); final long memorySizeInBytes = ordinalMap.ramBytesUsed(); - breakerService.getBreaker().addWithoutBreaking(memorySizeInBytes); + breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(memorySizeInBytes); if (logger.isDebugEnabled()) { logger.debug( diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java index d3abf187ffd49..93547810e9dba 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java @@ -31,8 +31,8 @@ import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper.Names; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; import java.util.Map; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java index 1098f3c917a14..3864e72b62338 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git 
a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java index 51da7ef7c1248..8ad8c5cfc5751 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java @@ -29,8 +29,8 @@ import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** * A field data implementation that forbids loading and will throw an {@link org.elasticsearch.ElasticsearchIllegalStateException} if you try to load diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java index 0605b0e909e36..ec3cb13659abd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java @@ -37,7 +37,7 @@ import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Map; import java.util.Set; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java index 327cba1420bca..3cf762db2db25 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.util.*; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; @@ -34,7 +35,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; /** @@ -70,7 +71,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; // TODO: Use an actual estimator to estimate before loading. 
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AtomicDoubleFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java index 5aa8e371629ef..85cd1e2c14f5c 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.fst.FST.INPUT_TYPE; import org.apache.lucene.util.fst.PositiveIntOutputs; import org.apache.lucene.util.fst.Util; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.*; @@ -33,7 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -63,7 +64,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicOrdinalsFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicOrdinalsFieldData.empty(); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java index 45415e796547a..3c0030408fec6 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.util.*; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FloatArray; @@ -33,7 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; /** @@ -68,7 +69,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; // TODO: Use an actual estimator to estimate before loading. 
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AtomicDoubleFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java index 4165c0d8cb7a0..7f775ab13a1b8 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java @@ -31,7 +31,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java index aedaac01bdbdd..c33c74c38ab60 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PagedMutable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.DistanceUnit; @@ -38,7 +39,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -82,7 +83,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicGeoPointFieldData data = null; // TODO: Use an actual estimator to estimate before loading. 
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java index fb2936c49d80a..e1684d65d6d06 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.RandomAccessOrds; import org.apache.lucene.index.Terms; import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -34,7 +35,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** */ @@ -64,7 +65,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex Terms terms = reader.terms(getFieldNames().indexName()); AtomicGeoPointFieldData data = null; // TODO: Use an actual estimator to estimate before loading. - NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA)); if (terms == null) { data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc()); estimator.afterLoad(null, data.ramBytesUsed()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java index 3850d7920c585..ab875c62ab412 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java @@ -27,7 +27,7 @@ import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; public class IndexIndexFieldData extends AbstractIndexOrdinalsFieldData { diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java index 5b35aa70eb1f8..481b2b3c84ca0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java @@ -22,7 +22,7 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import java.io.IOException; @@ -33,9 +33,9 @@ */ public class 
NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; - NonEstimatingEstimator(MemoryCircuitBreaker breaker) { + NonEstimatingEstimator(CircuitBreaker breaker) { this.breaker = breaker; } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java index 971b3a0f933a3..2cffa02a2cb4c 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java @@ -27,7 +27,7 @@ import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.*; @@ -38,7 +38,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; @@ -88,7 +88,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc final AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); AtomicNumericFieldData data = null; - PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType(), getFieldNames().fullName()); + PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getNumericType(), getFieldNames().fullName()); if (terms == null) { data = AtomicLongFieldData.empty(reader.maxDoc()); estimator.adjustForNoTerms(data.ramBytesUsed()); @@ -355,11 +355,11 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu */ public class PackedArrayEstimator implements PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final NumericType type; private final String fieldName; - public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type, String fieldName) { + public PackedArrayEstimator(CircuitBreaker breaker, NumericType type, String fieldName) { this.breaker = breaker; this.type = type; this.fieldName = fieldName; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java index d6d3e051a5fca..fbfda4af76501 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java @@ -24,7 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PagedBytes; import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import 
org.elasticsearch.index.fielddata.*; @@ -33,7 +33,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.io.IOException; @@ -61,7 +61,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex AtomicReader reader = context.reader(); AtomicOrdinalsFieldData data = null; - PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(), getFieldNames().fullName()); + PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getFieldNames().fullName()); Terms terms = reader.terms(getFieldNames().indexName()); if (terms == null) { data = AbstractAtomicOrdinalsFieldData.empty(); @@ -125,11 +125,11 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex public class PagedBytesEstimator implements PerValueEstimator { private final AtomicReaderContext context; - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final String fieldName; private long estimatedBytes; - PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker, String fieldName) { + PagedBytesEstimator(AtomicReaderContext context, CircuitBreaker breaker, String fieldName) { this.breaker = breaker; this.context = context; this.fieldName = fieldName; diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java index 1b3f30e0bc64d..5e52b5a438f75 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java @@ -32,7 +32,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,7 +47,7 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; import java.io.IOException; @@ -101,7 +101,7 @@ public ParentChildAtomicFieldData loadDirect(AtomicReaderContext context) throws new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME), parentTypes ); - ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(), termsEnum); + ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), termsEnum); TermsEnum estimatedTermsEnum = estimator.beforeLoad(null); ObjectObjectOpenHashMap typeBuilders = ObjectObjectOpenHashMap.newInstance(); try { @@ -210,13 +210,13 @@ public IndexFieldData build(Index index, @IndexSettings Settings indexSetting */ public 
class ParentChildEstimator implements PerValueEstimator { - private final MemoryCircuitBreaker breaker; + private final CircuitBreaker breaker; private final TermsEnum filteredEnum; // The TermsEnum is passed in here instead of being generated in the // beforeLoad() function since it's filtered inside the previous // TermsEnum wrappers - public ParentChildEstimator(MemoryCircuitBreaker breaker, TermsEnum filteredEnum) { + public ParentChildEstimator(CircuitBreaker breaker, TermsEnum filteredEnum) { this.breaker = breaker; this.filteredEnum = filteredEnum; } @@ -337,7 +337,7 @@ public int getOrd(int docID) { } } - breakerService.getBreaker().addWithoutBreaking(ramBytesUsed); + breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed); if (logger.isDebugEnabled()) { logger.debug( "Global-ordinals[_parent] took {}", diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java index 3a1b85251a3d8..bf615b746a42f 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java @@ -29,8 +29,8 @@ import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper.Names; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.indices.breaker.CircuitBreakerService; public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData implements IndexOrdinalsFieldData { diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 76ac7e165d70e..dad12f9a295bb 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -29,10 +29,8 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -81,7 +79,6 @@ protected void configure() { bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); bind(UpdateHelper.class).asEagerSingleton(); - bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton(); bind(IndicesFieldDataCacheListener.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java new file mode 100644 index 0000000000000..4ba07a50d5909 --- /dev/null +++ 
b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + * Stats class encapsulating all of the different circuit breaker stats + */ +public class AllCircuitBreakerStats implements Streamable, ToXContent { + + private CircuitBreakerStats[] allStats = new CircuitBreakerStats[0]; + + public AllCircuitBreakerStats() { + + } + + public AllCircuitBreakerStats(CircuitBreakerStats[] allStats) { + this.allStats = allStats; + } + + public CircuitBreakerStats[] getAllStats() { + return this.allStats; + } + + public CircuitBreakerStats getStats(CircuitBreaker.Name name) { + for (CircuitBreakerStats stats : allStats) { + if (stats.getName() == name) { + return stats; + } + } + return null; + } + + public static AllCircuitBreakerStats readOptionalAllCircuitBreakerStats(StreamInput in) throws IOException { + AllCircuitBreakerStats stats = in.readOptionalStreamable(new AllCircuitBreakerStats()); + return stats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + int statCount = in.readVInt(); + CircuitBreakerStats[] newStats = new CircuitBreakerStats[statCount]; + for (int i = 0; i < statCount; i++) { + CircuitBreakerStats stats = new CircuitBreakerStats(); + stats.readFrom(in); + newStats[i] = stats; + } + allStats = newStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(allStats.length); + for (CircuitBreakerStats stats : allStats) { + if (stats != null) { + stats.writeTo(out); + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.BREAKERS); + for (CircuitBreakerStats stats : allStats) { + if (stats != null) { + stats.toXContent(builder, params); + } + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString BREAKERS = new XContentBuilderString("breakers"); + } +} diff --git a/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java new file mode 100644 index 0000000000000..b76e4b45dee53 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java @@ -0,0 
+1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + * Settings for a {@link CircuitBreaker} + */ +public class BreakerSettings { + + private final CircuitBreaker.Name name; + private final long limitBytes; + private final double overhead; + + public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) { + this.name = name; + this.limitBytes = limitBytes; + this.overhead = overhead; + } + + public CircuitBreaker.Name getName() { + return this.name; + } + + public long getLimit() { + return this.limitBytes; + } + + public double getOverhead() { + return this.overhead; + } + + @Override + public String toString() { + return "[" + this.name.toString() + + ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) + + ",overhead=" + this.overhead + "]"; + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java similarity index 57% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java index c53cc3e1ba202..6033015ce546a 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java @@ -17,24 +17,23 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; -/** - * Interface for Circuit Breaker services, which provide breakers to classes - * that load field data. 
- */ -public interface CircuitBreakerService { +public class CircuitBreakerModule extends AbstractModule { + + public static final String IMPL = "indices.breaker.breaker_impl"; - /** - * @return the breaker that can be used to register estimates against - */ - public MemoryCircuitBreaker getBreaker(); + private final Settings settings; + public CircuitBreakerModule(Settings settings) { + this.settings = settings; + } - /** - * @return stats about the breaker - */ - public FieldDataBreakerStats stats(); + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(settings.getAsClass(IMPL, HierarchyCircuitBreakerService.class)).asEagerSingleton(); + } } diff --git a/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java new file mode 100644 index 0000000000000..35e5ee80e5a2e --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; + +/** + * Interface for Circuit Breaker services, which provide breakers to classes + * that load field data. 
+ */ +public abstract class CircuitBreakerService extends AbstractLifecycleComponent { + + protected CircuitBreakerService(Settings settings) { + super(settings); + } + + /** + * @return the breaker that can be used to register estimates against + */ + public abstract CircuitBreaker getBreaker(CircuitBreaker.Name type); + + /** + * @return stats about all breakers + */ + public abstract AllCircuitBreakerStats stats(); + + /** + * @return stats about a specific breaker + */ + public abstract CircuitBreakerStats stats(CircuitBreaker.Name name); + + protected void doStart() throws ElasticsearchException { + } + + protected void doStop() throws ElasticsearchException { + } + + protected void doClose() throws ElasticsearchException { + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java similarity index 61% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java index 555c0f09efbd4..67bdd358dcd7a 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java +++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java @@ -17,9 +17,10 @@ * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; import org.elasticsearch.Version; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -29,30 +30,37 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import java.io.IOException; +import java.util.Locale; /** * Class encapsulating stats about the circuit breaker */ -public class FieldDataBreakerStats implements Streamable, ToXContent { +public class CircuitBreakerStats implements Streamable, ToXContent { - private long maximum; + private CircuitBreaker.Name name; + private long limit; private long estimated; private long trippedCount; private double overhead; - FieldDataBreakerStats() { + CircuitBreakerStats() { } - public FieldDataBreakerStats(long maximum, long estimated, double overhead, long trippedCount) { - this.maximum = maximum; + public CircuitBreakerStats(CircuitBreaker.Name name, long limit, long estimated, double overhead, long trippedCount) { + this.name = name; + this.limit = limit; this.estimated = estimated; this.trippedCount = trippedCount; this.overhead = overhead; } - public long getMaximum() { - return this.maximum; + public CircuitBreaker.Name getName() { + return this.name; + } + + public long getLimit() { + return this.limit; } public long getEstimated() { @@ -67,14 +75,15 @@ public double getOverhead() { return this.overhead; } - public static FieldDataBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException { - FieldDataBreakerStats stats = in.readOptionalStreamable(new FieldDataBreakerStats()); + public static CircuitBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException { + CircuitBreakerStats stats = in.readOptionalStreamable(new CircuitBreakerStats()); return stats; } @Override public void readFrom(StreamInput in) throws IOException { - maximum = in.readLong(); + // limit is the maximum from the old circuit breaker stats for backwards compatibility + limit = in.readLong(); estimated = 
in.readLong(); overhead = in.readDouble(); if (in.getVersion().onOrAfter(Version.V_1_2_0)) { @@ -82,23 +91,31 @@ public void readFrom(StreamInput in) throws IOException { } else { this.trippedCount = -1; } + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + this.name = CircuitBreaker.Name.readFrom(in); + } else { + this.name = CircuitBreaker.Name.FIELDDATA; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(maximum); + out.writeLong(limit); out.writeLong(estimated); out.writeDouble(overhead); if (out.getVersion().onOrAfter(Version.V_1_2_0)) { out.writeLong(trippedCount); } + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + CircuitBreaker.Name.writeTo(name, out); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.BREAKER); - builder.field(Fields.MAX, maximum); - builder.field(Fields.MAX_HUMAN, new ByteSizeValue(maximum)); + builder.startObject(name.toString().toLowerCase(Locale.ROOT)); + builder.field(Fields.LIMIT, limit); + builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit)); builder.field(Fields.ESTIMATED, estimated); builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated)); builder.field(Fields.OVERHEAD, overhead); @@ -107,10 +124,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + @Override + public String toString() { + return "[" + this.name.toString() + + ",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) + + ",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) + + ",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]"; + } + static final class Fields { - static final XContentBuilderString BREAKER = new XContentBuilderString("fielddata_breaker"); - static final XContentBuilderString MAX = new XContentBuilderString("maximum_size_in_bytes"); - static final XContentBuilderString MAX_HUMAN = new XContentBuilderString("maximum_size"); + static final XContentBuilderString LIMIT = new XContentBuilderString("limit_size_in_bytes"); + static final XContentBuilderString LIMIT_HUMAN = new XContentBuilderString("limit_size"); static final XContentBuilderString ESTIMATED = new XContentBuilderString("estimated_size_in_bytes"); static final XContentBuilderString ESTIMATED_HUMAN = new XContentBuilderString("estimated_size"); static final XContentBuilderString OVERHEAD = new XContentBuilderString("overhead"); diff --git a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java new file mode 100644 index 0000000000000..67bfaf4688815 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -0,0 +1,235 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.breaker; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.node.settings.NodeSettingsService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.collect.Lists.newArrayList; + +/** + * CircuitBreakerService that attempts to redistribute space between breakers + * if tripped + */ +public class HierarchyCircuitBreakerService extends CircuitBreakerService { + + private volatile ImmutableMap breakers; + + public static final String TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.total.limit"; + public static final String DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT = "70%"; + + public static final String FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.fielddata.limit"; + public static final String FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.fielddata.overhead"; + public static final String DEFAULT_FIELDDATA_BREAKER_LIMIT = "60%"; + public static final double DEFAULT_FIELDDATA_OVERHEAD_CONSTANT = 1.03; + + public static final String REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.request.limit"; + public static final String REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.request.overhead"; + public static final String DEFAULT_REQUEST_BREAKER_LIMIT = "40%"; + + private volatile BreakerSettings parentSettings; + private volatile BreakerSettings fielddataSettings; + private volatile BreakerSettings requestSettings; + + // Tripped count for when redistribution was attempted but wasn't successful + private final AtomicLong parentTripCount = new AtomicLong(0); + + @Inject + public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) { + super(settings); + + // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING + // setting to keep backwards compatibility with 1.2, it can be safely + // removed when compatibility with 1.2 is no longer needed + String compatibilityFielddataLimitDefault = DEFAULT_FIELDDATA_BREAKER_LIMIT; + ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, null); + if (compatibilityFielddataLimit != null) { + compatibilityFielddataLimitDefault = compatibilityFielddataLimit.toString(); + } + + // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING + // setting to keep backwards compatibility with 1.2, it can be safely + // removed when compatibility with 1.2 is no longer needed + double compatibilityFielddataOverheadDefault = DEFAULT_FIELDDATA_OVERHEAD_CONSTANT; + Double compatibilityFielddataOverhead = settings.getAsDouble(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (compatibilityFielddataOverhead != null) { + compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead; + } + + this.fielddataSettings = new 
BreakerSettings(CircuitBreaker.Name.FIELDDATA, + settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(), + settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault)); + + this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, + settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(), + settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)); + + // Validate the configured settings + validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings}); + + this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, + settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0); + if (logger.isTraceEnabled()) { + logger.trace("parent circuit breaker with settings {}", this.parentSettings); + } + + Map tempBreakers = new HashMap<>(); + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA)); + tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST)); + this.breakers = ImmutableMap.copyOf(tempBreakers); + + nodeSettingsService.addListener(new ApplySettings()); + } + + public class ApplySettings implements NodeSettingsService.Listener { + + @Override + public void onRefreshSettings(Settings settings) { + boolean changed = false; + + // Fielddata settings + BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings; + ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null); + Double newFielddataOverhead = settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (newFielddataMax != null || newFielddataOverhead != null) { + changed = true; + long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes(); + newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; + + newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead); + } + + // Request settings + BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings; + ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null); + Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null); + if (newRequestMax != null || newRequestOverhead != null) { + changed = true; + long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes(); + newRequestOverhead = newRequestOverhead == null ? 
HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead; + + newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead); + } + + // Parent settings + BreakerSettings newParentSettings = HierarchyCircuitBreakerService.this.parentSettings; + long oldParentMax = HierarchyCircuitBreakerService.this.parentSettings.getLimit(); + ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null); + if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) { + changed = true; + newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0); + } + + if (changed) { + // change all the things + validateSettings(new BreakerSettings[] {newFielddataSettings, newRequestSettings}); + logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings); + HierarchyCircuitBreakerService.this.parentSettings = newParentSettings; + HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings; + HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings; + Map tempBreakers = new HashMap<>(); + tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(newFielddataSettings, + (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA)); + tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(newRequestSettings, + (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST), + logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST)); + HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers); + } + } + } + + /** + * Validate that child settings are valid + * @throws ElasticsearchIllegalStateException + */ + public static void validateSettings(BreakerSettings[] childrenSettings) throws ElasticsearchIllegalStateException { + for (BreakerSettings childSettings : childrenSettings) { + // If the child is disabled, ignore it + if (childSettings.getLimit() == -1) { + continue; + } + + if (childSettings.getOverhead() < 0) { + throw new ElasticsearchIllegalStateException("Child breaker overhead " + childSettings + " must be non-negative"); + } + } + } + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name name) { + return this.breakers.get(name); + } + + @Override + public AllCircuitBreakerStats stats() { + long parentEstimated = 0; + List allStats = newArrayList(); + // Gather the "estimated" count for the parent breaker by adding the + // estimations for each individual breaker + for (CircuitBreaker breaker : this.breakers.values()) { + allStats.add(stats(breaker.getName())); + parentEstimated += breaker.getUsed(); + } + // Manually add the parent breaker settings since they aren't part of the breaker map + allStats.add(new CircuitBreakerStats(CircuitBreaker.Name.PARENT, parentSettings.getLimit(), + parentEstimated, 1.0, parentTripCount.get())); + return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()])); + } + + @Override + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + CircuitBreaker breaker = this.breakers.get(name); + return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); + } + 
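For context alongside the service methods above, here is a minimal sketch of how a caller (for example the fielddata loaders changed earlier in this patch) is expected to use the per-name breakers returned by getBreaker(); it uses only methods that appear in this change, and the helper name, label and byte count are illustrative rather than part of the patch.

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

class BreakerUsageSketch {
    // Hypothetical helper: account for an estimated data structure against the
    // fielddata breaker, then release the accounting again when it is unloaded.
    static void loadAndRelease(CircuitBreakerService breakerService, long estimatedBytes) {
        CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA);
        // Registers the estimate; throws a CircuitBreakingException if the
        // fielddata limit (or, with the hierarchy service, the parent limit) would be exceeded.
        breaker.addEstimateBytesAndMaybeBreak(estimatedBytes, "example-field");
        // ... load the data structure ...
        // On unload/eviction, adjust the accounting downwards without tripping the breaker.
        breaker.addWithoutBreaking(-estimatedBytes);
    }
}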
+ /** + * Checks whether the parent breaker has been tripped + * @param label + * @throws CircuitBreakingException + */ + public void checkParentLimit(String label) throws CircuitBreakingException { + long totalUsed = 0; + for (CircuitBreaker breaker : this.breakers.values()) { + totalUsed += (breaker.getUsed() * breaker.getOverhead()); + } + + long parentLimit = this.parentSettings.getLimit(); + if (totalUsed > parentLimit) { + this.parentTripCount.incrementAndGet(); + throw new CircuitBreakingException("[PARENT] Data too large, data for [" + + label + "] would be larger than limit of [" + + parentLimit + "/" + new ByteSizeValue(parentLimit) + "]", + totalUsed, parentLimit); + } + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java similarity index 83% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java index 7eabc0c24d84f..c64779c96b468 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java @@ -16,11 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.MemoryCircuitBreaker; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -32,7 +31,7 @@ * that can be used to keep track of memory usage across the node, preventing * actions that could cause an {@link OutOfMemoryError} on the node. 
*/ -public class InternalCircuitBreakerService extends AbstractLifecycleComponent implements CircuitBreakerService { +public class InternalCircuitBreakerService extends CircuitBreakerService { public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit"; public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead"; @@ -87,7 +86,8 @@ InternalCircuitBreakerService.this.maxBytes, new ByteSizeValue(InternalCircuitBr /** * @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage */ - public MemoryCircuitBreaker getBreaker() { + public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) { + // Return the only breaker, regardless of name return this.breaker; } @@ -104,19 +104,17 @@ public synchronized void resetBreaker() { } @Override - public FieldDataBreakerStats stats() { - return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); + public AllCircuitBreakerStats stats() { + return new AllCircuitBreakerStats(new CircuitBreakerStats[]{ + new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(), + breaker.getOverhead(), breaker.getTrippedCount()) + }); } @Override - protected void doStart() throws ElasticsearchException { - } - - @Override - protected void doStop() throws ElasticsearchException { - } - - @Override - protected void doClose() throws ElasticsearchException { + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + // There is only a single breaker, so always return it + return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(), + breaker.getOverhead(), breaker.getTrippedCount()); } } diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java similarity index 65% rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java rename to src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java index f5fa7dac8d9f0..c8d76ee9bfd90 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java @@ -17,31 +17,41 @@ * under the License. 
*/ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.breaker; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.ByteSizeValue; /** * Class that returns a breaker that never breaks */ -public class NoneCircuitBreakerService implements CircuitBreakerService { +public class NoneCircuitBreakerService extends CircuitBreakerService { private final ESLogger logger = Loggers.getLogger(NoneCircuitBreakerService.class); private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger); - public NoneCircuitBreakerService() {} + public NoneCircuitBreakerService() { + super(ImmutableSettings.EMPTY); + } + @Override - public MemoryCircuitBreaker getBreaker() { + public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) { return breaker; } @Override - public FieldDataBreakerStats stats() { - return new FieldDataBreakerStats(-1, -1, 0, 0); + public AllCircuitBreakerStats stats() { + return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.Name.FIELDDATA)}); + } + + @Override + public CircuitBreakerStats stats(CircuitBreaker.Name name) { + return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0); } } diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java index 30dcf64ac376e..5bbdc5fa8deba 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java @@ -20,11 +20,12 @@ package org.elasticsearch.indices.fielddata.cache; import org.apache.lucene.util.Accountable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.mapper.FieldMapper; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; /** * A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about @@ -48,7 +49,7 @@ public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, Ac @Override public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]"; - circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes); + circuitBreakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(-sizeInBytes); } } diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 8668383db03c1..c8a12fa221978 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -19,7 +19,6 @@ package org.elasticsearch.node.internal; -import 
org.apache.lucene.util.IOUtils; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -39,7 +38,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; @@ -49,7 +47,6 @@ import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.CachedStreams; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -72,6 +69,7 @@ import org.elasticsearch.index.search.shape.ShapeModule; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -162,6 +160,7 @@ public InternalNode(Settings pSettings, boolean loadConfigSettings) throws Elast modules.add(new Version.Module(version)); modules.add(new CacheRecyclerModule(settings)); modules.add(new PageCacheRecyclerModule(settings)); + modules.add(new CircuitBreakerModule(settings)); modules.add(new BigArraysModule(settings)); modules.add(new PluginsModule(settings, pluginsService)); modules.add(new SettingsModule(settings)); diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index b498a013e686c..e08aae33aff04 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -33,7 +33,7 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.threadpool.ThreadPool; diff --git a/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/src/main/java/org/elasticsearch/percolator/PercolateContext.java index 44fcde3e1ea01..701e0648ffb2e 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -122,7 +122,7 @@ public PercolateContext(PercolateShardRequest request, SearchShardTarget searchS this.types = new String[]{request.documentType()}; this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + this.bigArrays = bigArrays.withCircuitBreaking(); this.querySearchResult = new QuerySearchResult(0, searchShardTarget); this.engineSearcher = indexShard.acquireSearcher("percolate"); this.searcher = new ContextIndexSearcher(this, engineSearcher); diff --git a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java 
b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index ce7a93c145e33..94dd0b7d81cb4 100644 --- a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -193,7 +193,8 @@ public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarg this.scriptService = scriptService; this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + // SearchContexts use a BigArrays that can circuit break + this.bigArrays = bigArrays.withCircuitBreaking(); this.dfsResult = new DfsSearchResult(id, shardTarget); this.queryResult = new QuerySearchResult(id, shardTarget); this.fetchResult = new FetchSearchResult(id, shardTarget); diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java index 9e5ca40f8ea36..da57aa8867ae2 100644 --- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java @@ -37,7 +37,7 @@ import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.util.Random; diff --git a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java index cf65951049dd2..763b7b5173ee6 100644 --- a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java +++ b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java @@ -19,11 +19,17 @@ package org.elasticsearch.common.breaker; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -75,6 +81,127 @@ public void run() { assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L)); } + @Test + public void testThreadedUpdatesToChildBreaker() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicBoolean tripped = new AtomicBoolean(false); + final AtomicReference lastException = new AtomicReference<>(null); + + final AtomicReference breakerRef = new AtomicReference<>(null); + final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) { + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name type) { + return breakerRef.get(); + } + + @Override + public void checkParentLimit(String label) throws CircuitBreakingException { + // never trip + } + }; + 
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0); + final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger, + (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST); + breakerRef.set(breaker); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + if (tripped.get()) { + assertThat("tripped too many times", true, equalTo(false)); + } else { + assertThat(tripped.compareAndSet(false, true), equalTo(true)); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true)); + assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L)); + } + + @Test + public void testThreadedUpdatesToChildBreakerWithParentLimit() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicInteger tripped = new AtomicInteger(0); + final AtomicReference lastException = new AtomicReference<>(null); + + final AtomicInteger parentTripped = new AtomicInteger(0); + final AtomicReference breakerRef = new AtomicReference<>(null); + final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) { + + @Override + public CircuitBreaker getBreaker(CircuitBreaker.Name type) { + return breakerRef.get(); + } + + @Override + public void checkParentLimit(String label) throws CircuitBreakingException { + // Parent will trip right before regular breaker would trip + if (getBreaker(CircuitBreaker.Name.REQUEST).getUsed() > (BYTES_PER_THREAD * NUM_THREADS) - 2) { + parentTripped.incrementAndGet(); + throw new CircuitBreakingException("parent tripped"); + } + } + }; + final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0); + final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger, + (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST); + breakerRef.set(breaker); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + tripped.incrementAndGet(); + if (tripped.get() > 2) { + assertThat("tripped too many times: " + tripped.get(), true, equalTo(false)); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(0L)); + assertThat("parent breaker was tripped exactly twice", parentTripped.get(), equalTo(2)); + assertThat("total breaker was tripped exactly twice", tripped.get(), equalTo(2)); + } + @Test public void testConstantFactor() throws Exception { final 
MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger); diff --git a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java index becce28b85f6a..0b5fe1d6743cc 100644 --- a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java +++ b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.hamcrest.Matchers; import org.jboss.netty.buffer.ChannelBuffer; @@ -49,7 +50,7 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase { @Before public void setUp() throws Exception { super.setUp(); - bigarrays = new BigArrays(ImmutableSettings.EMPTY, null); + bigarrays = new BigArrays(ImmutableSettings.EMPTY, null, new NoneCircuitBreakerService()); } @After diff --git a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 89c5fe816b32f..bcf7e4a5dc5a6 100644 --- a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -20,9 +20,13 @@ package org.elasticsearch.common.util; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.cache.recycler.MockBigArrays; import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; @@ -37,7 +41,7 @@ public class BigArraysTests extends ElasticsearchTestCase { public static BigArrays randombigArrays() { final PageCacheRecycler recycler = randomBoolean() ? 
null : new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool("BigArraysTests")); - return new MockBigArrays(ImmutableSettings.EMPTY, recycler); + return new MockBigArrays(ImmutableSettings.EMPTY, recycler, new NoneCircuitBreakerService()); } private BigArrays bigArrays; @@ -283,7 +287,7 @@ public void testByteArrayEquals() { // not equal: contents differ final ByteArray a3 = byteArrayWithBytes(new byte[]{1,2,3}); - final ByteArray a4 = byteArrayWithBytes(new byte[]{1,1,3}); + final ByteArray a4 = byteArrayWithBytes(new byte[]{1, 1, 3}); assertFalse(bigArrays.equals(a3, a4)); a3.close(); a4.close(); @@ -329,58 +333,51 @@ private ByteArray byteArrayWithBytes(byte[] bytes) { return bytearray; } - public void testByteAccounting() throws Exception { - for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, Long.MAX_VALUE).build(), null); - Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); - final int size = scaledRandomIntBetween(5, 1 << 16); - BigArray array = (BigArray) create.invoke(bigArrays, size); - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); - Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class); - int newSize = scaledRandomIntBetween(5, 1 << 16); - array = (BigArray) resize.invoke(bigArrays, array, newSize); - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); - array.close(); - assertEquals(0, bigArrays.sizeInBytes()); - } - } - public void testMaxSizeExceededOnNew() throws Exception { final int size = scaledRandomIntBetween(5, 1 << 22); for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, randomIntBetween(1, size)).build(), null); + HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService( + ImmutableSettings.builder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, size) + .build(), + new NodeSettingsService(ImmutableSettings.EMPTY)); + BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); try { create.invoke(bigArrays, size); fail("expected an exception on " + create); } catch (InvocationTargetException e) { - assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException); + assertTrue(e.getCause() instanceof CircuitBreakingException); } - assertEquals(0, bigArrays.sizeInBytes()); + assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); } } public void testMaxSizeExceededOnResize() throws Exception { for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) { final long maxSize = randomIntBetween(1 << 10, 1 << 22); - BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, maxSize).build(), null); + HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService( + ImmutableSettings.builder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, maxSize) + .build(), + new NodeSettingsService(ImmutableSettings.EMPTY)); + BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking(); Method create = BigArrays.class.getMethod("new" + type + "Array", long.class); final 
int size = scaledRandomIntBetween(1, 20); BigArray array = (BigArray) create.invoke(bigArrays, size); Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class); while (true) { long newSize = array.size() * 2; - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); try { array = (BigArray) resize.invoke(bigArrays, array, newSize); } catch (InvocationTargetException e) { - assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException); + assertTrue(e.getCause() instanceof CircuitBreakingException); break; } } - assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes()); + assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); array.close(); - assertEquals(0, bigArrays.sizeInBytes()); + assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed()); } } diff --git a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java index 58458c89c1038..508f69f6718eb 100644 --- a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java +++ b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java @@ -42,8 +42,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java index 93643b9b0edb7..ef2c94f9f2a63 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java @@ -39,10 +39,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.query.IndicesQueriesModule; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; @@ -101,4 +99,4 @@ protected void configure() { injector.getInstance(ThreadPool.class).shutdownNow(); } -} \ No newline at end of file +} diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java index 0e3b322e9f995..b6265f9633d60 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java +++ 
b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java @@ -39,10 +39,8 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; -import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.query.IndicesQueriesModule; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.test.ElasticsearchTestCase; @@ -110,4 +108,4 @@ protected void configure() { injector.getInstance(ThreadPool.class).shutdownNow(); } -} \ No newline at end of file +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java deleted file mode 100644 index 76da699fc1668..0000000000000 --- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.indices.fielddata.breaker; - -import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.junit.Test; - -import java.util.Arrays; - -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; - -/** - * Integration tests for InternalCircuitBreakerService - */ -@ClusterScope(scope = TEST, randomDynamicTemplates = false) -public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { - - private String randomRidiculouslySmallLimit() { - // 3 different ways to say 100 bytes - return randomFrom(Arrays.asList("100b", "100")); - //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes! - } - - @Test - @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testMemoryBreaker() { - assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); - final Client client = client(); - - try { - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("cb-test", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit()) - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for field [test] would be larger than limit of [100/100b]")); - - NodesStatsResponse stats 
= client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - FieldDataBreakerStats breakerStats = stat.getBreaker(); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - } - } - - @Test - @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") - public void testRamAccountingTermsEnum() { - final Client client = client(); - - try { - // Create an index where the mappings have a field data filter - assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + - "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); - - // Wait 10 seconds for green - client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); - - // index some different terms so we have some field data for loading - int docCount = scaledRandomIntBetween(300, 1000); - for (long id = 0; id < docCount; id++) { - client.prepareIndex("ramtest", "type", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - - // refresh - refresh(); - - // execute a search that loads field data (sorting on the "test" field) - client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") - .execute().actionGet(); - - // clear field data cache (thus setting the loaded field data back to 0) - client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); - - // Update circuit breaker settings - Settings settings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit()) - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) - .build(); - client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); - - // execute a search that loads field data (sorting on the "test" field) - // again, this time it should trip the breaker - assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), - RestStatus.INTERNAL_SERVER_ERROR, - containsString("Data too large, data for field [test] would be larger than limit of [100/100b]")); - - NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); - int breaks = 0; - for (NodeStats stat : stats.getNodes()) { - FieldDataBreakerStats breakerStats = stat.getBreaker(); - breaks += breakerStats.getTrippedCount(); - } - assertThat(breaks, greaterThanOrEqualTo(1)); - - } finally { - // Reset settings - Settings resetSettings = settingsBuilder() - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") - .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) - .build(); - 
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); - } - } -} diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java new file mode 100644 index 0000000000000..40aaf2899bdb7 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java @@ -0,0 +1,288 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.memory.breaker; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import java.util.Arrays; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.*; + +/** + * Integration tests for HierarchyCircuitBreakerService + */ +@ClusterScope(scope = TEST, randomDynamicTemplates = false) +public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { + + private String randomRidiculouslySmallLimit() { + // 2 different ways to say 100 bytes + return randomFrom(Arrays.asList("100b", "100")); + //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes! 
+ } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testMemoryBreaker() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + final Client client = client(); + + try { + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRamAccountingTermsEnum() { + final Client client = client(); + + try { + // Create an index where the mappings have a field data filter + assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + + "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}")); + + // Wait 10 seconds for green + client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("ramtest", "type", Long.toString(id)) + 
.setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit()) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [100/100b]")); + + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + /** + * Test that a breaker correctly redistributes to a different breaker, in + * this case, the fielddata breaker borrows space from the request breaker + */ + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testParentChecking() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + try { + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + refresh(); + + // We need the request limit beforehand, just from a single node because the limit should always be the same + long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get() + .getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit(); + + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // Perform a 
search to load field data for the "test" field + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + } catch (Exception e) { + String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + + assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"), + RestStatus.INTERNAL_SERVER_ERROR, + containsString("Data too large, data for [test] would be larger than limit of [10/10b]")); + + // Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't + resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%") + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // Perform a search to load field data for the "test" field + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); + } catch (Exception e) { + String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + + } finally { + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1") + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) + .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRequestBreaker() { + assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))); + Client client = client(); + + try { + // Make request breaker limited to a small amount + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b") + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = scaledRandomIntBetween(300, 1000); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", id).map()).execute().actionGet(); + } + refresh(); + + // A cardinality aggregation uses BigArrays and thus the REQUEST breaker + try { + client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); + fail("aggregation should have tripped the breaker"); + } catch (Exception e) { + String 
errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; + assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", + ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); + } + } finally { + Settings resetSettings = settingsBuilder() + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + +} diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java new file mode 100644 index 0000000000000..cd47ffef87360 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.indices.memory.breaker; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Unit tests for the circuit breaker + */ +public class CircuitBreakerUnitTests extends ElasticsearchTestCase { + + public static long pctBytes(String percentString) { + return ImmutableSettings.EMPTY.getAsMemory("", percentString).bytes(); + } + + @Test + public void testBreakerSettingsValidationWithValidSettings() { + // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20} + BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), 1.0); + BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0); + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + + // parent: {:limit 70}, fd: {:limit 40}, request: {:limit 30} + fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("40%"), 1.0); + request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("30%"), 1.0); + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + } + + @Test + public void testBreakerSettingsValidationNegativeOverhead() { + // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20} + BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), -0.1); + BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0); + try { + HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request}); + fail("settings are invalid but validate settings did not throw an exception"); + } catch (Exception e) { + assertThat("Incorrect message: " + e.getMessage(), + e.getMessage().contains("must be non-negative"), equalTo(true)); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java similarity index 96% rename from src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java rename to src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java index 2bd7f4ab0d244..cbf52f53da06a 100644 --- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.elasticsearch.indices.fielddata.breaker; +package org.elasticsearch.indices.memory.breaker; import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.DirectoryReader; @@ -29,6 +29,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -58,7 +59,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException { for (NodeStats node : client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet().getNodes()) { - assertThat("Breaker is not set to 0", node.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } String mapping = XContentFactory.jsonBuilder() @@ -144,7 +145,7 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc NodesStatsResponse resp = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : resp.getNodes()) { - assertThat("Breaker is set to 0", stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } for (int i = 0; i < numSearches; i++) { @@ -180,7 +181,8 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : nodeStats.getNodes()) { - assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, + stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); } } } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index ac51c728c935a..3dde4ad2d1eac 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.indices.IndexMissingException; @@ -194,8 +195,10 @@ public void ensureEstimatedStats() { NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats() .clear().setBreaker(true).execute().actionGet(); for (NodeStats stats : nodeStats.getNodes()) { - assertThat("Breaker not reset to 0 on node: " + stats.getNode(), - stats.getBreaker().getEstimated(), equalTo(0L)); + assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(), + stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L)); + 
assertThat("Request breaker not reset to 0 on node: " + stats.getNode(), + stats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getEstimated(), equalTo(0L)); } } } diff --git a/src/test/java/org/elasticsearch/test/TestSearchContext.java b/src/test/java/org/elasticsearch/test/TestSearchContext.java index 360b4f41ee24b..e4599610dcc3c 100644 --- a/src/test/java/org/elasticsearch/test/TestSearchContext.java +++ b/src/test/java/org/elasticsearch/test/TestSearchContext.java @@ -81,7 +81,7 @@ public class TestSearchContext extends SearchContext { public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) { this.cacheRecycler = cacheRecycler; this.pageCacheRecycler = pageCacheRecycler; - this.bigArrays = bigArrays; + this.bigArrays = bigArrays.withCircuitBreaking(); this.indexService = indexService; this.filterCache = filterCache; this.indexFieldDataService = indexFieldDataService; diff --git a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java index 0bd19b0d0ab0c..44a85c86b391f 100644 --- a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java +++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.*; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import java.util.*; @@ -68,43 +69,23 @@ public boolean apply(Object input) { } } } - // arrays that are not fully released. - ArrayList badBigArrays = new ArrayList<>(); - synchronized (INSTANCES) { - for (final BigArrays bigArrays : INSTANCES) { - // BigArrays are used on the network layer and the cluster is shared across tests so nodes might still be talking to - // each other a bit after the test finished, wait a bit for things to stabilize if so - final boolean sizeIsZero = ElasticsearchTestCase.awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return bigArrays.sizeInBytes() == 0; - } - }); - if (!sizeIsZero) { - final long sizeInBytes = bigArrays.sizeInBytes(); - if (sizeInBytes != 0) { - badBigArrays.add(bigArrays); - } - } - } - } - - if (!badBigArrays.isEmpty()) { - INSTANCES.removeAll(badBigArrays); - StringBuilder msg = new StringBuilder("Found [").append(badBigArrays.size()).append("] big arrays which were not fully released. 
Here is a list of the first 20:"); - for (int i = 0; i < Math.min(20, badBigArrays.size()); i++) { - msg.append("\nbigArray instance with [").append(badBigArrays.get(i).sizeInBytes()).append("] bytes"); - } - throw new AssertionError(msg.toString()); - } - } private final Random random; + private final Settings settings; + private final PageCacheRecycler recycler; + private final CircuitBreakerService breakerService; @Inject - public MockBigArrays(Settings settings, PageCacheRecycler recycler) { - super(settings, recycler); + public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService) { + this(settings, recycler, breakerService, false); + } + + public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) { + super(settings, recycler, breakerService, checkBreaker); + this.settings = settings; + this.recycler = recycler; + this.breakerService = breakerService; long seed; try { seed = SeedUtils.parseSeed(RandomizedContext.current().getRunnerSeedAsString()); @@ -115,6 +96,12 @@ public MockBigArrays(Settings settings, PageCacheRecycler recycler) { INSTANCES.add(this); } + + @Override + public BigArrays withCircuitBreaking() { + return new MockBigArrays(this.settings, this.recycler, this.breakerService, true); + } + @Override public ByteArray newByteArray(long size, boolean clearOnResize) { final ByteArrayWrapper array = new ByteArrayWrapper(super.newByteArray(size, clearOnResize), clearOnResize);