Skip to content

Commit 78c2ab2

Browse files
committed
DATAMONGO-2012 - Polishing.
Simplify conditional flow. Replace AtomicReference construction in ChangeStreamEvent with AtomicReferenceFieldUpdater usage to reduce object allocations to streamline lazy body conversion usage. Tweak Javadoc and reference docs. Original pull request: spring-projects#576.
1 parent 88150ec commit 78c2ab2

File tree

8 files changed

+100
-69
lines changed

8 files changed

+100
-69
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

+33-15
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import lombok.EqualsAndHashCode;
1919

2020
import java.time.Instant;
21-
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2222

2323
import org.bson.BsonValue;
2424
import org.bson.Document;
@@ -41,11 +41,17 @@
4141
@EqualsAndHashCode
4242
public class ChangeStreamEvent<T> {
4343

44+
@SuppressWarnings("rawtypes") //
45+
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_UPDATER = AtomicReferenceFieldUpdater
46+
.newUpdater(ChangeStreamEvent.class, Object.class, "converted");
47+
4448
private final @Nullable ChangeStreamDocument<Document> raw;
4549

4650
private final Class<T> targetType;
4751
private final MongoConverter converter;
48-
private final AtomicReference<T> converted = new AtomicReference<>();
52+
53+
// accessed through CONVERTED_UPDATER.
54+
private volatile @Nullable T converted;
4955

5056
/**
5157
* @param raw can be {@literal null}.
@@ -77,7 +83,7 @@ public ChangeStreamDocument<Document> getRaw() {
7783
*/
7884
@Nullable
7985
public Instant getTimestamp() {
80-
return raw != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null;
86+
return raw != null && raw.getClusterTime() != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null;
8187
}
8288

8389
/**
@@ -133,36 +139,48 @@ public T getBody() {
133139
return null;
134140
}
135141

136-
if (raw.getFullDocument() == null) {
137-
return targetType.cast(raw.getFullDocument());
142+
Document fullDocument = raw.getFullDocument();
143+
144+
if (fullDocument == null) {
145+
return targetType.cast(fullDocument);
138146
}
139147

140-
return getConverted();
148+
return getConverted(fullDocument);
141149
}
142150

143-
private T getConverted() {
151+
@SuppressWarnings("unchecked")
152+
private T getConverted(Document fullDocument) {
153+
return (T) doGetConverted(fullDocument);
154+
}
155+
156+
private Object doGetConverted(Document fullDocument) {
157+
158+
Object result = CONVERTED_UPDATER.get(this);
144159

145-
T result = converted.get();
146160
if (result != null) {
147161
return result;
148162
}
149163

150-
if (ClassUtils.isAssignable(Document.class, raw.getFullDocument().getClass())) {
164+
if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) {
151165

152-
result = converter.read(targetType, raw.getFullDocument());
153-
return converted.compareAndSet(null, result) ? result : converted.get();
166+
result = converter.read(targetType, fullDocument);
167+
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
154168
}
155169

156-
if (converter.getConversionService().canConvert(raw.getFullDocument().getClass(), targetType)) {
170+
if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) {
157171

158-
result = converter.getConversionService().convert(raw.getFullDocument(), targetType);
159-
return converted.compareAndSet(null, result) ? result : converted.get();
172+
result = converter.getConversionService().convert(fullDocument, targetType);
173+
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
160174
}
161175

162176
throw new IllegalArgumentException(String.format("No converter found capable of converting %s to %s",
163-
raw.getFullDocument().getClass(), targetType));
177+
fullDocument.getClass(), targetType));
164178
}
165179

180+
/*
181+
* (non-Javadoc)
182+
* @see java.lang.Object#toString()
183+
*/
166184
@Override
167185
public String toString() {
168186
return "ChangeStreamEvent {" + "raw=" + raw + ", targetType=" + targetType + '}';

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.reactivestreams.Publisher;
2727
import org.reactivestreams.Subscription;
2828
import org.springframework.data.geo.GeoResult;
29+
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
2930
import org.springframework.data.mongodb.core.aggregation.Aggregation;
3031
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
3132
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
@@ -1356,9 +1357,9 @@ <S, T> Mono<T> findAndReplace(Query query, S replacement, FindAndReplaceOptions
13561357
<T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName);
13571358

13581359
/**
1359-
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> for all events
1360-
* in the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation}
1361-
* to filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is
1360+
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> for all events in
1361+
* the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation} to
1362+
* filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is
13621363
* {@link Subscription#cancel() canceled}.
13631364
* <p />
13641365
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
@@ -1372,14 +1373,16 @@ <S, T> Mono<T> findAndReplace(Query query, S replacement, FindAndReplaceOptions
13721373
* @param <T>
13731374
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
13741375
* @since 2.1
1376+
* @see ReactiveMongoDatabaseFactory#getMongoDatabase()
1377+
* @see ChangeStreamOptions#getFilter()
13751378
*/
13761379
default <T> Flux<ChangeStreamEvent<T>> changeStream(ChangeStreamOptions options, Class<T> targetType) {
13771380
return changeStream(null, options, targetType);
13781381
}
13791382

13801383
/**
1381-
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> for all events
1382-
* in the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter
1384+
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> for all events in
1385+
* the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter
13831386
* events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is
13841387
* {@link Subscription#cancel() canceled}.
13851388
* <p />
@@ -1389,12 +1392,13 @@ default <T> Flux<ChangeStreamEvent<T>> changeStream(ChangeStreamOptions options,
13891392
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken}
13901393
* for resuming change streams.
13911394
*
1392-
* @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so.
1395+
* @param collectionName the collection to watch. Can be {@literal null} to watch all collections.
13931396
* @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}.
13941397
* @param targetType the result type to use.
13951398
* @param <T>
13961399
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
13971400
* @since 2.1
1401+
* @see ChangeStreamOptions#getFilter()
13981402
*/
13991403
default <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String collectionName, ChangeStreamOptions options,
14001404
Class<T> targetType) {
@@ -1403,7 +1407,7 @@ default <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String collectionN
14031407
}
14041408

14051409
/**
1406-
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via the reactive
1410+
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Stream</a> via the reactive
14071411
* infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed
14081412
* unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}.
14091413
* <p />
@@ -1420,6 +1424,7 @@ default <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String collectionN
14201424
* @param <T>
14211425
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
14221426
* @since 2.1
1427+
* @see ChangeStreamOptions#getFilter()
14231428
*/
14241429
<T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName,
14251430
ChangeStreamOptions options, Class<T> targetType);

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -2028,11 +2028,8 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
20282028

