diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc
index 2afd89ed19c11..d36fe47d837d2 100644
--- a/docs/reference/index-modules/fielddata.asciidoc
+++ b/docs/reference/index-modules/fielddata.asciidoc
@@ -24,28 +24,63 @@ field data after a certain time of inactivity. Defaults to `-1`. For
example, can be set to `5m` for a 5 minute expiry.
|=======================================================================
+[float]
+[[circuit-breaker]]
+=== Circuit Breaker
+
+coming[1.4.0,Prior to 1.4.0 there was only a single circuit breaker for fielddata]
+
+Elasticsearch contains multiple circuit breakers used to prevent operations from
+causing an OutOfMemoryError. Each breaker specifies a limit for how much memory
+it can use. Additionally, there is a parent-level breaker that specifies the
+total amount of memory that can be used across all breakers.
+
+The parent-level breaker can be configured with the following setting:
+
+`indices.breaker.total.limit`::
+ Starting limit for overall parent breaker, defaults to 70% of JVM heap
+
+All circuit breaker settings can be changed dynamically using the cluster update
+settings API.
+
[float]
[[fielddata-circuit-breaker]]
-=== Field data circuit breaker
+==== Field data circuit breaker
The field data circuit breaker allows Elasticsearch to estimate the amount of
memory a field will required to be loaded into memory. It can then prevent the
field data loading by raising an exception. By default the limit is configured
to 60% of the maximum JVM heap. It can be configured with the following
parameters:
-[cols="<,<",options="header",]
-|=======================================================================
-|Setting |Description
-|`indices.fielddata.breaker.limit` |Maximum size of estimated field data
-to allow loading. Defaults to 60% of the maximum JVM heap.
-|`indices.fielddata.breaker.overhead` |A constant that all field data
-estimations are multiplied with to determine a final estimation. Defaults to
-1.03
-|=======================================================================
+`indices.breaker.fielddata.limit`::
+ Limit for fielddata breaker, defaults to 60% of JVM heap
+
+`indices.breaker.fielddata.overhead`::
+ A constant that all field data estimations are multiplied with to determine a
+ final estimation. Defaults to 1.03
+
+`indices.fielddata.breaker.limit`::
+ deprecated[1.4.0,Replaced by `indices.breaker.fielddata.limit`]
+
+`indices.fielddata.breaker.overhead`::
+ deprecated[1.4.0,Replaced by `indices.breaker.fielddata.overhead`]
+
+[float]
+[[request-circuit-breaker]]
+==== Request circuit breaker
+
+coming[1.4.0]
+
+The request circuit breaker allows Elasticsearch to prevent per-request data
+structures (for example, memory used for calculating aggregations during a
+request) from exceeding a certain amount of memory.
+
+`indices.breaker.request.limit`::
+ Limit for request breaker, defaults to 40% of JVM heap
-Both the `indices.fielddata.breaker.limit` and
-`indices.fielddata.breaker.overhead` can be changed dynamically using the
-cluster update settings API.
+`indices.breaker.request.overhead`::
+ A constant that all request estimations are multiplied with to determine a
+ final estimation. Defaults to 1
[float]
[[fielddata-monitoring]]
diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
index 0910628188350..7ee06cb7002b5 100644
--- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
+++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
@@ -28,7 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
-import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats;
+import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
@@ -75,7 +75,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
private HttpStats http;
@Nullable
- private FieldDataBreakerStats breaker;
+ private AllCircuitBreakerStats breaker;
NodeStats() {
}
@@ -83,7 +83,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
@Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http,
- @Nullable FieldDataBreakerStats breaker) {
+ @Nullable AllCircuitBreakerStats breaker) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
@@ -174,7 +174,7 @@ public HttpStats getHttp() {
}
@Nullable
- public FieldDataBreakerStats getBreaker() {
+ public AllCircuitBreakerStats getBreaker() {
return this.breaker;
}
@@ -215,7 +215,7 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
http = HttpStats.readHttpStats(in);
}
- breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in);
+ breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
}
@Override
diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java
index f3957f8b3b3b8..3201761e54aff 100644
--- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java
+++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java
@@ -76,6 +76,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
+import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.PluginsModule;
@@ -187,6 +188,7 @@ public TransportClient(Settings pSettings, boolean loadConfigSettings) throws El
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
+ modules.add(new CircuitBreakerModule(this.settings));
injector = modules.createInjector();
diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
index 6a442a508e0d3..ab8a48bac1d2c 100644
--- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
@@ -28,8 +28,9 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
-import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
+import org.elasticsearch.indices.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@@ -85,6 +86,11 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
+ clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
+ clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
+ clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
+ clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
}
public void addDynamicSettings(String... settings) {
diff --git a/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java
new file mode 100644
index 0000000000000..1a55e4c3c74f7
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.breaker;
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.indices.breaker.BreakerSettings;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Breaker that will check a parent's when incrementing
+ */
+public class ChildMemoryCircuitBreaker implements CircuitBreaker {
+
+ private final long memoryBytesLimit;
+ private final BreakerSettings settings;
+ private final double overheadConstant;
+ private final AtomicLong used;
+ private final AtomicLong trippedCount;
+ private final ESLogger logger;
+ private final HierarchyCircuitBreakerService parent;
+ private final Name name;
+
+ /**
+ * Create a circuit breaker that will break if the number of estimated
+ * bytes grows above the limit. All estimations will be multiplied by
+ * the given overheadConstant. This breaker starts with 0 bytes used.
+ * @param settings settings to configure this breaker
+ * @param parent parent circuit breaker service to delegate tripped breakers to
+ * @param name the name of the breaker
+ */
+ public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger,
+ HierarchyCircuitBreakerService parent, Name name) {
+ this(settings, null, logger, parent, name);
+ }
+
+ /**
+ * Create a circuit breaker that will break if the number of estimated
+ * bytes grows above the limit. All estimations will be multiplied by
+ * the given overheadConstant. Uses the given oldBreaker to initialize
+ * the starting offset.
+ * @param settings settings to configure this breaker
+ * @param parent parent circuit breaker service to delegate tripped breakers to
+ * @param name the name of the breaker
+ * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
+ */
+ public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
+ ESLogger logger, HierarchyCircuitBreakerService parent, Name name) {
+ this.name = name;
+ this.settings = settings;
+ this.memoryBytesLimit = settings.getLimit();
+ this.overheadConstant = settings.getOverhead();
+ if (oldBreaker == null) {
+ this.used = new AtomicLong(0);
+ this.trippedCount = new AtomicLong(0);
+ } else {
+ this.used = oldBreaker.used;
+ this.trippedCount = oldBreaker.trippedCount;
+ }
+ this.logger = logger;
+ if (logger.isTraceEnabled()) {
+ logger.trace("creating ChildCircuitBreaker with settings {}", this.settings);
+ }
+ this.parent = parent;
+ }
+
+ /**
+ * Method used to trip the breaker, delegates to the parent to determine
+ * whether to trip the breaker or not
+ */
+ @Override
+ public void circuitBreak(String fieldName, long bytesNeeded) {
+ this.trippedCount.incrementAndGet();
+ throw new CircuitBreakingException("[" + this.name + "] Data too large, data for [" +
+ fieldName + "] would be larger than limit of [" +
+ memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]",
+ bytesNeeded, this.memoryBytesLimit);
+ }
+
+ /**
+ * Add a number of bytes, tripping the circuit breaker if the aggregated
+ * estimates are above the limit. Automatically trips the breaker if the
+ * memory limit is set to 0. Will never trip the breaker if the limit is
+ * set < 0, but can still be used to aggregate estimations.
+ * @param bytes number of bytes to add to the breaker
+ * @return number of "used" bytes so far
+ * @throws CircuitBreakingException
+ */
+ @Override
+ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
+ // short-circuit on no data allowed, immediately throwing an exception
+ if (memoryBytesLimit == 0) {
+ circuitBreak(label, bytes);
+ }
+
+ long newUsed;
+ // If there is no limit (-1), we can optimize a bit by using
+ // .addAndGet() instead of looping (because we don't have to check a
+ // limit), which makes the RamAccountingTermsEnum case faster.
+ if (this.memoryBytesLimit == -1) {
+ newUsed = this.used.addAndGet(bytes);
+ if (logger.isTraceEnabled()) {
+ logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
+ this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
+ }
+ } else {
+ // Otherwise, check the addition and commit the addition, looping if
+ // there are conflicts. May result in additional logging, but it's
+ // trace logging and shouldn't be counted on for additions.
+ long currentUsed;
+ do {
+ currentUsed = this.used.get();
+ newUsed = currentUsed + bytes;
+ long newUsedWithOverhead = (long) (newUsed * overheadConstant);
+ if (logger.isTraceEnabled()) {
+ logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
+ this.name,
+ new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
+ memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
+ newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
+ }
+ if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
+ logger.error("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
+ this.name,
+ newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
+ memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
+ circuitBreak(label, newUsedWithOverhead);
+ }
+ // Attempt to set the new used value, but make sure it hasn't changed
+ // underneath us, if it has, keep trying until we are able to set it
+ } while (!this.used.compareAndSet(currentUsed, newUsed));
+ }
+
+ // Additionally, we need to check that we haven't exceeded the parent's limit
+ try {
+ parent.checkParentLimit(label);
+ } catch (CircuitBreakingException e) {
+ // If the parent breaker is tripped, this breaker has to be
+ // adjusted back down because the allocation is "blocked" but the
+ // breaker has already been incremented
+ this.used.addAndGet(-bytes);
+ throw e;
+ }
+ return newUsed;
+ }
+
+ /**
+ * Add an exact number of bytes, not checking for tripping the
+ * circuit breaker. This bypasses the overheadConstant multiplication.
+ *
+ * Also does not check with the parent breaker to see if the parent limit
+ * has been exceeded.
+ *
+ * @param bytes number of bytes to add to the breaker
+ * @return number of "used" bytes so far
+ */
+ @Override
+ public long addWithoutBreaking(long bytes) {
+ long u = used.addAndGet(bytes);
+ if (logger.isTraceEnabled()) {
+ logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
+ }
+ assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
+ return u;
+ }
+
+ /**
+ * @return the number of aggregated "used" bytes so far
+ */
+ @Override
+ public long getUsed() {
+ return this.used.get();
+ }
+
+ /**
+ * @return the number of bytes that can be added before the breaker trips
+ */
+ @Override
+ public long getLimit() {
+ return this.memoryBytesLimit;
+ }
+
+ /**
+ * @return the constant multiplier the breaker uses for aggregations
+ */
+ @Override
+ public double getOverhead() {
+ return this.overheadConstant;
+ }
+
+ /**
+ * @return the number of times the breaker has been tripped
+ */
+ @Override
+ public long getTrippedCount() {
+ return this.trippedCount.get();
+ }
+
+ /**
+ * @return the name of the breaker
+ */
+ public Name getName() {
+ return this.name;
+ }
+}
diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java
new file mode 100644
index 0000000000000..541f6888a15dd
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.breaker;
+
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Interface for an object that can be incremented, breaking after some
+ * configured limit has been reached.
+ */
+public interface CircuitBreaker {
+
+ /**
+ * Enum used for specifying different types of circuit breakers
+ */
+ public static enum Name {
+ PARENT(0),
+ FIELDDATA(1),
+ REQUEST(2);
+
+ private int ordinal;
+
+ Name(int ordinal) {
+ this.ordinal = ordinal;
+ }
+
+ public int getSerializableValue() {
+ return this.ordinal;
+ }
+
+ public static Name readFrom(StreamInput in) throws IOException {
+ int value = in.readVInt();
+ switch (value) {
+ case 0:
+ return Name.PARENT;
+ case 1:
+ return Name.FIELDDATA;
+ case 2:
+ return Name.REQUEST;
+ default:
+ throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with ordinal: " + value);
+ }
+ }
+
+ public static void writeTo(Name name, StreamOutput out) throws IOException {
+ out.writeVInt(name.getSerializableValue());
+ }
+ }
+
+ /**
+ * Trip the circuit breaker
+ * @param fieldName name of the field responsible for tripping the breaker
+ * @param bytesNeeded bytes asked for but unable to be allocated
+ */
+ public void circuitBreak(String fieldName, long bytesNeeded);
+
+ /**
+ * add bytes to the breaker and maybe trip
+ * @param bytes number of bytes to add
+ * @param label string label describing the bytes being added
+ * @return the number of "used" bytes for the circuit breaker
+ * @throws CircuitBreakingException
+ */
+ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
+
+ /**
+ * Adjust the circuit breaker without tripping
+ */
+ public long addWithoutBreaking(long bytes);
+
+ /**
+ * @return the currently used bytes the breaker is tracking
+ */
+ public long getUsed();
+
+ /**
+ * @return maximum number of bytes the circuit breaker can track before tripping
+ */
+ public long getLimit();
+
+ /**
+ * @return overhead of circuit breaker
+ */
+ public double getOverhead();
+
+ /**
+ * @return the number of times the circuit breaker has been tripped
+ */
+ public long getTrippedCount();
+
+ /**
+ * @return the name of the breaker
+ */
+ public Name getName();
+}
diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java
index c2c2bf26d10f1..0b88a9dad7c93 100644
--- a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java
+++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java
@@ -25,8 +25,26 @@
*/
public class CircuitBreakingException extends ElasticsearchException {
- // TODO: maybe add more neat metrics here?
+ private final long bytesWanted;
+ private final long byteLimit;
+
public CircuitBreakingException(String message) {
super(message);
+ this.bytesWanted = 0;
+ this.byteLimit = 0;
+ }
+
+ public CircuitBreakingException(String message, long bytesWanted, long byteLimit) {
+ super(message);
+ this.bytesWanted = bytesWanted;
+ this.byteLimit = byteLimit;
+ }
+
+ public long getBytesWanted() {
+ return this.bytesWanted;
+ }
+
+ public long getByteLimit() {
+ return this.byteLimit;
}
}
diff --git a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java
index 9e157c31d21ec..e22753a326a0f 100644
--- a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java
+++ b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java
@@ -27,7 +27,7 @@
* MemoryCircuitBreaker is a circuit breaker that breaks once a
* configurable memory limit has been reached.
*/
-public class MemoryCircuitBreaker {
+public class MemoryCircuitBreaker implements CircuitBreaker {
private final long memoryBytesLimit;
private final double overheadConstant;
@@ -77,7 +77,7 @@ public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, Memory
* Method used to trip the breaker
* @throws CircuitBreakingException
*/
- public void circuitBreak(String fieldName) throws CircuitBreakingException {
+ public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreakingException {
this.trippedCount.incrementAndGet();
throw new CircuitBreakingException("Data too large, data for field [" + fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]");
@@ -92,10 +92,10 @@ public void circuitBreak(String fieldName) throws CircuitBreakingException {
* @return number of "used" bytes so far
* @throws CircuitBreakingException
*/
- public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws CircuitBreakingException {
+ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
- circuitBreak(fieldName);
+ circuitBreak(label, bytes);
}
long newUsed;
@@ -106,7 +106,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
- new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed));
+ new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
}
return newUsed;
}
@@ -121,15 +121,15 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws
long newUsedWithOverhead = (long)(newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
- new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed),
+ new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.error("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
- newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), fieldName,
+ newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
- circuitBreak(fieldName);
+ circuitBreak(label, newUsedWithOverhead);
}
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
@@ -161,9 +161,9 @@ public long getUsed() {
}
/**
- * @return the maximum number of bytes before the circuit breaker will trip
+ * @return the number of bytes that can be added before the breaker trips
*/
- public long getMaximum() {
+ public long getLimit() {
return this.memoryBytesLimit;
}
@@ -180,4 +180,11 @@ public double getOverhead() {
public long getTrippedCount() {
return this.trippedCount.get();
}
+
+ /**
+ * @return the name of the breaker
+ */
+ public Name getName() {
+ return Name.FIELDDATA;
+ }
}
diff --git a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java
index 81a293459f29b..2204fe8cac54e 100644
--- a/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java
+++ b/src/main/java/org/elasticsearch/common/unit/MemorySizeValue.java
@@ -32,7 +32,7 @@ public enum MemorySizeValue {
* 42 (default assumed unit is byte) or 2mb, or percentages of the heap size: if
* the heap is 1G, 10% will be parsed as 100mb. */
public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) {
- if (sValue.endsWith("%")) {
+ if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
try {
final double percent = Double.parseDouble(percentAsString);
diff --git a/src/main/java/org/elasticsearch/common/util/AbstractArray.java b/src/main/java/org/elasticsearch/common/util/AbstractArray.java
index 7de3c1c3b7448..0c00a897333a5 100644
--- a/src/main/java/org/elasticsearch/common/util/AbstractArray.java
+++ b/src/main/java/org/elasticsearch/common/util/AbstractArray.java
@@ -33,7 +33,7 @@ abstract class AbstractArray implements BigArray {
@Override
public final void close() {
- bigArrays.ramBytesUsed.addAndGet(-ramBytesUsed());
+ bigArrays.adjustBreaker(-ramBytesUsed());
assert !released : "double release";
released = true;
doClose();
diff --git a/src/main/java/org/elasticsearch/common/util/BigArrays.java b/src/main/java/org/elasticsearch/common/util/BigArrays.java
index 5f362368e5ec3..a2f216361c419 100644
--- a/src/main/java/org/elasticsearch/common/util/BigArrays.java
+++ b/src/main/java/org/elasticsearch/common/util/BigArrays.java
@@ -23,8 +23,10 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
-import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
@@ -32,6 +34,7 @@
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,9 +42,8 @@
/** Utility class to work with arrays. */
public class BigArrays extends AbstractComponent {
- // TODO: switch to a circuit breaker that is shared not only on big arrays level, and applies to other request level data structures
public static final String MAX_SIZE_IN_BYTES_SETTING = "requests.memory.breaker.limit";
- public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, Long.MAX_VALUE);
+ public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, null);
/** Page size in bytes: 16KB */
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
@@ -363,39 +365,73 @@ public T set(long index, T value) {
}
final PageCacheRecycler recycler;
- final AtomicLong ramBytesUsed;
- final long maxSizeInBytes;
+ final CircuitBreakerService breakerService;
+ final boolean checkBreaker;
@Inject
- public BigArrays(Settings settings, PageCacheRecycler recycler) {
- this(settings, recycler, settings.getAsMemory(MAX_SIZE_IN_BYTES_SETTING, Long.toString(Long.MAX_VALUE)).bytes());
+ public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
+ // Checking the breaker is disabled if not specified
+ this(settings, recycler, breakerService, false);
}
- private BigArrays(Settings settings, PageCacheRecycler recycler, final long maxSizeInBytes) {
+ public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
super(settings);
- this.maxSizeInBytes = maxSizeInBytes;
+ this.checkBreaker = checkBreaker;
this.recycler = recycler;
- ramBytesUsed = new AtomicLong();
+ this.breakerService = breakerService;
}
- private void validate(long delta) {
- final long totalSizeInBytes = ramBytesUsed.addAndGet(delta);
- if (totalSizeInBytes > maxSizeInBytes) {
- throw new ElasticsearchIllegalStateException("Maximum number of bytes allocated exceeded: [" + totalSizeInBytes + "] (> " + maxSizeInBytes + ")");
+ /**
+ * Adjust the circuit breaker with the given delta, if the delta is
+ * negative, or checkBreaker is false, the breaker will be adjusted
+ * without tripping
+ */
+ void adjustBreaker(long delta) {
+ if (this.breakerService != null) {
+ CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.Name.REQUEST);
+ if (this.checkBreaker == true) {
+ // checking breaker means potentially tripping, but it doesn't
+ // have to if the delta is negative
+ if (delta > 0) {
+ try {
+ breaker.addEstimateBytesAndMaybeBreak(delta, "");
+ } catch (CircuitBreakingException e) {
+ // since we've already created the data, we need to
+ // add it so closing the stream re-adjusts properly
+ breaker.addWithoutBreaking(delta);
+ // re-throw the original exception
+ throw e;
+ }
+ } else {
+ breaker.addWithoutBreaking(delta);
+ }
+ } else {
+ // even if we are not checking the breaker, we need to adjust
+ // its' totals, so add without breaking
+ breaker.addWithoutBreaking(delta);
+ }
}
}
+ /**
+ * Return a new instance of this BigArrays class with circuit breaking
+ * explicitly enabled, instead of only accounting enabled
+ */
+ public BigArrays withCircuitBreaking() {
+ return new BigArrays(this.settings, this.recycler, this.breakerService, true);
+ }
+
private T resizeInPlace(T array, long newSize) {
final long oldMemSize = array.ramBytesUsed();
array.resize(newSize);
- validate(array.ramBytesUsed() - oldMemSize);
+ adjustBreaker(array.ramBytesUsed() - oldMemSize);
return array;
}
private T validate(T array) {
boolean success = false;
try {
- validate(array.ramBytesUsed());
+ adjustBreaker(array.ramBytesUsed());
success = true;
} finally {
if (!success) {
@@ -720,11 +756,4 @@ public ObjectArray grow(ObjectArray array, long minSize) {
final long newSize = overSize(minSize, OBJECT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
return resize(array, newSize);
}
-
- /**
- * Return an approximate number of bytes that have been allocated but not released yet.
- */
- public long sizeInBytes() {
- return ramBytesUsed.get();
- }
}
diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java
index 62faf5abce1e1..d8b925f735443 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java
@@ -33,7 +33,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
index 7e7ba5158b0bd..105d1676466cd 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
@@ -38,11 +38,11 @@
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
-import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java
index 01a91879bd7f3..15aa961294c12 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java
@@ -21,8 +21,8 @@
import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.index.fielddata.plain.AbstractIndexFieldData;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import java.io.IOException;
@@ -36,7 +36,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum {
// Flush every 5mb
private static final long FLUSH_BUFFER_SIZE = 1024 * 1024 * 5;
- private final MemoryCircuitBreaker breaker;
+ private final CircuitBreaker breaker;
private final TermsEnum termsEnum;
private final AbstractIndexFieldData.PerValueEstimator estimator;
private final String fieldName;
@@ -44,7 +44,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum {
private long flushBuffer;
- public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator,
+ public RamAccountingTermsEnum(TermsEnum termsEnum, CircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator,
String fieldName) {
super(termsEnum);
this.breaker = breaker;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java
index 7c47c6ed15e5a..41f630371d216 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java
@@ -23,11 +23,12 @@
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.XOrdinalMap;
import org.apache.lucene.util.packed.PackedInts;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData;
import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
@@ -52,7 +53,7 @@ public static IndexOrdinalsFieldData build(final IndexReader indexReader, IndexO
}
final XOrdinalMap ordinalMap = XOrdinalMap.build(null, subs, PackedInts.DEFAULT);
final long memorySizeInBytes = ordinalMap.ramBytesUsed();
- breakerService.getBreaker().addWithoutBreaking(memorySizeInBytes);
+ breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(memorySizeInBytes);
if (logger.isDebugEnabled()) {
logger.debug(
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java
index d3abf187ffd49..93547810e9dba 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java
@@ -31,8 +31,8 @@
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper.Names;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
import java.util.Map;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java
index 1098f3c917a14..3864e72b62338 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/BytesBinaryDVIndexFieldData.java
@@ -33,7 +33,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java
index 51da7ef7c1248..8ad8c5cfc5751 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java
@@ -29,8 +29,8 @@
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
* A field data implementation that forbids loading and will throw an {@link org.elasticsearch.ElasticsearchIllegalStateException} if you try to load
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java
index 0605b0e909e36..ec3cb13659abd 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java
@@ -37,7 +37,7 @@
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.Set;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java
index 327cba1420bca..3cf762db2db25 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java
@@ -22,6 +22,7 @@
import org.apache.lucene.index.*;
import org.apache.lucene.util.*;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
@@ -34,7 +35,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
/**
@@ -70,7 +71,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
+ NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java
index 5aa8e371629ef..85cd1e2c14f5c 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FSTBytesIndexFieldData.java
@@ -25,6 +25,7 @@
import org.apache.lucene.util.fst.FST.INPUT_TYPE;
import org.apache.lucene.util.fst.PositiveIntOutputs;
import org.apache.lucene.util.fst.Util;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@@ -33,7 +34,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@@ -63,7 +64,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex
Terms terms = reader.terms(getFieldNames().indexName());
AtomicOrdinalsFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
+ NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();
estimator.afterLoad(null, data.ramBytesUsed());
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java
index 45415e796547a..3c0030408fec6 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/FloatArrayIndexFieldData.java
@@ -21,6 +21,7 @@
import org.apache.lucene.index.*;
import org.apache.lucene.util.*;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FloatArray;
@@ -33,7 +34,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
/**
@@ -68,7 +69,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
+ NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java
index 4165c0d8cb7a0..7f775ab13a1b8 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointBinaryDVIndexFieldData.java
@@ -31,7 +31,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java
index aedaac01bdbdd..c33c74c38ab60 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java
@@ -25,6 +25,7 @@
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PagedMutable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.DistanceUnit;
@@ -38,7 +39,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@@ -82,7 +83,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
+ NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java
index fb2936c49d80a..e1684d65d6d06 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java
@@ -23,6 +23,7 @@
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.FixedBitSet;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
@@ -34,7 +35,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@@ -64,7 +65,7 @@ public AtomicGeoPointFieldData loadDirect(AtomicReaderContext context) throws Ex
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
- NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
+ NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java
index 3850d7920c585..ab875c62ab412 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java
@@ -27,7 +27,7 @@
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
public class IndexIndexFieldData extends AbstractIndexOrdinalsFieldData {
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java
index 5b35aa70eb1f8..481b2b3c84ca0 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java
@@ -22,7 +22,7 @@
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import java.io.IOException;
@@ -33,9 +33,9 @@
*/
public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator {
- private final MemoryCircuitBreaker breaker;
+ private final CircuitBreaker breaker;
- NonEstimatingEstimator(MemoryCircuitBreaker breaker) {
+ NonEstimatingEstimator(CircuitBreaker breaker) {
this.breaker = breaker;
}
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java
index 971b3a0f933a3..2cffa02a2cb4c 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java
@@ -27,7 +27,7 @@
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@@ -38,7 +38,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
@@ -88,7 +88,7 @@ public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exc
final AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
- PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType(), getFieldNames().fullName());
+ PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getNumericType(), getFieldNames().fullName());
if (terms == null) {
data = AtomicLongFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(data.ramBytesUsed());
@@ -355,11 +355,11 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu
*/
public class PackedArrayEstimator implements PerValueEstimator {
- private final MemoryCircuitBreaker breaker;
+ private final CircuitBreaker breaker;
private final NumericType type;
private final String fieldName;
- public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type, String fieldName) {
+ public PackedArrayEstimator(CircuitBreaker breaker, NumericType type, String fieldName) {
this.breaker = breaker;
this.type = type;
this.fieldName = fieldName;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java
index d6d3e051a5fca..fbfda4af76501 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PagedBytesIndexFieldData.java
@@ -24,7 +24,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@@ -33,7 +33,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
@@ -61,7 +61,7 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex
AtomicReader reader = context.reader();
AtomicOrdinalsFieldData data = null;
- PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(), getFieldNames().fullName());
+ PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getFieldNames().fullName());
Terms terms = reader.terms(getFieldNames().indexName());
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();
@@ -125,11 +125,11 @@ public AtomicOrdinalsFieldData loadDirect(AtomicReaderContext context) throws Ex
public class PagedBytesEstimator implements PerValueEstimator {
private final AtomicReaderContext context;
- private final MemoryCircuitBreaker breaker;
+ private final CircuitBreaker breaker;
private final String fieldName;
private long estimatedBytes;
- PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker, String fieldName) {
+ PagedBytesEstimator(AtomicReaderContext context, CircuitBreaker breaker, String fieldName) {
this.breaker = breaker;
this.context = context;
this.fieldName = fieldName;
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java
index 1b3f30e0bc64d..5e52b5a438f75 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java
@@ -32,7 +32,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -47,7 +47,7 @@
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
@@ -101,7 +101,7 @@ public ParentChildAtomicFieldData loadDirect(AtomicReaderContext context) throws
new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME),
parentTypes
);
- ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(), termsEnum);
+ ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), termsEnum);
TermsEnum estimatedTermsEnum = estimator.beforeLoad(null);
ObjectObjectOpenHashMap typeBuilders = ObjectObjectOpenHashMap.newInstance();
try {
@@ -210,13 +210,13 @@ public IndexFieldData> build(Index index, @IndexSettings Settings indexSetting
*/
public class ParentChildEstimator implements PerValueEstimator {
- private final MemoryCircuitBreaker breaker;
+ private final CircuitBreaker breaker;
private final TermsEnum filteredEnum;
// The TermsEnum is passed in here instead of being generated in the
// beforeLoad() function since it's filtered inside the previous
// TermsEnum wrappers
- public ParentChildEstimator(MemoryCircuitBreaker breaker, TermsEnum filteredEnum) {
+ public ParentChildEstimator(CircuitBreaker breaker, TermsEnum filteredEnum) {
this.breaker = breaker;
this.filteredEnum = filteredEnum;
}
@@ -337,7 +337,7 @@ public int getOrd(int docID) {
}
}
- breakerService.getBreaker().addWithoutBreaking(ramBytesUsed);
+ breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
if (logger.isDebugEnabled()) {
logger.debug(
"Global-ordinals[_parent] took {}",
diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java
index 3a1b85251a3d8..bf615b746a42f 100644
--- a/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java
+++ b/src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java
@@ -29,8 +29,8 @@
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper.Names;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData implements IndexOrdinalsFieldData {
diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java
index 76ac7e165d70e..dad12f9a295bb 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -29,10 +29,8 @@
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
-import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
-import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
+import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -81,7 +79,6 @@ protected void configure() {
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
- bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}
}
diff --git a/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java
new file mode 100644
index 0000000000000..4ba07a50d5909
--- /dev/null
+++ b/src/main/java/org/elasticsearch/indices/breaker/AllCircuitBreakerStats.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.breaker;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+
+/**
+ * Stats class encapsulating all of the different circuit breaker stats
+ */
+public class AllCircuitBreakerStats implements Streamable, ToXContent {
+
+ private CircuitBreakerStats[] allStats = new CircuitBreakerStats[0];
+
+ public AllCircuitBreakerStats() {
+
+ }
+
+ public AllCircuitBreakerStats(CircuitBreakerStats[] allStats) {
+ this.allStats = allStats;
+ }
+
+ public CircuitBreakerStats[] getAllStats() {
+ return this.allStats;
+ }
+
+ public CircuitBreakerStats getStats(CircuitBreaker.Name name) {
+ for (CircuitBreakerStats stats : allStats) {
+ if (stats.getName() == name) {
+ return stats;
+ }
+ }
+ return null;
+ }
+
+ public static AllCircuitBreakerStats readOptionalAllCircuitBreakerStats(StreamInput in) throws IOException {
+ AllCircuitBreakerStats stats = in.readOptionalStreamable(new AllCircuitBreakerStats());
+ return stats;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ int statCount = in.readVInt();
+ CircuitBreakerStats[] newStats = new CircuitBreakerStats[statCount];
+ for (int i = 0; i < statCount; i++) {
+ CircuitBreakerStats stats = new CircuitBreakerStats();
+ stats.readFrom(in);
+ newStats[i] = stats;
+ }
+ allStats = newStats;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(allStats.length);
+ for (CircuitBreakerStats stats : allStats) {
+ if (stats != null) {
+ stats.writeTo(out);
+ }
+ }
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(Fields.BREAKERS);
+ for (CircuitBreakerStats stats : allStats) {
+ if (stats != null) {
+ stats.toXContent(builder, params);
+ }
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ static final class Fields {
+ static final XContentBuilderString BREAKERS = new XContentBuilderString("breakers");
+ }
+}
diff --git a/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java
new file mode 100644
index 0000000000000..b76e4b45dee53
--- /dev/null
+++ b/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.breaker;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+/**
+ * Settings for a {@link CircuitBreaker}
+ */
+public class BreakerSettings {
+
+ private final CircuitBreaker.Name name;
+ private final long limitBytes;
+ private final double overhead;
+
+ public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) {
+ this.name = name;
+ this.limitBytes = limitBytes;
+ this.overhead = overhead;
+ }
+
+ public CircuitBreaker.Name getName() {
+ return this.name;
+ }
+
+ public long getLimit() {
+ return this.limitBytes;
+ }
+
+ public double getOverhead() {
+ return this.overhead;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + this.name.toString() +
+ ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
+ ",overhead=" + this.overhead + "]";
+ }
+}
diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java
similarity index 57%
rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java
rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java
index c53cc3e1ba202..6033015ce546a 100644
--- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java
+++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerModule.java
@@ -17,24 +17,23 @@
* under the License.
*/
-package org.elasticsearch.indices.fielddata.breaker;
+package org.elasticsearch.indices.breaker;
-import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.settings.Settings;
-/**
- * Interface for Circuit Breaker services, which provide breakers to classes
- * that load field data.
- */
-public interface CircuitBreakerService {
+public class CircuitBreakerModule extends AbstractModule {
+
+ public static final String IMPL = "indices.breaker.breaker_impl";
- /**
- * @return the breaker that can be used to register estimates against
- */
- public MemoryCircuitBreaker getBreaker();
+ private final Settings settings;
+ public CircuitBreakerModule(Settings settings) {
+ this.settings = settings;
+ }
- /**
- * @return stats about the breaker
- */
- public FieldDataBreakerStats stats();
+ @Override
+ protected void configure() {
+ bind(CircuitBreakerService.class).to(settings.getAsClass(IMPL, HierarchyCircuitBreakerService.class)).asEagerSingleton();
+ }
}
diff --git a/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java
new file mode 100644
index 0000000000000..35e5ee80e5a2e
--- /dev/null
+++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.breaker;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.settings.Settings;
+
+/**
+ * Interface for Circuit Breaker services, which provide breakers to classes
+ * that load field data.
+ */
+public abstract class CircuitBreakerService extends AbstractLifecycleComponent {
+
+ protected CircuitBreakerService(Settings settings) {
+ super(settings);
+ }
+
+ /**
+ * @return the breaker that can be used to register estimates against
+ */
+ public abstract CircuitBreaker getBreaker(CircuitBreaker.Name type);
+
+ /**
+ * @return stats about all breakers
+ */
+ public abstract AllCircuitBreakerStats stats();
+
+ /**
+ * @return stats about a specific breaker
+ */
+ public abstract CircuitBreakerStats stats(CircuitBreaker.Name name);
+
+ protected void doStart() throws ElasticsearchException {
+ }
+
+ protected void doStop() throws ElasticsearchException {
+ }
+
+ protected void doClose() throws ElasticsearchException {
+ }
+}
diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java
similarity index 61%
rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java
rename to src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java
index 555c0f09efbd4..67bdd358dcd7a 100644
--- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java
+++ b/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.elasticsearch.indices.fielddata.breaker;
+package org.elasticsearch.indices.breaker;
import org.elasticsearch.Version;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -29,30 +30,37 @@
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
+import java.util.Locale;
/**
* Class encapsulating stats about the circuit breaker
*/
-public class FieldDataBreakerStats implements Streamable, ToXContent {
+public class CircuitBreakerStats implements Streamable, ToXContent {
- private long maximum;
+ private CircuitBreaker.Name name;
+ private long limit;
private long estimated;
private long trippedCount;
private double overhead;
- FieldDataBreakerStats() {
+ CircuitBreakerStats() {
}
- public FieldDataBreakerStats(long maximum, long estimated, double overhead, long trippedCount) {
- this.maximum = maximum;
+ public CircuitBreakerStats(CircuitBreaker.Name name, long limit, long estimated, double overhead, long trippedCount) {
+ this.name = name;
+ this.limit = limit;
this.estimated = estimated;
this.trippedCount = trippedCount;
this.overhead = overhead;
}
- public long getMaximum() {
- return this.maximum;
+ public CircuitBreaker.Name getName() {
+ return this.name;
+ }
+
+ public long getLimit() {
+ return this.limit;
}
public long getEstimated() {
@@ -67,14 +75,15 @@ public double getOverhead() {
return this.overhead;
}
- public static FieldDataBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException {
- FieldDataBreakerStats stats = in.readOptionalStreamable(new FieldDataBreakerStats());
+ public static CircuitBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException {
+ CircuitBreakerStats stats = in.readOptionalStreamable(new CircuitBreakerStats());
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
- maximum = in.readLong();
+ // limit is the maximum from the old circuit breaker stats for backwards compatibility
+ limit = in.readLong();
estimated = in.readLong();
overhead = in.readDouble();
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
@@ -82,23 +91,31 @@ public void readFrom(StreamInput in) throws IOException {
} else {
this.trippedCount = -1;
}
+ if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
+ this.name = CircuitBreaker.Name.readFrom(in);
+ } else {
+ this.name = CircuitBreaker.Name.FIELDDATA;
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
- out.writeLong(maximum);
+ out.writeLong(limit);
out.writeLong(estimated);
out.writeDouble(overhead);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeLong(trippedCount);
}
+ if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
+ CircuitBreaker.Name.writeTo(name, out);
+ }
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.startObject(Fields.BREAKER);
- builder.field(Fields.MAX, maximum);
- builder.field(Fields.MAX_HUMAN, new ByteSizeValue(maximum));
+ builder.startObject(name.toString().toLowerCase(Locale.ROOT));
+ builder.field(Fields.LIMIT, limit);
+ builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit));
builder.field(Fields.ESTIMATED, estimated);
builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated));
builder.field(Fields.OVERHEAD, overhead);
@@ -107,10 +124,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}
+ @Override
+ public String toString() {
+ return "[" + this.name.toString() +
+ ",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) +
+ ",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) +
+ ",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]";
+ }
+
static final class Fields {
- static final XContentBuilderString BREAKER = new XContentBuilderString("fielddata_breaker");
- static final XContentBuilderString MAX = new XContentBuilderString("maximum_size_in_bytes");
- static final XContentBuilderString MAX_HUMAN = new XContentBuilderString("maximum_size");
+ static final XContentBuilderString LIMIT = new XContentBuilderString("limit_size_in_bytes");
+ static final XContentBuilderString LIMIT_HUMAN = new XContentBuilderString("limit_size");
static final XContentBuilderString ESTIMATED = new XContentBuilderString("estimated_size_in_bytes");
static final XContentBuilderString ESTIMATED_HUMAN = new XContentBuilderString("estimated_size");
static final XContentBuilderString OVERHEAD = new XContentBuilderString("overhead");
diff --git a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java
new file mode 100644
index 0000000000000..67bfaf4688815
--- /dev/null
+++ b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.breaker;
+
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.node.settings.NodeSettingsService;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+/**
+ * CircuitBreakerService that attempts to redistribute space between breakers
+ * if tripped
+ */
+public class HierarchyCircuitBreakerService extends CircuitBreakerService {
+
+ private volatile ImmutableMap breakers;
+
+ public static final String TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.total.limit";
+ public static final String DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT = "70%";
+
+ public static final String FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.fielddata.limit";
+ public static final String FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.fielddata.overhead";
+ public static final String DEFAULT_FIELDDATA_BREAKER_LIMIT = "60%";
+ public static final double DEFAULT_FIELDDATA_OVERHEAD_CONSTANT = 1.03;
+
+ public static final String REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.request.limit";
+ public static final String REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.request.overhead";
+ public static final String DEFAULT_REQUEST_BREAKER_LIMIT = "40%";
+
+ private volatile BreakerSettings parentSettings;
+ private volatile BreakerSettings fielddataSettings;
+ private volatile BreakerSettings requestSettings;
+
+ // Tripped count for when redistribution was attempted but wasn't successful
+ private final AtomicLong parentTripCount = new AtomicLong(0);
+
+ @Inject
+ public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
+ super(settings);
+
+ // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING
+ // setting to keep backwards compatibility with 1.2, it can be safely
+ // removed when compatibility with 1.2 is no longer needed
+ String compatibilityFielddataLimitDefault = DEFAULT_FIELDDATA_BREAKER_LIMIT;
+ ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
+ if (compatibilityFielddataLimit != null) {
+ compatibilityFielddataLimitDefault = compatibilityFielddataLimit.toString();
+ }
+
+ // This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING
+ // setting to keep backwards compatibility with 1.2, it can be safely
+ // removed when compatibility with 1.2 is no longer needed
+ double compatibilityFielddataOverheadDefault = DEFAULT_FIELDDATA_OVERHEAD_CONSTANT;
+ Double compatibilityFielddataOverhead = settings.getAsDouble(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
+ if (compatibilityFielddataOverhead != null) {
+ compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead;
+ }
+
+ this.fielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA,
+ settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(),
+ settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault));
+
+ this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST,
+ settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(),
+ settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0));
+
+ // Validate the configured settings
+ validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings});
+
+ this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT,
+ settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0);
+ if (logger.isTraceEnabled()) {
+ logger.trace("parent circuit breaker with settings {}", this.parentSettings);
+ }
+
+ Map tempBreakers = new HashMap<>();
+ tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA));
+ tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST));
+ this.breakers = ImmutableMap.copyOf(tempBreakers);
+
+ nodeSettingsService.addListener(new ApplySettings());
+ }
+
+ public class ApplySettings implements NodeSettingsService.Listener {
+
+ @Override
+ public void onRefreshSettings(Settings settings) {
+ boolean changed = false;
+
+ // Fielddata settings
+ BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings;
+ ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null);
+ Double newFielddataOverhead = settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
+ if (newFielddataMax != null || newFielddataOverhead != null) {
+ changed = true;
+ long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes();
+ newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
+
+ newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead);
+ }
+
+ // Request settings
+ BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings;
+ ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null);
+ Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
+ if (newRequestMax != null || newRequestOverhead != null) {
+ changed = true;
+ long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes();
+ newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead;
+
+ newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead);
+ }
+
+ // Parent settings
+ BreakerSettings newParentSettings = HierarchyCircuitBreakerService.this.parentSettings;
+ long oldParentMax = HierarchyCircuitBreakerService.this.parentSettings.getLimit();
+ ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null);
+ if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) {
+ changed = true;
+ newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0);
+ }
+
+ if (changed) {
+ // change all the things
+ validateSettings(new BreakerSettings[] {newFielddataSettings, newRequestSettings});
+ logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings);
+ HierarchyCircuitBreakerService.this.parentSettings = newParentSettings;
+ HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
+ HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings;
+ Map tempBreakers = new HashMap<>();
+ tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(newFielddataSettings,
+ (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA),
+ logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA));
+ tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(newRequestSettings,
+ (ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST),
+ logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST));
+ HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers);
+ }
+ }
+ }
+
+ /**
+ * Validate that child settings are valid
+ * @throws ElasticsearchIllegalStateException
+ */
+ public static void validateSettings(BreakerSettings[] childrenSettings) throws ElasticsearchIllegalStateException {
+ for (BreakerSettings childSettings : childrenSettings) {
+ // If the child is disabled, ignore it
+ if (childSettings.getLimit() == -1) {
+ continue;
+ }
+
+ if (childSettings.getOverhead() < 0) {
+ throw new ElasticsearchIllegalStateException("Child breaker overhead " + childSettings + " must be non-negative");
+ }
+ }
+ }
+
+ @Override
+ public CircuitBreaker getBreaker(CircuitBreaker.Name name) {
+ return this.breakers.get(name);
+ }
+
+ @Override
+ public AllCircuitBreakerStats stats() {
+ long parentEstimated = 0;
+ List allStats = newArrayList();
+ // Gather the "estimated" count for the parent breaker by adding the
+ // estimations for each individual breaker
+ for (CircuitBreaker breaker : this.breakers.values()) {
+ allStats.add(stats(breaker.getName()));
+ parentEstimated += breaker.getUsed();
+ }
+ // Manually add the parent breaker settings since they aren't part of the breaker map
+ allStats.add(new CircuitBreakerStats(CircuitBreaker.Name.PARENT, parentSettings.getLimit(),
+ parentEstimated, 1.0, parentTripCount.get()));
+ return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
+ }
+
+ @Override
+ public CircuitBreakerStats stats(CircuitBreaker.Name name) {
+ CircuitBreaker breaker = this.breakers.get(name);
+ return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
+ }
+
+ /**
+ * Checks whether the parent breaker has been tripped
+ * @param label
+ * @throws CircuitBreakingException
+ */
+ public void checkParentLimit(String label) throws CircuitBreakingException {
+ long totalUsed = 0;
+ for (CircuitBreaker breaker : this.breakers.values()) {
+ totalUsed += (breaker.getUsed() * breaker.getOverhead());
+ }
+
+ long parentLimit = this.parentSettings.getLimit();
+ if (totalUsed > parentLimit) {
+ this.parentTripCount.incrementAndGet();
+ throw new CircuitBreakingException("[PARENT] Data too large, data for [" +
+ label + "] would be larger than limit of [" +
+ parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
+ totalUsed, parentLimit);
+ }
+ }
+}
diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java
similarity index 83%
rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java
rename to src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java
index 7eabc0c24d84f..c64779c96b468 100644
--- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java
+++ b/src/main/java/org/elasticsearch/indices/breaker/InternalCircuitBreakerService.java
@@ -16,11 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.indices.fielddata.breaker;
+package org.elasticsearch.indices.breaker;
-import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -32,7 +31,7 @@
* that can be used to keep track of memory usage across the node, preventing
* actions that could cause an {@link OutOfMemoryError} on the node.
*/
-public class InternalCircuitBreakerService extends AbstractLifecycleComponent implements CircuitBreakerService {
+public class InternalCircuitBreakerService extends CircuitBreakerService {
public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
@@ -87,7 +86,8 @@ InternalCircuitBreakerService.this.maxBytes, new ByteSizeValue(InternalCircuitBr
/**
* @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage
*/
- public MemoryCircuitBreaker getBreaker() {
+ public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) {
+ // Return the only breaker, regardless of name
return this.breaker;
}
@@ -104,19 +104,17 @@ public synchronized void resetBreaker() {
}
@Override
- public FieldDataBreakerStats stats() {
- return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
+ public AllCircuitBreakerStats stats() {
+ return new AllCircuitBreakerStats(new CircuitBreakerStats[]{
+ new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
+ breaker.getOverhead(), breaker.getTrippedCount())
+ });
}
@Override
- protected void doStart() throws ElasticsearchException {
- }
-
- @Override
- protected void doStop() throws ElasticsearchException {
- }
-
- @Override
- protected void doClose() throws ElasticsearchException {
+ public CircuitBreakerStats stats(CircuitBreaker.Name name) {
+ // There is only a single breaker, so always return it
+ return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
+ breaker.getOverhead(), breaker.getTrippedCount());
}
}
diff --git a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java
similarity index 65%
rename from src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java
rename to src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java
index f5fa7dac8d9f0..c8d76ee9bfd90 100644
--- a/src/main/java/org/elasticsearch/indices/fielddata/breaker/NoneCircuitBreakerService.java
+++ b/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java
@@ -17,31 +17,41 @@
* under the License.
*/
-package org.elasticsearch.indices.fielddata.breaker;
+package org.elasticsearch.indices.breaker;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
/**
* Class that returns a breaker that never breaks
*/
-public class NoneCircuitBreakerService implements CircuitBreakerService {
+public class NoneCircuitBreakerService extends CircuitBreakerService {
private final ESLogger logger = Loggers.getLogger(NoneCircuitBreakerService.class);
private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger);
- public NoneCircuitBreakerService() {}
+ public NoneCircuitBreakerService() {
+ super(ImmutableSettings.EMPTY);
+ }
+
@Override
- public MemoryCircuitBreaker getBreaker() {
+ public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) {
return breaker;
}
@Override
- public FieldDataBreakerStats stats() {
- return new FieldDataBreakerStats(-1, -1, 0, 0);
+ public AllCircuitBreakerStats stats() {
+ return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.Name.FIELDDATA)});
+ }
+
+ @Override
+ public CircuitBreakerStats stats(CircuitBreaker.Name name) {
+ return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0);
}
}
diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java
index 30dcf64ac376e..5bbdc5fa8deba 100644
--- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java
+++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java
@@ -20,11 +20,12 @@
package org.elasticsearch.indices.fielddata.cache;
import org.apache.lucene.util.Accountable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
* A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about
@@ -48,7 +49,7 @@ public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, Ac
@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
- circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
+ circuitBreakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
}
diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java
index 8668383db03c1..c8a12fa221978 100644
--- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java
+++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java
@@ -19,7 +19,6 @@
package org.elasticsearch.node.internal;
-import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -39,7 +38,6 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
-import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
@@ -49,7 +47,6 @@
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
-import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@@ -72,6 +69,7 @@
import org.elasticsearch.index.search.shape.ShapeModule;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -162,6 +160,7 @@ public InternalNode(Settings pSettings, boolean loadConfigSettings) throws Elast
modules.add(new Version.Module(version));
modules.add(new CacheRecyclerModule(settings));
modules.add(new PageCacheRecyclerModule(settings));
+ modules.add(new CircuitBreakerModule(settings));
modules.add(new BigArraysModule(settings));
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));
diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java
index b498a013e686c..e08aae33aff04 100644
--- a/src/main/java/org/elasticsearch/node/service/NodeService.java
+++ b/src/main/java/org/elasticsearch/node/service/NodeService.java
@@ -33,7 +33,7 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
diff --git a/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/src/main/java/org/elasticsearch/percolator/PercolateContext.java
index 44fcde3e1ea01..701e0648ffb2e 100644
--- a/src/main/java/org/elasticsearch/percolator/PercolateContext.java
+++ b/src/main/java/org/elasticsearch/percolator/PercolateContext.java
@@ -122,7 +122,7 @@ public PercolateContext(PercolateShardRequest request, SearchShardTarget searchS
this.types = new String[]{request.documentType()};
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
- this.bigArrays = bigArrays;
+ this.bigArrays = bigArrays.withCircuitBreaking();
this.querySearchResult = new QuerySearchResult(0, searchShardTarget);
this.engineSearcher = indexShard.acquireSearcher("percolate");
this.searcher = new ContextIndexSearcher(this, engineSearcher);
diff --git a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
index ce7a93c145e33..94dd0b7d81cb4 100644
--- a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
+++ b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
@@ -193,7 +193,8 @@ public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarg
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
- this.bigArrays = bigArrays;
+ // SearchContexts use a BigArrays that can circuit break
+ this.bigArrays = bigArrays.withCircuitBreaking();
this.dfsResult = new DfsSearchResult(id, shardTarget);
this.queryResult = new QuerySearchResult(id, shardTarget);
this.fetchResult = new FetchSearchResult(id, shardTarget);
diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java
index 9e5ca40f8ea36..da57aa8867ae2 100644
--- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java
+++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java
@@ -37,7 +37,7 @@
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
-import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import java.util.Random;
diff --git a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java
index cf65951049dd2..763b7b5173ee6 100644
--- a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java
+++ b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java
@@ -19,11 +19,17 @@
package org.elasticsearch.common.breaker;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.indices.breaker.BreakerSettings;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
@@ -75,6 +81,127 @@ public void run() {
assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L));
}
+ @Test
+ public void testThreadedUpdatesToChildBreaker() throws Exception {
+ final int NUM_THREADS = 5;
+ final int BYTES_PER_THREAD = 1000;
+ final Thread[] threads = new Thread[NUM_THREADS];
+ final AtomicBoolean tripped = new AtomicBoolean(false);
+ final AtomicReference lastException = new AtomicReference<>(null);
+
+ final AtomicReference breakerRef = new AtomicReference<>(null);
+ final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
+
+ @Override
+ public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
+ return breakerRef.get();
+ }
+
+ @Override
+ public void checkParentLimit(String label) throws CircuitBreakingException {
+ // never trip
+ }
+ };
+ final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
+ final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
+ (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
+ breakerRef.set(breaker);
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int j = 0; j < BYTES_PER_THREAD; j++) {
+ try {
+ breaker.addEstimateBytesAndMaybeBreak(1L, "test");
+ } catch (CircuitBreakingException e) {
+ if (tripped.get()) {
+ assertThat("tripped too many times", true, equalTo(false));
+ } else {
+ assertThat(tripped.compareAndSet(false, true), equalTo(true));
+ }
+ } catch (Throwable e2) {
+ lastException.set(e2);
+ }
+ }
+ }
+ });
+
+ threads[i].start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
+ assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true));
+ assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L));
+ }
+
+ @Test
+ public void testThreadedUpdatesToChildBreakerWithParentLimit() throws Exception {
+ final int NUM_THREADS = 5;
+ final int BYTES_PER_THREAD = 1000;
+ final Thread[] threads = new Thread[NUM_THREADS];
+ final AtomicInteger tripped = new AtomicInteger(0);
+ final AtomicReference lastException = new AtomicReference<>(null);
+
+ final AtomicInteger parentTripped = new AtomicInteger(0);
+ final AtomicReference breakerRef = new AtomicReference<>(null);
+ final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
+
+ @Override
+ public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
+ return breakerRef.get();
+ }
+
+ @Override
+ public void checkParentLimit(String label) throws CircuitBreakingException {
+ // Parent will trip right before regular breaker would trip
+ if (getBreaker(CircuitBreaker.Name.REQUEST).getUsed() > (BYTES_PER_THREAD * NUM_THREADS) - 2) {
+ parentTripped.incrementAndGet();
+ throw new CircuitBreakingException("parent tripped");
+ }
+ }
+ };
+ final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
+ final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
+ (HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
+ breakerRef.set(breaker);
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int j = 0; j < BYTES_PER_THREAD; j++) {
+ try {
+ breaker.addEstimateBytesAndMaybeBreak(1L, "test");
+ } catch (CircuitBreakingException e) {
+ tripped.incrementAndGet();
+ if (tripped.get() > 2) {
+ assertThat("tripped too many times: " + tripped.get(), true, equalTo(false));
+ }
+ } catch (Throwable e2) {
+ lastException.set(e2);
+ }
+ }
+ }
+ });
+
+ threads[i].start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
+ assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(0L));
+ assertThat("parent breaker was tripped exactly twice", parentTripped.get(), equalTo(2));
+ assertThat("total breaker was tripped exactly twice", tripped.get(), equalTo(2));
+ }
+
@Test
public void testConstantFactor() throws Exception {
final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger);
diff --git a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java
index becce28b85f6a..0b5fe1d6743cc 100644
--- a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java
+++ b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java
@@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -49,7 +50,7 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
- bigarrays = new BigArrays(ImmutableSettings.EMPTY, null);
+ bigarrays = new BigArrays(ImmutableSettings.EMPTY, null, new NoneCircuitBreakerService());
}
@After
diff --git a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java
index 89c5fe816b32f..bcf7e4a5dc5a6 100644
--- a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java
+++ b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java
@@ -20,9 +20,13 @@
package org.elasticsearch.common.util;
import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
@@ -37,7 +41,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
public static BigArrays randombigArrays() {
final PageCacheRecycler recycler = randomBoolean() ? null : new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool("BigArraysTests"));
- return new MockBigArrays(ImmutableSettings.EMPTY, recycler);
+ return new MockBigArrays(ImmutableSettings.EMPTY, recycler, new NoneCircuitBreakerService());
}
private BigArrays bigArrays;
@@ -283,7 +287,7 @@ public void testByteArrayEquals() {
// not equal: contents differ
final ByteArray a3 = byteArrayWithBytes(new byte[]{1,2,3});
- final ByteArray a4 = byteArrayWithBytes(new byte[]{1,1,3});
+ final ByteArray a4 = byteArrayWithBytes(new byte[]{1, 1, 3});
assertFalse(bigArrays.equals(a3, a4));
a3.close();
a4.close();
@@ -329,58 +333,51 @@ private ByteArray byteArrayWithBytes(byte[] bytes) {
return bytearray;
}
- public void testByteAccounting() throws Exception {
- for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
- BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, Long.MAX_VALUE).build(), null);
- Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
- final int size = scaledRandomIntBetween(5, 1 << 16);
- BigArray array = (BigArray) create.invoke(bigArrays, size);
- assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
- Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class);
- int newSize = scaledRandomIntBetween(5, 1 << 16);
- array = (BigArray) resize.invoke(bigArrays, array, newSize);
- assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
- array.close();
- assertEquals(0, bigArrays.sizeInBytes());
- }
- }
-
public void testMaxSizeExceededOnNew() throws Exception {
final int size = scaledRandomIntBetween(5, 1 << 22);
for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
- BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, randomIntBetween(1, size)).build(), null);
+ HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService(
+ ImmutableSettings.builder()
+ .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, size)
+ .build(),
+ new NodeSettingsService(ImmutableSettings.EMPTY));
+ BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking();
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
try {
create.invoke(bigArrays, size);
fail("expected an exception on " + create);
} catch (InvocationTargetException e) {
- assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException);
+ assertTrue(e.getCause() instanceof CircuitBreakingException);
}
- assertEquals(0, bigArrays.sizeInBytes());
+ assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
}
}
public void testMaxSizeExceededOnResize() throws Exception {
for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
final long maxSize = randomIntBetween(1 << 10, 1 << 22);
- BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, maxSize).build(), null);
+ HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService(
+ ImmutableSettings.builder()
+ .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, maxSize)
+ .build(),
+ new NodeSettingsService(ImmutableSettings.EMPTY));
+ BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking();
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
final int size = scaledRandomIntBetween(1, 20);
BigArray array = (BigArray) create.invoke(bigArrays, size);
Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class);
while (true) {
long newSize = array.size() * 2;
- assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
try {
array = (BigArray) resize.invoke(bigArrays, array, newSize);
} catch (InvocationTargetException e) {
- assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException);
+ assertTrue(e.getCause() instanceof CircuitBreakingException);
break;
}
}
- assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
+ assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
array.close();
- assertEquals(0, bigArrays.sizeInBytes());
+ assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
}
}
diff --git a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java
index 58458c89c1038..508f69f6718eb 100644
--- a/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java
+++ b/src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java
@@ -42,8 +42,8 @@
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
-import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java
index 93643b9b0edb7..ef2c94f9f2a63 100644
--- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java
+++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java
@@ -39,10 +39,8 @@
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
-import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.indices.query.IndicesQueriesModule;
-import org.elasticsearch.script.ScriptModule;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
@@ -101,4 +99,4 @@ protected void configure() {
injector.getInstance(ThreadPool.class).shutdownNow();
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java
index 0e3b322e9f995..b6265f9633d60 100644
--- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java
+++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java
@@ -39,10 +39,8 @@
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
-import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.indices.query.IndicesQueriesModule;
-import org.elasticsearch.script.ScriptModule;
-import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
@@ -110,4 +108,4 @@ protected void configure() {
injector.getInstance(ThreadPool.class).shutdownNow();
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java
deleted file mode 100644
index 76da699fc1668..0000000000000
--- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.indices.fielddata.breaker;
-
-import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
-import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.collect.MapBuilder;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
-import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
-import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
-import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-
-/**
- * Integration tests for InternalCircuitBreakerService
- */
-@ClusterScope(scope = TEST, randomDynamicTemplates = false)
-public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
-
- private String randomRidiculouslySmallLimit() {
- // 3 different ways to say 100 bytes
- return randomFrom(Arrays.asList("100b", "100"));
- //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes!
- }
-
- @Test
- @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
- public void testMemoryBreaker() {
- assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
- final Client client = client();
-
- try {
- // index some different terms so we have some field data for loading
- int docCount = scaledRandomIntBetween(300, 1000);
- for (long id = 0; id < docCount; id++) {
- client.prepareIndex("cb-test", "type", Long.toString(id))
- .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
- }
-
- // refresh
- refresh();
-
- // execute a search that loads field data (sorting on the "test" field)
- client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
- .execute().actionGet();
-
- // clear field data cache (thus setting the loaded field data back to 0)
- client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet();
-
- // Update circuit breaker settings
- Settings settings = settingsBuilder()
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
- .build();
- client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
-
- // execute a search that loads field data (sorting on the "test" field)
- // again, this time it should trip the breaker
- assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
- RestStatus.INTERNAL_SERVER_ERROR,
- containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
-
- NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
- int breaks = 0;
- for (NodeStats stat : stats.getNodes()) {
- FieldDataBreakerStats breakerStats = stat.getBreaker();
- breaks += breakerStats.getTrippedCount();
- }
- assertThat(breaks, greaterThanOrEqualTo(1));
- } finally {
- // Reset settings
- Settings resetSettings = settingsBuilder()
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1")
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT)
- .build();
- client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
- }
- }
-
- @Test
- @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
- public void testRamAccountingTermsEnum() {
- final Client client = client();
-
- try {
- // Create an index where the mappings have a field data filter
- assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " +
- "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}"));
-
- // Wait 10 seconds for green
- client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
-
- // index some different terms so we have some field data for loading
- int docCount = scaledRandomIntBetween(300, 1000);
- for (long id = 0; id < docCount; id++) {
- client.prepareIndex("ramtest", "type", Long.toString(id))
- .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
- }
-
- // refresh
- refresh();
-
- // execute a search that loads field data (sorting on the "test" field)
- client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
- .execute().actionGet();
-
- // clear field data cache (thus setting the loaded field data back to 0)
- client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet();
-
- // Update circuit breaker settings
- Settings settings = settingsBuilder()
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
- .build();
- client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
-
- // execute a search that loads field data (sorting on the "test" field)
- // again, this time it should trip the breaker
- assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
- RestStatus.INTERNAL_SERVER_ERROR,
- containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
-
- NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
- int breaks = 0;
- for (NodeStats stat : stats.getNodes()) {
- FieldDataBreakerStats breakerStats = stat.getBreaker();
- breaks += breakerStats.getTrippedCount();
- }
- assertThat(breaks, greaterThanOrEqualTo(1));
-
- } finally {
- // Reset settings
- Settings resetSettings = settingsBuilder()
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1")
- .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT)
- .build();
- client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
- }
- }
-}
diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java
new file mode 100644
index 0000000000000..40aaf2899bdb7
--- /dev/null
+++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.memory.breaker;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.breaker.CircuitBreakerStats;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * Integration tests for InternalCircuitBreakerService
+ */
+@ClusterScope(scope = TEST, randomDynamicTemplates = false)
+public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
+
+ private String randomRidiculouslySmallLimit() {
+ // 3 different ways to say 100 bytes
+ return randomFrom(Arrays.asList("100b", "100"));
+ //, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes!
+ }
+
+ @Test
+ @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
+ public void testMemoryBreaker() {
+ assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
+ final Client client = client();
+
+ try {
+ // index some different terms so we have some field data for loading
+ int docCount = scaledRandomIntBetween(300, 1000);
+ for (long id = 0; id < docCount; id++) {
+ client.prepareIndex("cb-test", "type", Long.toString(id))
+ .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+ }
+
+ // refresh
+ refresh();
+
+ // execute a search that loads field data (sorting on the "test" field)
+ client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
+ .execute().actionGet();
+
+ // clear field data cache (thus setting the loaded field data back to 0)
+ client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet();
+
+ // Update circuit breaker settings
+ Settings settings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit())
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
+
+ // execute a search that loads field data (sorting on the "test" field)
+ // again, this time it should trip the breaker
+ assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
+ RestStatus.INTERNAL_SERVER_ERROR,
+ containsString("Data too large, data for [test] would be larger than limit of [100/100b]"));
+
+ NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
+ int breaks = 0;
+ for (NodeStats stat : stats.getNodes()) {
+ CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
+ breaks += breakerStats.getTrippedCount();
+ }
+ assertThat(breaks, greaterThanOrEqualTo(1));
+ } finally {
+ // Reset settings
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
+ HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+ }
+ }
+
+ @Test
+ @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
+ public void testRamAccountingTermsEnum() {
+ final Client client = client();
+
+ try {
+ // Create an index where the mappings have a field data filter
+ assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " +
+ "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}"));
+
+ // Wait 10 seconds for green
+ client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
+
+ // index some different terms so we have some field data for loading
+ int docCount = scaledRandomIntBetween(300, 1000);
+ for (long id = 0; id < docCount; id++) {
+ client.prepareIndex("ramtest", "type", Long.toString(id))
+ .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+ }
+
+ // refresh
+ refresh();
+
+ // execute a search that loads field data (sorting on the "test" field)
+ client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
+ .execute().actionGet();
+
+ // clear field data cache (thus setting the loaded field data back to 0)
+ client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet();
+
+ // Update circuit breaker settings
+ Settings settings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit())
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
+
+ // execute a search that loads field data (sorting on the "test" field)
+ // again, this time it should trip the breaker
+ assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
+ RestStatus.INTERNAL_SERVER_ERROR,
+ containsString("Data too large, data for [test] would be larger than limit of [100/100b]"));
+
+ NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
+ int breaks = 0;
+ for (NodeStats stat : stats.getNodes()) {
+ CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
+ breaks += breakerStats.getTrippedCount();
+ }
+ assertThat(breaks, greaterThanOrEqualTo(1));
+
+ } finally {
+ // Reset settings
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
+ HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+ }
+ }
+
+ /**
+ * Test that a breaker correctly redistributes to a different breaker, in
+ * this case, the fielddata breaker borrows space from the request breaker
+ */
+ @Test
+ @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
+ public void testParentChecking() {
+ assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
+ Client client = client();
+
+ try {
+ // index some different terms so we have some field data for loading
+ int docCount = scaledRandomIntBetween(300, 1000);
+ for (long id = 0; id < docCount; id++) {
+ client.prepareIndex("cb-test", "type", Long.toString(id))
+ .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+ }
+ refresh();
+
+ // We need the request limit beforehand, just from a single node because the limit should always be the same
+ long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get()
+ .getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit();
+
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b")
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+
+ // Perform a search to load field data for the "test" field
+ try {
+ client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
+ } catch (Exception e) {
+ String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]";
+ assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
+ ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
+ }
+
+ assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
+ RestStatus.INTERNAL_SERVER_ERROR,
+ containsString("Data too large, data for [test] would be larger than limit of [10/10b]"));
+
+ // Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't
+ resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b")
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%")
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+
+ // Perform a search to load field data for the "test" field
+ try {
+ client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
+ } catch (Exception e) {
+ String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]";
+ assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
+ ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
+ }
+
+ } finally {
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
+ .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT)
+ .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
+ HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+ }
+ }
+
+ @Test
+ @TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
+ public void testRequestBreaker() {
+ assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
+ Client client = client();
+
+ try {
+ // Make request breaker limited to a small amount
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b")
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+
+ // index some different terms so we have some field data for loading
+ int docCount = scaledRandomIntBetween(300, 1000);
+ for (long id = 0; id < docCount; id++) {
+ client.prepareIndex("cb-test", "type", Long.toString(id))
+ .setSource(MapBuilder.newMapBuilder().put("test", id).map()).execute().actionGet();
+ }
+ refresh();
+
+ // A cardinality aggregation uses BigArrays and thus the REQUEST breaker
+ try {
+ client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
+ fail("aggregation should have tripped the breaker");
+ } catch (Exception e) {
+ String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]";
+ assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
+ ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
+ }
+ } finally {
+ Settings resetSettings = settingsBuilder()
+ .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
+ HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT)
+ .build();
+ client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
+ }
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java
new file mode 100644
index 0000000000000..cd47ffef87360
--- /dev/null
+++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerUnitTests.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.memory.breaker;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.indices.breaker.BreakerSettings;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Unit tests for the circuit breaker
+ */
+public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
+
+ public static long pctBytes(String percentString) {
+ return ImmutableSettings.EMPTY.getAsMemory("", percentString).bytes();
+ }
+
+ @Test
+ public void testBreakerSettingsValidationWithValidSettings() {
+ // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
+ BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), 1.0);
+ BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
+ HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
+
+ // parent: {:limit 70}, fd: {:limit 40}, request: {:limit 30}
+ fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("40%"), 1.0);
+ request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("30%"), 1.0);
+ HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
+ }
+
+ @Test
+ public void testBreakerSettingsValidationNegativeOverhead() {
+ // parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
+ BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), -0.1);
+ BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
+ try {
+ HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
+ fail("settings are invalid but validate settings did not throw an exception");
+ } catch (Exception e) {
+ assertThat("Incorrect message: " + e.getMessage(),
+ e.getMessage().contains("must be non-negative"), equalTo(true));
+ }
+ }
+}
diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java
similarity index 96%
rename from src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java
rename to src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java
index 2bd7f4ab0d244..cbf52f53da06a 100644
--- a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java
+++ b/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.indices.fielddata.breaker;
+package org.elasticsearch.indices.memory.breaker;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
@@ -29,6 +29,7 @@
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -58,7 +59,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException {
for (NodeStats node : client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet().getNodes()) {
- assertThat("Breaker is not set to 0", node.getBreaker().getEstimated(), equalTo(0L));
+ assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
String mapping = XContentFactory.jsonBuilder()
@@ -144,7 +145,7 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : resp.getNodes()) {
- assertThat("Breaker is set to 0", stats.getBreaker().getEstimated(), equalTo(0L));
+ assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
for (int i = 0; i < numSearches; i++) {
@@ -180,7 +181,8 @@ public void testBreakerWithRandomExceptions() throws IOException, InterruptedExc
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
- assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, stats.getBreaker().getEstimated(), equalTo(0L));
+ assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping,
+ stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
}
}
diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java
index ac51c728c935a..3dde4ad2d1eac 100644
--- a/src/test/java/org/elasticsearch/test/TestCluster.java
+++ b/src/test/java/org/elasticsearch/test/TestCluster.java
@@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;
@@ -194,8 +195,10 @@ public void ensureEstimatedStats() {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
- assertThat("Breaker not reset to 0 on node: " + stats.getNode(),
- stats.getBreaker().getEstimated(), equalTo(0L));
+ assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
+ stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
+ assertThat("Request breaker not reset to 0 on node: " + stats.getNode(),
+ stats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getEstimated(), equalTo(0L));
}
}
}
diff --git a/src/test/java/org/elasticsearch/test/TestSearchContext.java b/src/test/java/org/elasticsearch/test/TestSearchContext.java
index 360b4f41ee24b..e4599610dcc3c 100644
--- a/src/test/java/org/elasticsearch/test/TestSearchContext.java
+++ b/src/test/java/org/elasticsearch/test/TestSearchContext.java
@@ -81,7 +81,7 @@ public class TestSearchContext extends SearchContext {
public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) {
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
- this.bigArrays = bigArrays;
+ this.bigArrays = bigArrays.withCircuitBreaking();
this.indexService = indexService;
this.filterCache = filterCache;
this.indexFieldDataService = indexFieldDataService;
diff --git a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java
index 0bd19b0d0ab0c..44a85c86b391f 100644
--- a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java
+++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java
@@ -29,6 +29,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.*;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.*;
@@ -68,43 +69,23 @@ public boolean apply(Object input) {
}
}
}
- // arrays that are not fully released.
- ArrayList badBigArrays = new ArrayList<>();
- synchronized (INSTANCES) {
- for (final BigArrays bigArrays : INSTANCES) {
- // BigArrays are used on the network layer and the cluster is shared across tests so nodes might still be talking to
- // each other a bit after the test finished, wait a bit for things to stabilize if so
- final boolean sizeIsZero = ElasticsearchTestCase.awaitBusy(new Predicate