Skip to content

Commit 4673525

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-2113 - Fix resumeTimestamp conversion for change streams.
We now use the first 32 bits of the timestamp to create the instant and ignore the ordinal value. Original pull request: spring-projects#615.
1 parent 5c009e3 commit 4673525

File tree

7 files changed

+48
-8
lines changed

7 files changed

+48
-8
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ public ChangeStreamDocument<Document> getRaw() {
8383
*/
8484
@Nullable
8585
public Instant getTimestamp() {
86-
return raw != null && raw.getClusterTime() != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null;
86+
87+
return raw != null && raw.getClusterTime() != null
88+
? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null;
8789
}
8890

8991
/**

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1915,7 +1915,7 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
19151915

19161916
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
19171917
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
1918-
publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp(it.toEpochMilli()))
1918+
publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
19191919
.map(publisher::startAtOperationTime).orElse(publisher);
19201920
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
19211921

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverters.java

+21
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import java.math.BigInteger;
2020
import java.net.MalformedURLException;
2121
import java.net.URL;
22+
import java.time.Instant;
2223
import java.util.ArrayList;
2324
import java.util.Collection;
2425
import java.util.Currency;
2526
import java.util.List;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.concurrent.atomic.AtomicLong;
2829

30+
import org.bson.BsonTimestamp;
2931
import org.bson.Document;
3032
import org.bson.types.Binary;
3133
import org.bson.types.Code;
@@ -86,6 +88,7 @@ static Collection<Object> getConvertersToRegister() {
8688
converters.add(LongToAtomicLongConverter.INSTANCE);
8789
converters.add(IntegerToAtomicIntegerConverter.INSTANCE);
8890
converters.add(BinaryToByteArrayConverter.INSTANCE);
91+
converters.add(BsonTimestampToInstantConverter.INSTANCE);
8992

9093
return converters;
9194
}
@@ -465,4 +468,22 @@ public byte[] convert(Binary source) {
465468
return source.getData();
466469
}
467470
}
471+
472+
/**
473+
* {@link Converter} implementation converting {@link BsonTimestamp} into {@link Instant}.
474+
*
475+
* @author Christoph Strobl
476+
* @since 2.1.2
477+
*/
478+
@ReadingConverter
479+
enum BsonTimestampToInstantConverter implements Converter<BsonTimestamp, Instant> {
480+
481+
INSTANCE;
482+
483+
@Nullable
484+
@Override
485+
public Instant convert(BsonTimestamp source) {
486+
return Instant.ofEpochSecond(source.getTime(), 0);
487+
}
488+
}
468489
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
115115
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
116116
: FullDocument.UPDATE_LOOKUP);
117117

118-
startAt = changeStreamOptions.getResumeTimestamp().map(Instant::toEpochMilli).map(BsonTimestamp::new)
118+
startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0))
119119
.orElse(null);
120120
}
121121

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import reactor.core.publisher.Mono;
3030
import reactor.test.StepVerifier;
3131

32+
import java.time.Duration;
3233
import java.time.Instant;
3334
import java.util.Arrays;
3435
import java.util.Collections;
@@ -1355,7 +1356,7 @@ public void watchesDatabaseCorrectly() throws InterruptedException {
13551356
}
13561357
}
13571358

1358-
@Test // DATAMONGO-2012
1359+
@Test // DATAMONGO-2012, DATAMONGO-2113
13591360
public void resumesAtTimestampCorrectly() throws InterruptedException {
13601361

13611362
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
@@ -1372,7 +1373,7 @@ public void resumesAtTimestampCorrectly() throws InterruptedException {
13721373
Person person2 = new Person("Data", 37);
13731374
Person person3 = new Person("MongoDB", 39);
13741375

1375-
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
1376+
StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete();
13761377
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
13771378

13781379
Thread.sleep(500); // just give it some time to link receive all events

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/MongoConvertersUnitTests.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
import static org.junit.Assert.*;
2020

2121
import java.math.BigDecimal;
22+
import java.time.Instant;
23+
import java.time.temporal.ChronoUnit;
2224
import java.util.Currency;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426
import java.util.concurrent.atomic.AtomicLong;
2527

28+
import org.assertj.core.data.TemporalUnitLessThanOffset;
29+
import org.bson.BsonTimestamp;
30+
import org.bson.Document;
2631
import org.junit.Test;
2732
import org.springframework.data.geo.Box;
2833
import org.springframework.data.geo.Circle;
@@ -32,14 +37,14 @@
3237
import org.springframework.data.mongodb.core.convert.MongoConverters.AtomicIntegerToIntegerConverter;
3338
import org.springframework.data.mongodb.core.convert.MongoConverters.AtomicLongToLongConverter;
3439
import org.springframework.data.mongodb.core.convert.MongoConverters.BigDecimalToStringConverter;
40+
import org.springframework.data.mongodb.core.convert.MongoConverters.BsonTimestampToInstantConverter;
3541
import org.springframework.data.mongodb.core.convert.MongoConverters.CurrencyToStringConverter;
3642
import org.springframework.data.mongodb.core.convert.MongoConverters.IntegerToAtomicIntegerConverter;
3743
import org.springframework.data.mongodb.core.convert.MongoConverters.LongToAtomicLongConverter;
3844
import org.springframework.data.mongodb.core.convert.MongoConverters.StringToBigDecimalConverter;
3945
import org.springframework.data.mongodb.core.convert.MongoConverters.StringToCurrencyConverter;
4046
import org.springframework.data.mongodb.core.geo.Sphere;
41-
42-
import org.bson.Document;
47+
import org.springframework.data.mongodb.test.util.Assertions;
4348

4449
/**
4550
* Unit tests for {@link MongoConverters}.
@@ -145,4 +150,12 @@ public void convertsLongToAtomicLongCorrectly() {
145150
public void convertsIntegerToAtomicIntegerCorrectly() {
146151
assertThat(IntegerToAtomicIntegerConverter.INSTANCE.convert(100), is(instanceOf(AtomicInteger.class)));
147152
}
153+
154+
@Test // DATAMONGO-2113
155+
public void convertsBsonTimestampToInstantCorrectly() {
156+
157+
Assertions.assertThat(BsonTimestampToInstantConverter.INSTANCE.convert(new BsonTimestamp(6615900307735969796L)))
158+
.isCloseTo(Instant.ofEpochSecond(1540384327), new TemporalUnitLessThanOffset(100, ChronoUnit.MILLIS));
159+
}
160+
148161
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ public void readsFullDocumentForUpdateWhenNotMappedToDomainTypeButLookupSpecifie
405405
.append("user_name", "jellyBelly").append("age", 8).append("_class", User.class.getName()));
406406
}
407407

408-
@Test // DATAMONGO-2012
408+
@Test // DATAMONGO-2012, DATAMONGO-2113
409409
public void resumeAtTimestampCorrectly() throws InterruptedException {
410410

411411
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener1 = new CollectingMessageListener<>();
@@ -415,6 +415,9 @@ public void resumeAtTimestampCorrectly() throws InterruptedException {
415415
awaitSubscription(subscription1);
416416

417417
template.save(jellyBelly);
418+
419+
Thread.sleep(1000); // cluster timestamp is in seconds, so we need to wait at least one.
420+
418421
template.save(sugarSplashy);
419422

420423
awaitMessages(messageListener1, 12);

0 commit comments

Comments
 (0)