Skip to content

Commit d3976f5

Browse files
mp911dechristophstrobl
authored andcommitted
DATAMONGO-1311 - Add configuration options for query batch size.
We now allow configuration of the find cursor/find publisher batch sizes using Query.cursorBatchSize(…). Configuring the batch size gives users more fine grained control over the fetch behavior especially in reactive usage scenarios as the batch size defaults in FindPublisher to the remaining demand. This can cause several roundtrips in cases the remaining demand is small and the emitted elements are dropped rapidly (e.g. using filter(…)). On the repository level @meta allows now configuration of the cursor batch size for derived finder methods. interface PersonRepository extends Repository<Person, Long> { @meta(cursorBatchSize = 100) Stream<Person> findAllByLastname(String lastname); } Original Pull Request: #575
1 parent f587e1f commit d3976f5

File tree

10 files changed

+132
-26
lines changed

10 files changed

+132
-26
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3279,8 +3279,9 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
32793279
return cursor;
32803280
}
32813281

3282+
Meta meta = query.getMeta();
32823283
if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject())
3283-
&& !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues()
3284+
&& !StringUtils.hasText(query.getHint()) && !meta.hasValues()
32843285
&& !query.getCollation().isPresent()) {
32853286
return cursor;
32863287
}
@@ -3301,18 +3302,18 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
33013302
cursorToUse = cursorToUse.sort(sort);
33023303
}
33033304

3304-
Document meta = new Document();
3305+
Document metaDocument = new Document();
33053306
if (StringUtils.hasText(query.getHint())) {
3306-
meta.put("$hint", query.getHint());
3307+
metaDocument.put("$hint", query.getHint());
33073308
}
33083309

