This chapter covers the specifics of reactive repository support for MongoDB. It builds on the core repository support explained in [repositories], so make sure you have a sound understanding of the basic concepts explained there.
The reactive space offers various reactive composition libraries. The most common libraries are RxJava and Project Reactor.
Spring Data MongoDB is built on top of the MongoDB Reactive Streams driver to provide maximal interoperability, relying on the Reactive Streams initiative. Static APIs such as ReactiveMongoOperations are provided by using Project Reactor’s Flux and Mono types. Project Reactor offers various adapters to convert reactive wrapper types (Flux to Observable and vice versa), but conversion can easily clutter your code.
Spring Data’s Repository abstraction is a dynamic API, mostly defined by you and your requirements as you declare query methods. Reactive MongoDB repositories can be implemented using either RxJava or Project Reactor wrapper types by extending one of the library-specific repository interfaces:
- ReactiveCrudRepository
- ReactiveSortingRepository
- RxJava2CrudRepository
- RxJava2SortingRepository
Spring Data converts reactive wrapper types behind the scenes so that you can stick to your favorite composition library.
To access domain entities stored in MongoDB, you can use the repository support, which eases implementing data access significantly. Start with a domain class, then create an interface for its repository:
public class Person {
@Id
private String id;
private String firstname;
private String lastname;
private Address address;
// … getters and setters omitted
}
We have a quite simple domain object here. Note that it has a property named id of type String. The default serialization mechanism used in MongoTemplate (which backs the repository support) regards properties named id as the document id. Currently String, ObjectId, and BigInteger are supported as id types.
public interface ReactivePersonRepository extends ReactiveSortingRepository<Person, String> {
Flux<Person> findByFirstname(String firstname);
Flux<Person> findByFirstname(Publisher<String> firstname);
Flux<Person> findByFirstnameOrderByLastname(String firstname, Pageable pageable);
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}
For JavaConfig, use the @EnableReactiveMongoRepositories annotation. The annotation carries the same attributes as the namespace element. If no base package is configured, the infrastructure scans the package of the annotated configuration class.
Note: MongoDB uses two different drivers for blocking and reactive (non-blocking) data access. Spring Data’s Reactive MongoDB support requires a connection created with the Reactive Streams driver, so you must provide a separate configuration for that driver. Also note that your application operates on two different connections if it uses both reactive and blocking Spring Data MongoDB templates and repositories.
@Configuration
@EnableReactiveMongoRepositories
class ApplicationConfig extends AbstractReactiveMongoConfiguration {
@Override
protected String getDatabaseName() {
return "e-store";
}
@Override
public MongoClient reactiveMongoClient() {
return MongoClients.create();
}
@Override
protected String getMappingBasePackage() {
return "com.oreilly.springdata.mongodb";
}
}
As our domain repository extends ReactiveSortingRepository, it provides you with CRUD operations as well as methods for sorted access to the entities. Working with the repository instance is just a matter of injecting it into a client.
public class PersonRepositoryTests {
@Autowired ReactivePersonRepository repository;
@Test
public void sortsElementsCorrectly() {
Flux<Person> persons = repository.findAll(Sort.by(new Order(ASC, "lastname")));
}
}
Spring Data’s Reactive MongoDB support comes with a reduced feature set compared to the blocking MongoDB Repositories.
The following features are supported:
Warning: Reactive repositories do not support type-safe query methods using Querydsl.
As you’ve just seen, there are a few keywords triggering geo-spatial operations within a MongoDB query. The Near keyword allows some further modification. Let’s have a look at some examples:
Near queries
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
// { 'location' : { '$near' : [point.x, point.y], '$maxDistance' : distance}}
Flux<Person> findByLocationNear(Point location, Distance distance);
}
Adding a Distance parameter to the query method restricts results to those within the given distance. If the Distance was set up with a Metric, $nearSphere is used transparently instead of $near.
Note: Reactive geo-spatial repository queries support the domain type and GeoResult<T> results within a reactive wrapper type. GeoPage and GeoResults are not supported, as they contradict the deferred result approach by pre-calculating the average distance. However, you can still pass in a Pageable argument to page results yourself.
Distance with Metrics
Point point = new Point(43.7, 48.8);
Distance distance = new Distance(200, Metrics.KILOMETERS);
… = repository.findByLocationNear(point, distance);
// {'location' : {'$nearSphere' : [43.7, 48.8], '$maxDistance' : 0.03135711885774796}}
As you can see, using a Distance equipped with a Metric causes a $nearSphere clause to be added instead of a plain $near. Beyond that, the actual distance is calculated according to the Metrics used.
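The distance normalization above can be worked through by hand. The following is a minimal, library-free sketch, not Spring Data's implementation: the class NearQuerySketch, its Metric enum, and its methods are hypothetical names introduced for illustration. It assumes that a metric's multiplier is the Earth's radius in that unit (6378.137 for kilometers, 3963.191 for miles), so dividing the distance by the multiplier yields radians, and that the absence of a metric is modelled as NEUTRAL with a multiplier of 1.

```java
import java.util.Locale;

// Hypothetical helper that mimics how a metric influences a Near query.
class NearQuerySketch {

    // Assumption: the multiplier is the Earth's radius expressed in the metric's unit.
    enum Metric {
        NEUTRAL(1.0), KILOMETERS(6378.137), MILES(3963.191);

        final double multiplier;

        Metric(double multiplier) {
            this.multiplier = multiplier;
        }
    }

    // A distance with a real metric switches the operator to $nearSphere.
    static String operatorFor(Metric metric) {
        return metric == Metric.NEUTRAL ? "$near" : "$nearSphere";
    }

    // Converts a distance in the metric's unit to radians (unchanged for NEUTRAL).
    static double normalize(double value, Metric metric) {
        return value / metric.multiplier;
    }

    // Renders a query document in the style of the comments used in this chapter.
    static String render(double x, double y, double value, Metric metric) {
        return String.format(Locale.ROOT,
                "{'location' : {'%s' : [%s, %s], '$maxDistance' : %s}}",
                operatorFor(metric), x, y, normalize(value, metric));
    }

    public static void main(String[] args) {
        // 200 km around the point used in the example above.
        System.out.println(render(43.7, 48.8, 200, Metric.KILOMETERS));
    }
}
```

With 200 kilometers the normalized value is 200 / 6378.137, which is approximately 0.0314 radians and corresponds to the $maxDistance shown in the query comment above.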
Note: Using @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE) on the target property forces usage of the $nearSphere operator.
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
// {'geoNear' : 'location', 'near' : [x, y] }
Flux<GeoResult<Person>> findByLocationNear(Point location);
// No metric: {'geoNear' : 'person', 'near' : [x, y], maxDistance : distance }
// Metric: {'geoNear' : 'person', 'near' : [x, y], 'maxDistance' : distance,
// 'distanceMultiplier' : metric.multiplier, 'spherical' : true }
Flux<GeoResult<Person>> findByLocationNear(Point location, Distance distance);
// Metric: {'geoNear' : 'person', 'near' : [x, y], 'minDistance' : min,
// 'maxDistance' : max, 'distanceMultiplier' : metric.multiplier,
// 'spherical' : true }
Flux<GeoResult<Person>> findByLocationNear(Point location, Distance min, Distance max);
}
By default, MongoDB automatically closes a cursor when the client exhausts all results supplied by the cursor. Closing a cursor on exhaustion turns a stream into a finite stream. For capped collections, you can use a tailable cursor that remains open after the client consumes all initially returned data. Using tailable cursors with reactive data types allows construction of infinite streams. A tailable cursor remains open until it is closed externally. It emits data as new documents arrive in a capped collection.
Tailable cursors may become dead, or invalid, if either the query returns no match or the cursor returns the document at the "end" of the collection and then the application deletes that document.
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
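The open-until-disposed behaviour of a tailable stream can be imitated without MongoDB. The following is a rough sketch built only on the JDK's java.util.concurrent.Flow interfaces. TailableFeedSketch and CollectingSubscriber are hypothetical names, the in-memory list merely stands in for a capped collection, and the sketch is single-threaded and ignores demand, so it is not a compliant Reactive Streams publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

// Hypothetical in-memory stand-in for a capped collection read through a tailable cursor.
class TailableFeedSketch implements Flow.Publisher<String> {

    private final List<String> documents = new ArrayList<>();
    private final List<Flow.Subscriber<? super String>> tails = new CopyOnWriteArrayList<>();

    // Appends a document and pushes it to every tail that is still open.
    void insert(String document) {
        documents.add(document);
        for (Flow.Subscriber<? super String> tail : tails) {
            tail.onNext(document);
        }
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        tails.add(subscriber);
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Simplification: demand is ignored, so no backpressure is applied.
            }

            @Override
            public void cancel() {
                tails.remove(subscriber); // the tail is closed externally
            }
        });
        // Replay what already exists, then stay open: onComplete() is never called.
        for (String document : new ArrayList<>(documents)) {
            if (!tails.contains(subscriber)) {
                return; // cancelled during replay
            }
            subscriber.onNext(document);
        }
    }

    public static void main(String[] args) {
        TailableFeedSketch feed = new TailableFeedSketch();
        feed.insert("first");
        CollectingSubscriber tail = new CollectingSubscriber();
        feed.subscribe(tail);
        feed.insert("second");      // arrives after the initial data was consumed
        tail.subscription.cancel(); // comparable to disposing the subscription
        feed.insert("third");       // no longer delivered
        System.out.println(tail.received);
    }
}

// Collects items and keeps the subscription so the caller can cancel it later.
class CollectingSubscriber implements Flow.Subscriber<String> {

    final List<String> received = new ArrayList<>();
    boolean completed;
    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        throw new IllegalStateException(throwable);
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```

The point of the sketch is that the stream never signals completion on its own. It ends only when the subscriber cancels, which mirrors disposing the subscription in the example above.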
Spring Data MongoDB Reactive repositories support infinite streams by annotating a query method with @Tailable. This works for methods returning Flux and other reactive types capable of emitting multiple elements.
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Tip: Capped collections can be created via MongoOperations.createCollection. Just provide the required CollectionOptions.empty().capped()…