20292029
List<Document> prepareFilter(ChangeStreamOptions options) {
20302030

2031-
if (!options.getFilter().isPresent()) {
2032-
return Collections.emptyList();
2033-
}
2031+
Object filter = options.getFilter().orElse(Collections.emptyList());
20342032

2035-
Object filter = options.getFilter().get();
20362033
if (filter instanceof Aggregation) {
20372034
Aggregation agg = (Aggregation) filter;
20382035
AggregationOperationContext context = agg instanceof TypedAggregation
@@ -2042,12 +2039,14 @@ List<Document> prepareFilter(ChangeStreamOptions options) {
20422039

20432040
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
20442041
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
2045-
} else if (filter instanceof List) {
2042+
}
2043+
2044+
if (filter instanceof List) {
20462045
return (List<Document>) filter;
2047-
} else {
2048-
throw new IllegalArgumentException(
2049-
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
20502046
}
2047+
2048+
throw new IllegalArgumentException(
2049+
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
20512050
}
20522051

20532052
/*

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@
4646
*
4747
* or {@link com.mongodb.client.MongoDatabase} which receives events from all {@link com.mongodb.client.MongoCollection
4848
* collections} in that database.
49-
*
49+
*
5050
* <pre>
5151
* <code>
5252
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, RequestOptions.justDatabase("test"));
5353
* </code>
5454
* </pre>
55-
*
55+
*
5656
* For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,...
5757
*
5858
* <pre>

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

+21-13
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
5959
*
6060
* @author Christoph Strobl
61+
* @author Mark Paluch
6162
* @since 2.1
6263
*/
6364
class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
@@ -77,7 +78,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
7778
mongoConverter = template.getConverter();
7879
}
7980

80-
/*
81+
/*
8182
* (non-Javadoc)
8283
* @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#initCursor(org.springframework.data.mongodb.core.MongoTemplate, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions, java.lang.Class)
8384
*/
@@ -114,9 +115,8 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
114115
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
115116
: FullDocument.UPDATE_LOOKUP);
116117

117-
if (changeStreamOptions.getResumeTimestamp().isPresent()) {
118-
startAt = new BsonTimestamp(changeStreamOptions.getResumeTimestamp().get().toEpochMilli());
119-
}
118+
startAt = changeStreamOptions.getResumeTimestamp().map(Instant::toEpochMilli).map(BsonTimestamp::new)
119+
.orElse(null);
120120
}
121121

