Skip to content

DATAMONGO-1311 - Configuration of query batchsize through Query.cursorBatchSize(…). #575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions spring-data-mongodb-cross-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
</dependency>

<!-- reactive -->
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAMONGO-1311-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3279,9 +3279,9 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
return cursor;
}

Meta meta = query.getMeta();
if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject())
&& !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues()
&& !query.getCollation().isPresent()) {
&& !StringUtils.hasText(query.getHint()) && !meta.hasValues() && !query.getCollation().isPresent()) {
return cursor;
}

Expand All @@ -3301,18 +3301,33 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
cursorToUse = cursorToUse.sort(sort);
}

Document meta = new Document();
if (StringUtils.hasText(query.getHint())) {
meta.put("$hint", query.getHint());
cursorToUse = cursorToUse.hint(Document.parse(query.getHint()));
}

if (query.getMeta().hasValues()) {
if (meta.hasValues()) {

for (Entry<String, Object> entry : query.getMeta().values()) {
meta.put(entry.getKey(), entry.getValue());
if (StringUtils.hasText(meta.getComment())) {
cursorToUse = cursorToUse.comment(meta.getComment());
}

for (Meta.CursorOption option : query.getMeta().getFlags()) {
if (meta.getSnapshot()) {
cursorToUse = cursorToUse.snapshot(meta.getSnapshot());
}

if (meta.getMaxScan() != null) {
cursorToUse = cursorToUse.maxScan(meta.getMaxScan());
}

if (meta.getMaxTimeMsec() != null) {
cursorToUse = cursorToUse.maxTime(meta.getMaxTimeMsec(), TimeUnit.MILLISECONDS);
}

if (meta.getCursorBatchSize() != null) {
cursorToUse = cursorToUse.batchSize(meta.getCursorBatchSize());
}

for (Meta.CursorOption option : meta.getFlags()) {

switch (option) {

Expand All @@ -3328,7 +3343,6 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
}
}

cursorToUse = cursorToUse.modifiers(meta);
} catch (RuntimeException e) {
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
}
Expand Down Expand Up @@ -3459,149 +3473,6 @@ public MongoDbFactory getMongoDbFactory() {
return mongoDbFactory;
}

/**
 * {@link BatchAggregationLoader} is a little helper that can process cursor results returned by an aggregation
 * command execution. On presence of a {@literal nextBatch} indicated by presence of an {@code id} field in the
 * {@code cursor} another {@code getMore} command gets executed reading the next batch of documents until all results
 * are loaded.
 *
 * @author Christoph Strobl
 * @since 1.10
 */
static class BatchAggregationLoader {

	private static final String CURSOR_FIELD = "cursor";
	private static final String RESULT_FIELD = "result";
	private static final String BATCH_SIZE_FIELD = "batchSize";
	private static final String FIRST_BATCH = "firstBatch";
	private static final String NEXT_BATCH = "nextBatch";
	private static final String SERVER_USED = "serverUsed";
	private static final String OK = "ok";

	private final MongoTemplate template;
	private final ReadPreference readPreference;
	private final int batchSize;

	BatchAggregationLoader(MongoTemplate template, ReadPreference readPreference, int batchSize) {

		this.template = template;
		this.readPreference = readPreference;
		this.batchSize = batchSize;
	}

	/**
	 * Run the aggregation command and fetch all results, transparently issuing {@code getMore} commands until the
	 * server-side cursor is exhausted.
	 *
	 * @param collectionName name of the collection the aggregation runs against.
	 * @param aggregation the aggregation pipeline definition.
	 * @param context the {@link AggregationOperationContext} used to render the pipeline; {@literal null} falls back
	 *          to {@link Aggregation#DEFAULT_CONTEXT}.
	 * @return a single merged command result {@link Document} holding all batches under {@code result}.
	 */
	Document aggregate(String collectionName, Aggregation aggregation, AggregationOperationContext context) {

		Document command = prepareAggregationCommand(collectionName, aggregation, context, batchSize);

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("Executing aggregation: {}", serializeToJsonSafely(command));
		}

		return mergeAggregationResults(aggregateBatched(command, collectionName, batchSize));
	}

	/**
	 * Pre process the aggregation command sent to the server by adding {@code cursor} options to match execution on
	 * different server versions. The {@code cursor} option is omitted for {@literal explain} runs, which do not return
	 * a cursor.
	 */
	private static Document prepareAggregationCommand(String collectionName, Aggregation aggregation,
			@Nullable AggregationOperationContext context, int batchSize) {

		AggregationOperationContext rootContext = context == null ? Aggregation.DEFAULT_CONTEXT : context;
		Document command = aggregation.toDocument(collectionName, rootContext);

		if (!aggregation.getOptions().isExplain()) {
			command.put(CURSOR_FIELD, new Document(BATCH_SIZE_FIELD, batchSize));
		}

		return command;
	}

	/**
	 * Execute the initial aggregation command and keep issuing {@code getMore} commands while the returned cursor
	 * reports a non-zero {@code id}, collecting each (post processed) batch result.
	 */
	private List<Document> aggregateBatched(Document command, String collectionName, int batchSize) {

		List<Document> results = new ArrayList<>();

		Document commandResult = template.executeCommand(command, readPreference);
		results.add(postProcessResult(commandResult));

		while (hasNext(commandResult)) {

			Document getMore = new Document("getMore", getNextBatchId(commandResult)) //
					.append("collection", collectionName) //
					.append(BATCH_SIZE_FIELD, batchSize);

			commandResult = template.executeCommand(getMore, this.readPreference);
			results.add(postProcessResult(commandResult));
		}

		return results;
	}

	/**
	 * Normalize a cursor-style command result ({@code cursor.firstBatch} / {@code cursor.nextBatch}) into the legacy
	 * shape with the documents under {@code result}. Results without a {@code cursor} field pass through unchanged.
	 */
	private static Document postProcessResult(Document commandResult) {

		if (!commandResult.containsKey(CURSOR_FIELD)) {
			return commandResult;
		}

		Document resultObject = new Document(SERVER_USED, commandResult.get(SERVER_USED));
		resultObject.put(OK, commandResult.get(OK));

		Document cursor = (Document) commandResult.get(CURSOR_FIELD);
		if (cursor.containsKey(FIRST_BATCH)) {
			resultObject.put(RESULT_FIELD, cursor.get(FIRST_BATCH));
		} else {
			resultObject.put(RESULT_FIELD, cursor.get(NEXT_BATCH));
		}

		return resultObject;
	}

	/**
	 * Merge the per-batch results into a single command result {@link Document}, concatenating all {@code result}
	 * collections and copying the general info ({@code serverUsed}, {@code ok}) from the first batch.
	 */
	private static Document mergeAggregationResults(List<Document> batchResults) {

		// single batch needs no merging
		if (batchResults.size() == 1) {
			return batchResults.iterator().next();
		}

		Document commandResult = new Document();
		List<Object> allResults = new ArrayList<>();

		for (Document batchResult : batchResults) {

			Collection<?> documents = (Collection<?>) batchResult.get(RESULT_FIELD);
			if (!CollectionUtils.isEmpty(documents)) {
				allResults.addAll(documents);
			}
		}

		// take general info from first batch
		Document firstBatch = batchResults.get(0);
		commandResult.put(SERVER_USED, firstBatch.get(SERVER_USED));
		commandResult.put(OK, firstBatch.get(OK));

		// and append the merged batchResults
		commandResult.put(RESULT_FIELD, allResults);

		return commandResult;
	}

	/**
	 * @return {@literal true} if the command result carries a cursor whose {@code id} is non-zero, i.e. the server
	 *         holds more batches to fetch via {@code getMore}.
	 */
	private static boolean hasNext(Document commandResult) {

		if (!commandResult.containsKey(CURSOR_FIELD)) {
			return false;
		}

		Object next = getNextBatchId(commandResult);
		return next != null && ((Number) next).longValue() != 0L;
	}

	/**
	 * @return the cursor {@code id} to pass to {@code getMore}; may be {@literal null} if absent.
	 */
	@Nullable
	private static Object getNextBatchId(Document commandResult) {
		return ((Document) commandResult.get(CURSOR_FIELD)).get("id");
	}
}

/**
* {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the
* server through the driver API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
Expand All @@ -115,7 +116,6 @@
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;

import com.mongodb.BasicDBObject;
import com.mongodb.ClientSessionOptions;
import com.mongodb.CursorType;
import com.mongodb.DBCollection;
Expand Down Expand Up @@ -971,7 +971,6 @@ protected <O> Flux<O> aggregate(Aggregation aggregation, String collectionName,
List<Document> pipeline = aggregationUtil.createPipeline(aggregation, rootContext);

Assert.isTrue(!options.isExplain(), "Cannot use explain option with streaming!");
Assert.isNull(options.getCursorBatchSize(), "Cannot use batchSize cursor option with streaming!");

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Streaming aggregation: {} in collection {}", serializeToJsonSafely(pipeline), collectionName);
Expand All @@ -987,6 +986,10 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
AggregatePublisher<Document> cursor = collection.aggregate(pipeline, Document.class)
.allowDiskUse(options.isAllowDiskUse());

if (options.getCursorBatchSize() != null) {
cursor = cursor.batchSize(options.getCursorBatchSize());
}

if (options.getCollation().isPresent()) {
cursor = cursor.collation(options.getCollation().map(Collation::toMongoCollation).get());
}
Expand Down Expand Up @@ -3216,8 +3219,9 @@ public <T> FindPublisher<T> prepare(FindPublisher<T> findPublisher) {
findPublisherToUse = query.getCollation().map(Collation::toMongoCollation).map(findPublisher::collation)
.orElse(findPublisher);

Meta meta = query.getMeta();
if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject())
&& !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues()) {
&& !StringUtils.hasText(query.getHint()) && !meta.hasValues()) {
return findPublisherToUse;
}

Expand All @@ -3232,21 +3236,34 @@ public <T> FindPublisher<T> prepare(FindPublisher<T> findPublisher) {
Document sort = type != null ? getMappedSortObject(query, type) : query.getSortObject();
findPublisherToUse = findPublisherToUse.sort(sort);
}
BasicDBObject modifiers = new BasicDBObject();

if (StringUtils.hasText(query.getHint())) {
modifiers.append("$hint", query.getHint());
findPublisherToUse = findPublisherToUse.hint(Document.parse(query.getHint()));
}

if (query.getMeta().hasValues()) {
for (Entry<String, Object> entry : query.getMeta().values()) {
modifiers.append(entry.getKey(), entry.getValue());
if (meta.hasValues()) {

if (StringUtils.hasText(meta.getComment())) {
findPublisherToUse = findPublisherToUse.comment(meta.getComment());
}

if (meta.getSnapshot()) {
findPublisherToUse = findPublisherToUse.snapshot(meta.getSnapshot());
}
}

if (!modifiers.isEmpty()) {
findPublisherToUse = findPublisherToUse.modifiers(modifiers);
if (meta.getMaxScan() != null) {
findPublisherToUse = findPublisherToUse.maxScan(meta.getMaxScan());
}

if (meta.getMaxTimeMsec() != null) {
findPublisherToUse = findPublisherToUse.maxTime(meta.getMaxTimeMsec(), TimeUnit.MILLISECONDS);
}

if (meta.getCursorBatchSize() != null) {
findPublisherToUse = findPublisherToUse.batchSize(meta.getCursorBatchSize());
}
}

} catch (RuntimeException e) {
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,14 @@ private void applyPropertySpecs(String path, Document source, Class<?> probeType
if (exampleSpecAccessor.hasPropertySpecifier(mappedPropertyPath)) {

PropertyValueTransformer valueTransformer = exampleSpecAccessor.getValueTransformerForPath(mappedPropertyPath);
value = valueTransformer.convert(value);
if (value == null) {
Optional converted = valueTransformer.apply(Optional.ofNullable(value));

if(!converted.isPresent()) {
iter.remove();
continue;
}

entry.setValue(value);
entry.setValue(converted.get());
}

if (entry.getValue() instanceof String) {
Expand Down
Loading