Skip to content

Commit e2a1220

Browse files
Hacking: Apply skip & limitRequest to source (eg. a db driver)
1 parent ad7315d commit e2a1220

File tree

3 files changed

+406
-9
lines changed

3 files changed

+406
-9
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 150 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import lombok.AccessLevel;
2121
import lombok.NonNull;
2222
import lombok.RequiredArgsConstructor;
23+
import reactor.core.CoreSubscriber;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
26+
import reactor.util.context.Context;
2527
import reactor.util.function.Tuple2;
2628
import reactor.util.function.Tuples;
2729

30+
import java.lang.reflect.Field;
2831
import java.util.*;
2932
import java.util.concurrent.TimeUnit;
3033
import java.util.function.Consumer;
@@ -70,7 +73,16 @@
7073
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
7174
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
7275
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
73-
import org.springframework.data.mongodb.core.convert.*;
76+
import org.springframework.data.mongodb.core.convert.DbRefResolver;
77+
import org.springframework.data.mongodb.core.convert.JsonSchemaMapper;
78+
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
79+
import org.springframework.data.mongodb.core.convert.MongoConverter;
80+
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
81+
import org.springframework.data.mongodb.core.convert.MongoJsonSchemaMapper;
82+
import org.springframework.data.mongodb.core.convert.MongoWriter;
83+
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
84+
import org.springframework.data.mongodb.core.convert.QueryMapper;
85+
import org.springframework.data.mongodb.core.convert.UpdateMapper;
7486
import org.springframework.data.mongodb.core.index.MongoMappingEventPublisher;
7587
import org.springframework.data.mongodb.core.index.ReactiveIndexOperations;
7688
import org.springframework.data.mongodb.core.index.ReactiveMongoPersistentEntityIndexCreator;
@@ -102,6 +114,7 @@
102114
import org.springframework.util.ClassUtils;
103115
import org.springframework.util.CollectionUtils;
104116
import org.springframework.util.ObjectUtils;
117+
import org.springframework.util.ReflectionUtils;
105118
import org.springframework.util.ResourceUtils;
106119
import org.springframework.util.StringUtils;
107120

@@ -113,11 +126,29 @@
113126
import com.mongodb.MongoException;
114127
import com.mongodb.ReadPreference;
115128
import com.mongodb.WriteConcern;
116-
import com.mongodb.client.model.*;
129+
import com.mongodb.client.model.CountOptions;
130+
import com.mongodb.client.model.CreateCollectionOptions;
131+
import com.mongodb.client.model.DeleteOptions;
132+
import com.mongodb.client.model.FindOneAndDeleteOptions;
133+
import com.mongodb.client.model.FindOneAndReplaceOptions;
134+
import com.mongodb.client.model.FindOneAndUpdateOptions;
135+
import com.mongodb.client.model.ReplaceOptions;
136+
import com.mongodb.client.model.ReturnDocument;
137+
import com.mongodb.client.model.UpdateOptions;
138+
import com.mongodb.client.model.ValidationOptions;
117139
import com.mongodb.client.model.changestream.FullDocument;
118140
import com.mongodb.client.result.DeleteResult;
119141
import com.mongodb.client.result.UpdateResult;
120-
import com.mongodb.reactivestreams.client.*;
142+
import com.mongodb.reactivestreams.client.AggregatePublisher;
143+
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
144+
import com.mongodb.reactivestreams.client.ClientSession;
145+
import com.mongodb.reactivestreams.client.DistinctPublisher;
146+
import com.mongodb.reactivestreams.client.FindPublisher;
147+
import com.mongodb.reactivestreams.client.MapReducePublisher;
148+
import com.mongodb.reactivestreams.client.MongoClient;
149+
import com.mongodb.reactivestreams.client.MongoCollection;
150+
import com.mongodb.reactivestreams.client.MongoDatabase;
151+
import com.mongodb.reactivestreams.client.Success;
121152