122122
MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
@@ -149,13 +149,15 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
149149
return iterable.iterator();
150150
}
151151

152+
@SuppressWarnings("unchecked")
152153
List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
153154

154155
if (!options.getFilter().isPresent()) {
155156
return Collections.emptyList();
156157
}
157158

158-
Object filter = options.getFilter().get();
159+
Object filter = options.getFilter().orElse(null);
160+
159161
if (filter instanceof Aggregation) {
160162
Aggregation agg = (Aggregation) filter;
161163
AggregationOperationContext context = agg instanceof TypedAggregation
@@ -164,14 +166,20 @@ List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options
164166
: Aggregation.DEFAULT_CONTEXT;
165167

166168
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", blacklist));
167-
} else if (filter instanceof List) {
169+
}
170+
171+
if (filter instanceof List) {
168172
return (List<Document>) filter;
169-
} else {
170-
throw new IllegalArgumentException(
171-
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
172173
}
174+
175+
throw new IllegalArgumentException(
176+
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
173177
}
174178

179+
/*
180+
* (non-Javadoc)
181+
* @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#createMessage(java.lang.Object, java.lang.Class, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions)
182+
*/
175183
@Override
176184
protected Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> source,
177185
Class<Object> targetType, RequestOptions options) {
@@ -202,7 +210,7 @@ static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument
202210
private final ChangeStreamEvent<T> delegate;
203211
private final MessageProperties messageProperties;
204212

205-
/*
213+
/*
206214
* (non-Javadoc)
207215
* @see org.springframework.data.mongodb.core.messaging.Message#getRaw()
208216
*/
@@ -212,7 +220,7 @@ public ChangeStreamDocument<Document> getRaw() {
212220
return delegate.getRaw();
213221
}
214222

215-
/*
223+
/*
216224
* (non-Javadoc)
217225
* @see org.springframework.data.mongodb.core.messaging.Message#getBody()
218226
*/
@@ -222,7 +230,7 @@ public T getBody() {
222230
return delegate.getBody();
223231
}
224232

225-
/*
233+
/*
226234
* (non-Javadoc)
227235
* @see org.springframework.data.mongodb.core.messaging.Message#getProperties()
228236
*/
@@ -232,7 +240,7 @@ public MessageProperties getProperties() {
232240
}
233241

234242
/**
235-
* @return the resume token or {@litearl null} if not set.
243+
* @return the resume token or {@literal null} if not set.
236244
* @see ChangeStreamEvent#getResumeToken()
237245
*/
238246
@Nullable

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ interface RequestOptions {
5757
* Get the database name of the db.
5858
*
5959
* @return the name of the database to subscribe to. Can be {@literal null} in which case the default
60-
* {@link MongoDbFactory#getDb()} is used.
60+
* {@link MongoDbFactory#getDb() database} is used.
6161
*/
6262
@Nullable
6363
default String getDatabaseName() {

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ public class DefaultMessageListenerContainerTests {
5656
public static final String DATABASE_NAME = "change-stream-events";
5757
public static final String COLLECTION_NAME = "collection-1";
5858
public static final String COLLECTION_2_NAME = "collection-2";
59-
MongoDbFactory dbFactory;
6059

60+
public @Rule TestRule replSet = ReplicaSet.none();
61+
62+
MongoDbFactory dbFactory;
6163
MongoCollection<Document> collection;
62-
private MongoCollection<Document> collection2;
64+
MongoCollection<Document> collection2;
6365

6466
private CollectingMessageListener<Object, Object> messageListener;
6567
private MongoTemplate template;
6668

67-
public @Rule TestRule replSet = ReplicaSet.none();
68-
6969
@Before
7070
public void setUp() {
7171

0 commit comments

Comments
 (0)