3309-
if (query.getMeta().hasValues()) {
3310+
if (meta.hasValues()) {
33103311

3311-
for (Entry<String, Object> entry : query.getMeta().values()) {
3312-
meta.put(entry.getKey(), entry.getValue());
3312+
for (Entry<String, Object> entry : meta.values()) {
3313+
metaDocument.put(entry.getKey(), entry.getValue());
33133314
}
33143315

3315-
for (Meta.CursorOption option : query.getMeta().getFlags()) {
3316+
for (Meta.CursorOption option : meta.getFlags()) {
33163317

33173318
switch (option) {
33183319

@@ -3326,9 +3327,13 @@ public FindIterable<Document> prepare(FindIterable<Document> cursor) {
33263327
throw new IllegalArgumentException(String.format("%s is no supported flag.", option));
33273328
}
33283329
}
3330+
3331+
if (meta.getCursorBatchSize() != null) {
3332+
cursorToUse = cursorToUse.batchSize(meta.getCursorBatchSize());
3333+
}
33293334
}
33303335

3331-
cursorToUse = cursorToUse.modifiers(meta);
3336+
cursorToUse = cursorToUse.modifiers(metaDocument);
33323337
} catch (RuntimeException e) {
33333338
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
33343339
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.springframework.data.mongodb.core.query.Collation;
9999
import org.springframework.data.mongodb.core.query.Criteria;
100100
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
101+
import org.springframework.data.mongodb.core.query.Meta;
101102
import org.springframework.data.mongodb.core.query.NearQuery;
102103
import org.springframework.data.mongodb.core.query.Query;
103104
import org.springframework.data.mongodb.core.query.Update;
@@ -971,7 +972,6 @@ protected <O> Flux<O> aggregate(Aggregation aggregation, String collectionName,
971972
List<Document> pipeline = aggregationUtil.createPipeline(aggregation, rootContext);
972973

973974
Assert.isTrue(!options.isExplain(), "Cannot use explain option with streaming!");
974-
Assert.isNull(options.getCursorBatchSize(), "Cannot use batchSize cursor option with streaming!");
975975

976976
if (LOGGER.isDebugEnabled()) {
977977
LOGGER.debug("Streaming aggregation: {} in collection {}", serializeToJsonSafely(pipeline), collectionName);
@@ -987,6 +987,10 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
987987
AggregatePublisher<Document> cursor = collection.aggregate(pipeline, Document.class)
988988
.allowDiskUse(options.isAllowDiskUse());
989989

990+
if (options.getCursorBatchSize() != null) {
991+
cursor = cursor.batchSize(options.getCursorBatchSize());
992+
}
993+
990994
if (options.getCollation().isPresent()) {
991995
cursor = cursor.collation(options.getCollation().map(Collation::toMongoCollation).get());
992996
}
@@ -3216,8 +3220,9 @@ public <T> FindPublisher<T> prepare(FindPublisher<T> findPublisher) {
32163220
findPublisherToUse = query.getCollation().map(Collation::toMongoCollation).map(findPublisher::collation)
32173221
.orElse(findPublisher);
32183222

3223+
Meta meta = query.getMeta();
32193224
if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject())
3220-
&& !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues()) {
3225+
&& !StringUtils.hasText(query.getHint()) && !meta.hasValues()) {
32213226
return findPublisherToUse;
32223227
}
32233228

@@ -3238,10 +3243,14 @@ public <T> FindPublisher<T> prepare(FindPublisher<T> findPublisher) {
32383243
modifiers.append("$hint", query.getHint());
32393244
}
32403245

3241-
if (query.getMeta().hasValues()) {
3242-
for (Entry<String, Object> entry : query.getMeta().values()) {
3246+
if (meta.hasValues()) {
3247+
for (Entry<String, Object> entry : meta.values()) {
32433248
modifiers.append(entry.getKey(), entry.getValue());
32443249
}
3250+
3251+
if (meta.getCursorBatchSize() != null) {
3252+
findPublisherToUse = findPublisherToUse.batchSize(meta.getCursorBatchSize());
3253+
}
32453254
}
32463255

32473256
if (!modifiers.isEmpty()) {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Meta.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ private enum MetaKey {
5050

5151
private final Map<String, Object> values = new LinkedHashMap<String, Object>(2);
5252
private final Set<CursorOption> flags = new LinkedHashSet<CursorOption>();
53+
private Integer cursorBatchSize;
5354

5455
/**
5556
* @return {@literal null} if not set.
@@ -128,6 +129,25 @@ public boolean getSnapshot() {
128129
return getValue(MetaKey.SNAPSHOT.key, false);
129130
}
130131

132+
/**
133+
* @return {@literal null} if not set.
134+
* @since 2.1
135+
*/
136+
@Nullable
137+
public Integer getCursorBatchSize() {
138+
return cursorBatchSize;
139+
}
140+
141+
/**
142+
* Apply the batch size for a query.
143+
*
144+
* @param cursorBatchSize
145+
* @since 2.1
146+
*/
147+
public void setCursorBatchSize(int cursorBatchSize) {
148+
this.cursorBatchSize = cursorBatchSize;
149+
}
150+
131151
/**
132152
* Add {@link CursorOption} influencing behavior of the {@link com.mongodb.DBCursor}.
133153
*
@@ -153,7 +173,7 @@ public Set<CursorOption> getFlags() {
153173
* @return
154174
*/
155175
public boolean hasValues() {
156-
return !this.values.isEmpty() || !this.flags.isEmpty();
176+
return !this.values.isEmpty() || !this.flags.isEmpty() || this.cursorBatchSize != null;
157177
}
158178

159179
/**

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Query.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,18 @@ public Query useSnapshot() {
347347
return this;
348348
}
349349

350+
/**
351+
* @param batchSize
352+
* @return
353+
* @see Meta#setCursorBatchSize(int)
354+
* @since 2.1
355+
*/
356+
public Query cursorBatchSize(int batchSize) {
357+
358+
meta.setCursorBatchSize(batchSize);
359+
return this;
360+
}
361+
350362
/**
351363
* @return
352364
* @see org.springframework.data.mongodb.core.query.Meta.CursorOption#NO_TIMEOUT

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/Meta.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@
5050
*/
5151
long maxScanDocuments() default -1;
5252

53+
/**
54+
* Sets the number of documents to return per batch.
55+
*
56+
* @return
57+
* @since 2.1
58+
*/
59+
int cursorBatchSize() default -1;
60+
5361
/**
5462
* Add a comment to the query.
5563
*

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,10 @@ public org.springframework.data.mongodb.core.query.Meta getQueryMetaAttributes()
274274
metaAttributes.setMaxScan(meta.maxScanDocuments());
275275
}
276276

277+
if (meta.cursorBatchSize() > 0) {
278+
metaAttributes.setCursorBatchSize(meta.cursorBatchSize());
279+
}
280+
277281
if (StringUtils.hasText(meta.comment())) {
278282
metaAttributes.setComment(meta.comment());
279283
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
2323
import static org.springframework.data.mongodb.test.util.IsBsonObject.*;
2424

25-
import com.mongodb.client.model.ReplaceOptions;
2625
import lombok.Data;
2726

2827
import java.math.BigInteger;
@@ -93,6 +92,7 @@
9392
import com.mongodb.client.model.DeleteOptions;
9493
import com.mongodb.client.model.FindOneAndDeleteOptions;
9594
import com.mongodb.client.model.FindOneAndUpdateOptions;
95+
import com.mongodb.client.model.ReplaceOptions;
9696
import com.mongodb.client.model.UpdateOptions;
9797
import com.mongodb.client.result.UpdateResult;
9898

@@ -161,7 +161,7 @@ public void rejectsNullDatabaseName() throws Exception {
161161
new MongoTemplate(mongo, null);
162162
}
163163

164-
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1968
164+
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1968
165165
public void rejectsNullMongo() {
166166
new MongoTemplate((MongoClient) null, "database");
167167
}
@@ -653,6 +653,17 @@ public void onBeforeConvert(BeforeConvertEvent<VersionedEntity> event) {
653653
assertThat(updateCaptor.getValue(), isBsonObject().containing("$set.jon", "snow").notContaining("$isolated"));
654654
}
655655

656+
@Test // DATAMONGO-1311
657+
public void executeQueryShouldUseBatchSizeWhenPresent() {
658+
659+
when(findIterable.batchSize(anyInt())).thenReturn(findIterable);
660+
661+
Query query = new Query().cursorBatchSize(1234);
662+
template.find(query, Person.class);
663+
664+
verify(findIterable).batchSize(1234);
665+
}
666+
656667
@Test // DATAMONGO-1518
657668
public void executeQueryShouldUseCollationWhenPresent() {
658669

@@ -969,7 +980,7 @@ private MongoTemplate mockOutGetDb() {
969980
return template;
970981
}
971982

972-
/*
983+
/*
973984
* (non-Javadoc)
974985
* @see org.springframework.data.mongodb.core.core.MongoOperationsUnitTests#getOperations()
975986
*/

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
2323

2424
import com.mongodb.client.model.ReplaceOptions;
25+
import com.mongodb.reactivestreams.client.AggregatePublisher;
2526
import lombok.Data;
27+
import org.springframework.data.mongodb.core.query.Query;
2628
import reactor.core.publisher.Mono;
2729
import reactor.test.StepVerifier;
2830

@@ -79,6 +81,7 @@ public class ReactiveMongoTemplateUnitTests {
7981
@Mock MongoDatabase db;
8082
@Mock MongoCollection collection;
8183
@Mock FindPublisher findPublisher;
84+
@Mock AggregatePublisher aggregatePublisher;
8285
@Mock Publisher runCommandPublisher;
8386

8487
MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator();
@@ -95,10 +98,15 @@ public void setUp() {
9598
when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher);
9699
when(collection.find(any(Class.class))).thenReturn(findPublisher);
97100
when(collection.find(any(Document.class), any(Class.class))).thenReturn(findPublisher);
101+
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
102+
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
98103
when(findPublisher.projection(any())).thenReturn(findPublisher);
99104
when(findPublisher.limit(anyInt())).thenReturn(findPublisher);
100105
when(findPublisher.collation(any())).thenReturn(findPublisher);
101106
when(findPublisher.first()).thenReturn(findPublisher);
107+
when(aggregatePublisher.allowDiskUse(anyBoolean())).thenReturn(aggregatePublisher);
108+
when(aggregatePublisher.collation(any())).thenReturn(aggregatePublisher);
109+
when(aggregatePublisher.first()).thenReturn(findPublisher);
102110

103111
this.mappingContext = new MongoMappingContext();
104112
this.converter = new MappingMongoConverter(new NoOpDbRefResolver(), mappingContext);
@@ -136,6 +144,17 @@ public void autogeneratesIdForMap() {
136144
}).verifyComplete();
137145
}
138146

147+
@Test // DATAMONGO-1311
148+
public void executeQueryShouldUseBatchSizeWhenPresent() {
149+
150+
when(findPublisher.batchSize(anyInt())).thenReturn(findPublisher);
151+
152+
Query query = new Query().cursorBatchSize(1234);
153+
template.find(query, Person.class).subscribe();
154+
155+
verify(findPublisher).batchSize(1234);
156+
}
157+
139158
@Test // DATAMONGO-1518
140159
public void findShouldUseCollationWhenPresent() {
141160

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static org.mockito.ArgumentMatchers.any;
1919
import static org.mockito.ArgumentMatchers.eq;
2020
import static org.mockito.Mockito.*;
21+
import static org.mockito.Mockito.anyInt;
22+
import static org.mockito.Mockito.anyList;
2123
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
2224

2325
import org.bson.Document;
@@ -80,16 +82,6 @@ public void shouldHandleMissingEntityClass() {
8082
template.aggregate(newAggregation(), INPUT_COLLECTION, null);
8183
}
8284

83-
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1646
84-
public void errorsOnCursorBatchSizeUsage() {
85-
86-
template.aggregate(
87-
newAggregation(Product.class, //
88-
project("name", "netPrice")) //
89-
.withOptions(AggregationOptions.builder().cursorBatchSize(10).build()),
90-
INPUT_COLLECTION, TagCount.class).subscribe();
91-
}
92-
9385
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1646
9486
public void errorsOnExplainUsage() {
9587

@@ -101,6 +93,20 @@ public void errorsOnExplainUsage() {
10193
.subscribe();
10294
}
10395

96+
@Test // DATAMONGO-1646, DATAMONGO-1311
97+
public void appliesBatchSizeWhenPresent() {
98+
99+
when(publisher.batchSize(anyInt())).thenReturn(publisher);
100+
101+
AggregationOptions options = AggregationOptions.builder().cursorBatchSize(1234).build();
102+
template.aggregate(newAggregation(Product.class, //
103+
project("name", "netPrice")) //
104+
.withOptions(options),
105+
INPUT_COLLECTION, TagCount.class).subscribe();
106+
107+
verify(publisher).batchSize(1234);
108+
}
109+
104110
@Test // DATAMONGO-1646
105111
public void appliesCollationCorrectlyWhenPresent() {
106112

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/MongoQueryMethodUnitTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ public void createsMongoQueryMethodWithMaxExecutionTimeCorrectly() throws Except
146146
assertThat(method.getQueryMetaAttributes().getMaxTimeMsec(), is(100L));
147147
}
148148

149+
@Test // DATAMONGO-1311
150+
public void createsMongoQueryMethodWithBatchSizeCorrectly() throws Exception {
151+
152+
MongoQueryMethod method = queryMethod(PersonRepository.class, "batchSize");
153+
154+
assertThat(method.hasQueryMetaAttributes(), is(true));
155+
assertThat(method.getQueryMetaAttributes().getCursorBatchSize(), is(100));
156+
}
157+
149158
@Test // DATAMONGO-1403
150159
public void createsMongoQueryMethodWithSpellFixedMaxExecutionTimeCorrectly() throws Exception {
151160

@@ -233,6 +242,9 @@ interface PersonRepository extends Repository<User, Long> {
233242
@Meta
234243
List<User> emptyMetaAnnotation();
235244

245+
@Meta(cursorBatchSize = 100)
246+
List<User> batchSize();
247+
236248
@Meta(maxExecutionTimeMs = 100)
237249
List<User> metaWithMaxExecutionTime();
238250

0 commit comments

Comments
 (0)