Skip to content

Commit a6a0bde

Browse files
mp911dechristophstrobl
authored andcommitted
DATAMONGO-1719 - Add fluent reactive operations.
We now provide a fluent API for find, insert, update, aggregate and delete operations that can be used as an alternative for their counterparts in ReactiveMongoOperations. Original Pull Request: spring-projects#487
1 parent 30a8608 commit a6a0bde

29 files changed

+2610
-6
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/FindPublisherPreparer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.mongodb.core;
1717

18-
import com.mongodb.DBCursor;
1918
import com.mongodb.reactivestreams.client.FindPublisher;
2019

2120
/**
@@ -28,7 +27,7 @@ interface FindPublisherPreparer {
2827
/**
2928
* Prepare the given cursor (apply limits, skips and so on). Returns the prepared cursor.
3029
*
31-
* @param cursor
30+
* @param findPublisher must not be {@literal null}.
3231
*/
3332
<T> FindPublisher<T> prepare(FindPublisher<T> findPublisher);
3433
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import reactor.core.publisher.Flux;
19+
20+
import org.springframework.data.mongodb.core.aggregation.Aggregation;
21+
22+
/**
23+
* {@link ReactiveAggregationOperation} allows creation and execution of reactive MongoDB aggregation operations in a
24+
* fluent API style. <br />
25+
* The starting {@literal domainType} is used for mapping the {@link Aggregation} provided via {@code by} into the
26+
* MongoDB specific representation, as well as mapping back the resulting {@link org.bson.Document}. An alternative
27+
* input type for mapping the {@link Aggregation} can be provided by using
28+
* {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation}.
29+
*
30+
* <pre>
31+
* <code>
32+
* aggregateAndReturn(Jedi.class)
33+
* .by(newAggregation(Human.class, project("These are not the droids you are looking for")))
34+
* .all();
35+
* </code>
36+
* </pre>
37+
*
38+
* @author Mark Paluch
39+
* @since 2.0
40+
*/
41+
public interface ReactiveAggregationOperation {
42+
43+
/**
44+
* Start creating an aggregation operation that returns results mapped to the given domain type. <br />
45+
* Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to specify a potentially different
46+
* input type for he aggregation.
47+
*
48+
* @param domainType must not be {@literal null}.
49+
* @return new instance of {@link ReactiveAggregation}.
50+
* @throws IllegalArgumentException if domainType is {@literal null}.
51+
*/
52+
<T> ReactiveAggregation<T> aggregateAndReturn(Class<T> domainType);
53+
54+
/**
55+
* Collection override (optional).
56+
*/
57+
interface AggregationOperationWithCollection<T> {
58+
59+
/**
60+
* Explicitly set the name of the collection to perform the query on. <br />
61+
* Skip this step to use the default collection derived from the domain type.
62+
*
63+
* @param collection must not be {@literal null} nor {@literal empty}.
64+
* @return new instance of {@link AggregationOperationWithAggregation}.
65+
* @throws IllegalArgumentException if collection is {@literal null}.
66+
*/
67+
AggregationOperationWithAggregation<T> inCollection(String collection);
68+
}
69+
70+
/**
71+
* Trigger execution by calling one of the terminating methods.
72+
*/
73+
interface TerminatingAggregationOperation<T> {
74+
75+
/**
76+
* Apply pipeline operations as specified and stream all matching elements. <br />
77+
*
78+
* @return a {@link Flux} streaming all matching elements. Never {@literal null}.
79+
*/
80+
Flux<T> all();
81+
}
82+
83+
/**
84+
* Define the aggregation with pipeline stages.
85+
*/
86+
interface AggregationOperationWithAggregation<T> {
87+
88+
/**
89+
* Set the aggregation to be used.
90+
*
91+
* @param aggregation must not be {@literal null}.
92+
* @return new instance of {@link TerminatingAggregationOperation}.
93+
* @throws IllegalArgumentException if aggregation is {@literal null}.
94+
*/
95+
TerminatingAggregationOperation<T> by(Aggregation aggregation);
96+
}
97+
98+
interface ReactiveAggregation<T>
99+
extends AggregationOperationWithCollection<T>, AggregationOperationWithAggregation<T> {}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import lombok.AccessLevel;
19+
import lombok.NonNull;
20+
import lombok.RequiredArgsConstructor;
21+
import lombok.experimental.FieldDefaults;
22+
import reactor.core.publisher.Flux;
23+
24+
import org.springframework.data.mongodb.core.aggregation.Aggregation;
25+
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
26+
import org.springframework.util.Assert;
27+
import org.springframework.util.StringUtils;
28+
29+
/**
30+
* Implementation of {@link ExecutableAggregationOperation} operating directly on {@link ReactiveMongoTemplate}.
31+
*
32+
* @author Mark Paluch
33+
* @since 2.0
34+
*/
35+
class ReactiveAggregationOperationSupport implements ReactiveAggregationOperation {
36+
37+
private final ReactiveMongoTemplate template;
38+
39+
/**
40+
* Create new instance of {@link ReactiveAggregationOperationSupport}.
41+
*
42+
* @param template must not be {@literal null}.
43+
* @throws IllegalArgumentException if template is {@literal null}.
44+
*/
45+
ReactiveAggregationOperationSupport(ReactiveMongoTemplate template) {
46+
47+
Assert.notNull(template, "Template must not be null!");
48+
49+
this.template = template;
50+
}
51+
52+
@Override
53+
public <T> ReactiveAggregation<T> aggregateAndReturn(Class<T> domainType) {
54+
55+
Assert.notNull(domainType, "DomainType must not be null!");
56+
57+
return new ReactiveAggregationSupport<>(template, domainType, null, null);
58+
}
59+
60+
@RequiredArgsConstructor
61+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
62+
static class ReactiveAggregationSupport<T>
63+
implements AggregationOperationWithAggregation<T>, ReactiveAggregation<T>, TerminatingAggregationOperation<T> {
64+
65+
@NonNull ReactiveMongoTemplate template;
66+
@NonNull Class<T> domainType;
67+
Aggregation aggregation;
68+
String collection;
69+
70+
@Override
71+
public AggregationOperationWithAggregation<T> inCollection(String collection) {
72+
73+
Assert.hasText(collection, "Collection must not be null nor empty!");
74+
75+
return new ReactiveAggregationSupport<>(template, domainType, aggregation, collection);
76+
}
77+
78+
@Override
79+
public TerminatingAggregationOperation<T> by(Aggregation aggregation) {
80+
81+
Assert.notNull(aggregation, "Aggregation must not be null!");
82+
83+
return new ReactiveAggregationSupport<>(template, domainType, aggregation, collection);
84+
}
85+
86+
@Override
87+
public Flux<T> all() {
88+
return template.aggregate(aggregation, getCollectionName(aggregation), domainType);
89+
}
90+
91+
private String getCollectionName(Aggregation aggregation) {
92+
93+
if (StringUtils.hasText(collection)) {
94+
return collection;
95+
}
96+
97+
if (aggregation instanceof TypedAggregation) {
98+
99+
TypedAggregation<?> typedAggregation = (TypedAggregation<?>) aggregation;
100+
101+
if (typedAggregation.getInputType() != null) {
102+
return template.determineCollectionName(typedAggregation.getInputType());
103+
}
104+
}
105+
106+
return template.determineCollectionName(domainType);
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)