122153
/**
123154
* Primary implementation of {@link ReactiveMongoOperations}. It simplifies the use of Reactive MongoDB usage and helps
@@ -581,9 +612,98 @@ public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<
581612
Mono<MongoCollection<Document>> collectionPublisher = Mono
582613
.fromCallable(() -> getAndPrepareCollection(doGetDatabase(), collectionName));
583614

584-
return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
615+
Flux<T> source = collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException());
616+
617+
618+
return new Flux<T>() {
619+
620+
@Override
621+
public void subscribe(CoreSubscriber actual) {
622+
623+
Long skip = extractSkip(actual);
624+
Long take = extractLimit(actual);
625+
626+
System.out.println(String.format("Setting offset %s and limit: %s", skip, take));
627+
628+
Context context = Context.empty();
629+
630+
// and here we use the original Flux and evaluate skip / take in the template
631+
if (skip != null && skip > 0L) {
632+
context = context.put("skip", skip);
633+
}
634+
if (take != null && take > 0L) {
635+
context = context.put("take", take);
636+
}
637+
638+
639+
source.subscriberContext(context).subscribe(actual);
640+
}
641+
};
642+
}
643+
644+
// --> HACKING
645+
646+
@Nullable
647+
static Long extractSkip(Subscriber subscriber) {
648+
649+
if (subscriber == null || !ClassUtils.getShortName(subscriber.getClass()).endsWith("SkipSubscriber")) {
650+
return null;
651+
}
652+
653+
java.lang.reflect.Field field = ReflectionUtils.findField(subscriber.getClass(), "remaining");
654+
if (field == null) {
655+
return null;
656+
}
657+
658+
ReflectionUtils.makeAccessible(field);
659+
Long skip = (Long) ReflectionUtils.getField(field, subscriber);
660+
if (skip != null && skip > 0L) {
661+
662+
// reset the field, otherwise we'd skip stuff in the code.
663+
ReflectionUtils.setField(field, subscriber, 0L);
664+
}
665+
666+
return skip;
667+
}
668+
669+
@Nullable
670+
static Long extractLimit(Subscriber subscriber) {
671+
672+
if (subscriber == null) {
673+
return null;
674+
}
675+
676+
if (!ClassUtils.getShortName(subscriber.getClass()).endsWith("TakeSubscriber")) {
677+
return extractLimit(extractPotentialTakeSubscriber(subscriber));
678+
}
679+
680+
java.lang.reflect.Field field = ReflectionUtils.findField(subscriber.getClass(), "n");
681+
if (field == null) {
682+
return null;
683+
}
684+
685+
ReflectionUtils.makeAccessible(field);
686+
return (Long) ReflectionUtils.getField(field, subscriber);
687+
}
688+
689+
@Nullable
690+
static Subscriber extractPotentialTakeSubscriber(Subscriber subscriber) {
691+
692+
if (!ClassUtils.getShortName(subscriber.getClass()).endsWith("SkipSubscriber")) {
693+
return null;
694+
}
695+
696+
Field field = ReflectionUtils.findField(subscriber.getClass(), "actual");
697+
if (field == null) {
698+
return null;
699+
}
700+
701+
ReflectionUtils.makeAccessible(field);
702+
return (Subscriber) ReflectionUtils.getField(field, subscriber);
585703
}
586704

705+
// <--- HACKING
706+
587707
/**
588708
* Create a reusable {@link Mono} for the {@code collectionName} and {@link ReactiveCollectionCallback}.
589709
*
@@ -2539,12 +2659,33 @@ private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Doc
25392659

25402660
return createFlux(collectionName, collection -> {
25412661

2542-
FindPublisher<Document> findPublisher = collectionCallback.doInCollection(collection);
2662+
return Mono.subscriberContext().flatMapMany(context -> {
2663+
2664+
FindPublisher<Document> findPublisher = collectionCallback.doInCollection(collection);
2665+
2666+
if (preparer != null) {
2667+
findPublisher = preparer.prepare(findPublisher);
2668+
}
2669+
2670+
Long skip = context.getOrDefault("skip", null);
2671+
Long take = context.getOrDefault("take", null);
2672+
2673+
System.out.println(String.format("Using offset: %s and limit: %s", skip, take));
2674+
2675+
if(skip != null && skip > 0L) {
2676+
findPublisher = findPublisher.skip(skip.intValue());
2677+
}
2678+
2679+
if(take != null && take > 0L) {
2680+
findPublisher = findPublisher.limit(take.intValue());
2681+
}
2682+
2683+
return Flux.from(findPublisher).doOnNext(System.out::println).map(objectCallback::doWith);
2684+
2685+
});
2686+
2687+
25432688

2544-
if (preparer != null) {
2545-
findPublisher = preparer.prepare(findPublisher);
2546-
}
2547-
return Flux.from(findPublisher).map(objectCallback::doWith);
25482689
});
25492690
}
25502691

0 commit comments

Comments
 (0)