Skip to content

Commit 64279a5

Browse files
committed
Deduplicate created objects when deserializing InternalAggregations in SearchResponse (elastic#124296)
It should help in case of cross cluster search. # Conflicts: # server/src/main/java/org/elasticsearch/action/search/SearchResponse.java
1 parent 1b1e9ac commit 64279a5

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.OriginalIndices;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.io.stream.DelayableWriteable;
1718
import org.elasticsearch.common.io.stream.StreamInput;
1819
import org.elasticsearch.common.io.stream.StreamOutput;
1920
import org.elasticsearch.common.io.stream.Writeable;
@@ -91,7 +92,15 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
9192
public SearchResponse(StreamInput in) throws IOException {
9293
super(in);
9394
this.hits = SearchHits.readFrom(in, true);
94-
this.aggregations = in.readBoolean() ? InternalAggregations.readFrom(in) : null;
95+
if (in.readBoolean()) {
96+
// deserialize the aggregations trying to deduplicate the object created
97+
// TODO: use DelayableWriteable instead.
98+
this.aggregations = InternalAggregations.readFrom(
99+
DelayableWriteable.wrapWithDeduplicatorStreamInput(in, in.getTransportVersion(), in.namedWriteableRegistry())
100+
);
101+
} else {
102+
this.aggregations = null;
103+
}
95104
this.suggest = in.readBoolean() ? new Suggest(in) : null;
96105
this.timedOut = in.readBoolean();
97106
this.terminatedEarly = in.readOptionalBoolean();

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.common.bytes.BytesReference;
1414
import org.elasticsearch.common.bytes.ReleasableBytesReference;
15+
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.core.Releasable;
1617

1718
import java.io.IOException;
@@ -231,16 +232,24 @@ private static <T> T deserialize(
231232
NamedWriteableRegistry registry,
232233
BytesReference serialized
233234
) throws IOException {
234-
try (
235-
StreamInput in = registry == null
236-
? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
237-
: new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
238-
) {
239-
in.setTransportVersion(serializedAtVersion);
240-
return reader.read(in);
235+
try (StreamInput in = serialized.streamInput()) {
236+
return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry));
241237
}
242238
}
243239

240+
/** Wraps the provided {@link StreamInput} with another stream that extends {@link Deduplicator} */
241+
public static StreamInput wrapWithDeduplicatorStreamInput(
242+
StreamInput in,
243+
TransportVersion serializedAtVersion,
244+
@Nullable NamedWriteableRegistry registry
245+
) {
246+
StreamInput out = registry == null
247+
? new DeduplicateStreamInput(in, new DeduplicatorCache())
248+
: new DeduplicateNamedWriteableAwareStreamInput(in, registry, new DeduplicatorCache());
249+
out.setTransportVersion(serializedAtVersion);
250+
return out;
251+
}
252+
244253
/** An object implementing this interface can deduplicate instance of the provided objects.*/
245254
public interface Deduplicator {
246255
<T> T deduplicate(T object);

0 commit comments

Comments
 (0)