Reactive MongoDB repositories

Introduction

This chapter points out the specifics of reactive repository support for MongoDB. It builds on the core repository support explained in [repositories], so make sure you have a sound understanding of the basic concepts explained there.

Reactive Composition Libraries

The reactive space offers various reactive composition libraries. The most common libraries are RxJava and Project Reactor.

Spring Data MongoDB is built on top of the MongoDB Reactive Streams driver and relies on the Reactive Streams initiative to provide maximal interoperability. Static APIs such as ReactiveMongoOperations are provided using Project Reactor’s Flux and Mono types. Project Reactor offers various adapters to convert reactive wrapper types (Flux to Observable and vice versa), but conversion can easily clutter your code.
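The Reactive Streams contract behind this interoperability can be sketched with the JDK alone. The example below is a minimal illustration, assuming that java.util.concurrent.Flow (which mirrors the Reactive Streams interfaces) is an acceptable stand-in; it does not use Reactor, RxJava, or the MongoDB driver, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ReactiveStreamsHandshake {

    /** Subscriber that requests one element at a time and records what it receives. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next element
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given names and returns them in the order the subscriber saw them. */
    static List<String> collect(String... names) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String name : names) {
                publisher.submit(name);
            }
        } // closing the publisher signals onComplete after buffered items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(collect("Joe", "Ann"));
    }
}
```

Any library that honors the same subscribe, request, and onNext signals can consume such a publisher, which is what makes adapters between wrapper types possible.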

Spring Data’s Repository abstraction is a dynamic API, mostly defined by you and your requirements as you declare query methods. Reactive MongoDB repositories can be implemented using either RxJava or Project Reactor wrapper types by extending one of the library-specific repository interfaces:

  • ReactiveCrudRepository

  • ReactiveSortingRepository

  • RxJava2CrudRepository

  • RxJava2SortingRepository

Spring Data converts reactive wrapper types behind the scenes so that you can stick to your favorite composition library.

Usage

To access domain entities stored in MongoDB, you can use the repository support, which eases implementing data access significantly. To do so, create an interface for your repository, starting from a domain entity such as the following:

Example 1. Sample Person entity
public class Person {

  @Id
  private String id;
  private String firstname;
  private String lastname;
  private Address address;

  // … getters and setters omitted
}

This is a fairly simple domain object. Note that it has a property named id of type String. The default serialization mechanism used in MongoTemplate (which backs the repository support) regards properties named id as the document id. Currently, String, ObjectId, and BigInteger are supported as id types.

Example 2. Basic repository interface to persist Person entities
public interface ReactivePersonRepository extends ReactiveSortingRepository<Person, String> {

  Flux<Person> findByFirstname(String firstname);

  Flux<Person> findByFirstname(Publisher<String> firstname);

  Flux<Person> findByFirstnameOrderByLastname(String firstname, Pageable pageable);

  Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}

For JavaConfig, use the @EnableReactiveMongoRepositories annotation. The annotation carries the same attributes as the namespace element. If no base package is configured, the infrastructure scans the package of the annotated configuration class.

Note
MongoDB uses two different drivers for blocking and reactive (non-blocking) data access. Spring Data’s reactive MongoDB support requires a connection created with the Reactive Streams driver, so you must provide a separate configuration for that driver. Note also that your application operates on two different connections if it uses both reactive and blocking Spring Data MongoDB templates and repositories.
Example 3. JavaConfig for repositories
@Configuration
@EnableReactiveMongoRepositories
class ApplicationConfig extends AbstractReactiveMongoConfiguration {

  @Override
  protected String getDatabaseName() {
    return "e-store";
  }

  @Override
  public MongoClient reactiveMongoClient() {
    return MongoClients.create();
  }

  @Override
  protected String getMappingBasePackage() {
    return "com.oreilly.springdata.mongodb";
  }
}
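If the repository interfaces live in a different package than the configuration class, the base package can be named explicitly. The following variant of the configuration above is a sketch only: the package name com.example.repositories and the connection string are hypothetical placeholders, and it assumes that Spring Data MongoDB and the MongoDB Reactive Streams driver are on the classpath.

```java
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;

@Configuration
// Hypothetical package; without basePackages, the package of this class is scanned.
@EnableReactiveMongoRepositories(basePackages = "com.example.repositories")
class ExplicitPackageConfig extends AbstractReactiveMongoConfiguration {

  @Override
  protected String getDatabaseName() {
    return "e-store";
  }

  @Override
  public MongoClient reactiveMongoClient() {
    // Placeholder connection string for a local MongoDB instance.
    return MongoClients.create("mongodb://localhost:27017");
  }
}
```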

As our domain repository extends ReactiveSortingRepository it provides you with CRUD operations as well as methods for sorted access to the entities. Working with the repository instance is just a matter of dependency injecting it into a client.

Example 4. Sorted access to Person entities
public class PersonRepositoryTests {

    @Autowired ReactivePersonRepository repository;

    @Test
    public void sortsElementsCorrectly() {
      Flux<Person> persons = repository.findAll(Sort.by(new Order(ASC, "lastname")));
    }
}

Features

Spring Data’s Reactive MongoDB support comes with a reduced feature set compared to the blocking MongoDB Repositories.

The following features are supported:

Warning
Reactive Repositories do not support Type-safe Query methods using Querydsl.

Geo-spatial repository queries

As you’ve just seen, a few keywords trigger geo-spatial operations within a MongoDB query. The Near keyword allows some further modification. Let’s have a look at some examples:

Example 5. Advanced Near queries
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  // { 'location' : { '$near' : [point.x, point.y], '$maxDistance' : distance}}
  Flux<Person> findByLocationNear(Point location, Distance distance);
}

Adding a Distance parameter to the query method restricts results to those within the given distance. If the Distance was set up with a Metric, $nearSphere is used transparently instead of $near.

Note
Reactive geo-spatial repository queries support the domain type and GeoResult<T> results within a reactive wrapper type. GeoPage and GeoResults are not supported because they contradict the deferred result approach by pre-calculating the average distance. However, you can still pass in a Pageable argument to page results yourself.
Example 6. Using Distance with Metrics
Point point = new Point(43.7, 48.8);
Distance distance = new Distance(200, Metrics.KILOMETERS);
… = repository.findByLocationNear(point, distance);
// {'location' : {'$nearSphere' : [43.7, 48.8], '$maxDistance' : 0.03135711885774796}}

As you can see, using a Distance equipped with a Metric causes a $nearSphere clause to be added instead of a plain $near. Beyond that, the actual distance gets calculated according to the Metrics used.
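The $maxDistance value in the example above can be reproduced by hand. The sketch below assumes that the conversion divides the distance by an equatorial Earth radius of 6378.137 km, which is consistent with the value shown for 200 kilometers; the class and method names are hypothetical and not part of Spring Data.

```java
public class DistanceToRadians {

    // Assumed Earth radius in kilometers used to normalize kilometer distances.
    static final double EARTH_RADIUS_KM = 6378.137;

    /** Converts a distance in kilometers to radians on a sphere of the assumed radius. */
    static double toRadians(double kilometers) {
        return kilometers / EARTH_RADIUS_KM;
    }

    public static void main(String[] args) {
        // 200 km expressed in radians, as it would be sent as $maxDistance.
        System.out.println(toRadians(200));
    }
}
```

For 200 kilometers this yields roughly 0.0313571, matching the $maxDistance in the generated query.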

Note
Using @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE) on the target property forces usage of the $nearSphere operator.

Geo-near queries

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  // {'geoNear' : 'location', 'near' : [x, y] }
  Flux<GeoResult<Person>> findByLocationNear(Point location);

  // No metric: {'geoNear' : 'person', 'near' : [x, y], maxDistance : distance }
  // Metric: {'geoNear' : 'person', 'near' : [x, y], 'maxDistance' : distance,
  //          'distanceMultiplier' : metric.multiplier, 'spherical' : true }
  Flux<GeoResult<Person>> findByLocationNear(Point location, Distance distance);

  // Metric: {'geoNear' : 'person', 'near' : [x, y], 'minDistance' : min,
  //          'maxDistance' : max, 'distanceMultiplier' : metric.multiplier,
  //          'spherical' : true }
  Flux<GeoResult<Person>> findByLocationNear(Point location, Distance min, Distance max);
}

Infinite Streams with Tailable Cursors

By default, MongoDB automatically closes a cursor when the client exhausts all results supplied by the cursor. Closing a cursor on exhaustion turns a stream into a finite stream. For capped collections, you can use a tailable cursor that remains open after the client consumes all initially returned data. Using tailable cursors with reactive data types allows construction of infinite streams. A tailable cursor remains open until it is closed externally. It emits data as new documents arrive in a capped collection.

Tailable cursors may become dead, or invalid, if either the query returns no match or the cursor returns the document at the "end" of the collection and then the application deletes that document.
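The difference between a cursor that closes on exhaustion and one that stays open can be modeled without a database. The following sketch is an in-memory illustration only: CappedCollection and TailingCursor are hypothetical stand-ins, not MongoDB or Spring Data types, and polling replaces the push-based delivery a reactive stream would provide.

```java
import java.util.ArrayList;
import java.util.List;

public class TailableCursorModel {

    /** Hypothetical in-memory stand-in for a capped collection (append-only, insertion order). */
    static class CappedCollection {
        private final List<String> documents = new ArrayList<>();

        void insert(String document) {
            documents.add(document);
        }
    }

    /** Cursor that keeps its position after exhaustion instead of closing. */
    static class TailingCursor {
        private final CappedCollection source;
        private int position = 0;
        private boolean closed = false;

        TailingCursor(CappedCollection source) {
            this.source = source;
        }

        /** Returns documents appended since the previous call; empty if none arrived. */
        List<String> poll() {
            if (closed) {
                throw new IllegalStateException("cursor closed");
            }
            int end = source.documents.size();
            List<String> batch = new ArrayList<>(source.documents.subList(position, end));
            position = end;
            return batch;
        }

        /** The cursor only ends when the client closes it. */
        void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        CappedCollection people = new CappedCollection();
        people.insert("Joe");

        TailingCursor cursor = new TailingCursor(people);
        System.out.println(cursor.poll()); // initially available data
        System.out.println(cursor.poll()); // exhausted, but the cursor is still open

        people.insert("Ann");
        System.out.println(cursor.poll()); // the new document is picked up

        cursor.close(); // external close ends the stream
    }
}
```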

Example 7. Infinite Stream queries with ReactiveMongoOperations
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();

Spring Data MongoDB Reactive repositories support infinite streams by annotating a query method with @Tailable. This works for methods returning Flux and other reactive types capable of emitting multiple elements.

Example 8. Infinite Stream queries with ReactiveMongoRepository
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  @Tailable
  Flux<Person> findByFirstname(String firstname);

}

Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(System.out::println).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();
Tip
Capped collections can be created via MongoOperations.createCollection. Just provide the required CollectionOptions.empty().capped()…​