Skip to content

DATAMONGO-1667 - Rename @InfiniteStream to @Tailable. #458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
4 changes: 2 additions & 2 deletions spring-data-mongodb-cross-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
</dependency>

<!-- reactive -->
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-log4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.0.0.DATAMONGO-1667-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.springframework.data.annotation.QueryAnnotation;

/**
* Annotation to declare an infinite stream using repository query methods. An infinite stream uses MongoDB's
* {@link com.mongodb.CursorType#TailableAwait tailable} cursors to retrieve data from a capped collection and stream
* data as it is inserted into the collection. An infinite stream can only be used with streams that emit more than one
* element, such as {@link reactor.core.publisher.Flux} or {@link rx.Observable}.
* Annotation to declare an infinite stream using MongoDB's {@link com.mongodb.CursorType#TailableAwait tailable}
* cursors. An infinite stream can only be used with capped collections. Objects are emitted through the stream as data
* is inserted into the collection. An infinite stream can only be used with streams that emit more than one element,
* such as {@link reactor.core.publisher.Flux}.
* <p>
* The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document at
* the "end" of the collection and then the application deletes that document.
Expand All @@ -37,11 +37,12 @@
*
* @author Mark Paluch
* @see <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">Tailable Cursors</a>
* @since 2.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Documented
@QueryAnnotation
public @interface InfiniteStream {
public @interface Tailable {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,7 +130,7 @@ private ReactiveMongoQueryExecution getExecutionToWrap(MongoParameterAccessor ac
return new DeleteExecution(operations, method);
} else if (method.isGeoNearQuery()) {
return new GeoNearExecution(operations, accessor, method.getReturnType());
} else if (isInfiniteStream(method)) {
} else if (isTailable(method)) {
return new TailExecution(operations, accessor.getPageable());
} else if (method.isCollectionQuery()) {
return new CollectionExecution(operations, accessor.getPageable());
Expand All @@ -139,8 +139,8 @@ private ReactiveMongoQueryExecution getExecutionToWrap(MongoParameterAccessor ac
}
}

private boolean isInfiniteStream(MongoQueryMethod method) {
return method.getInfiniteStreamAnnotation() != null;
private boolean isTailable(MongoQueryMethod method) {
return method.getTailableAnnotation() != null;
}

Query applyQueryMetaAttributesWhenPresent(Query query) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,9 +29,9 @@
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.repository.InfiniteStream;
import org.springframework.data.mongodb.repository.Meta;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.Tailable;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.query.QueryMethod;
Expand All @@ -44,7 +44,7 @@

/**
* Mongo specific implementation of {@link QueryMethod}.
*
*
* @author Oliver Gierke
* @author Christoph Strobl
* @author Mark Paluch
Expand All @@ -61,7 +61,7 @@ public class MongoQueryMethod extends QueryMethod {

/**
* Creates a new {@link MongoQueryMethod} from the given {@link Method}.
*
*
* @param method must not be {@literal null}.
* @param metadata must not be {@literal null}.
* @param projectionFactory must not be {@literal null}.
Expand Down Expand Up @@ -89,7 +89,7 @@ protected MongoParameters createParameters(Method method) {

/**
* Returns whether the method has an annotated query.
*
*
* @return
*/
public boolean hasAnnotatedQuery() {
Expand All @@ -99,7 +99,7 @@ public boolean hasAnnotatedQuery() {
/**
* Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found
* nor the attribute was specified.
*
*
* @return
*/
String getAnnotatedQuery() {
Expand All @@ -110,7 +110,7 @@ String getAnnotatedQuery() {

/**
* Returns the field specification to be used for the query.
*
*
* @return
*/
String getFieldSpecification() {
Expand All @@ -119,7 +119,7 @@ String getFieldSpecification() {
return StringUtils.hasText(value) ? value : null;
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.QueryMethod#getEntityInformation()
*/
Expand Down Expand Up @@ -165,7 +165,7 @@ public MongoParameters getParameters() {

/**
* Returns whether the query is a geo near query.
*
*
* @return
*/
public boolean isGeoNearQuery() {
Expand All @@ -192,7 +192,7 @@ private boolean isGeoNearQuery(Method method) {

/**
* Returns the {@link Query} annotation that is applied to the method or {@code null} if none available.
*
*
* @return
*/
Query getQueryAnnotation() {
Expand All @@ -213,7 +213,7 @@ public boolean hasQueryMetaAttributes() {

/**
* Returns the {@link Meta} annotation that is applied to the method or {@code null} if not available.
*
*
* @return
* @since 1.6
*/
Expand All @@ -222,18 +222,18 @@ Meta getMetaAnnotation() {
}

/**
* Returns the {@link InfiniteStream} annotation that is applied to the method or {@code null} if not available.
* Returns the {@link Tailable} annotation that is applied to the method or {@code null} if not available.
*
* @return
* @since 2.0
*/
InfiniteStream getInfiniteStreamAnnotation() {
return AnnotatedElementUtils.findMergedAnnotation(method, InfiniteStream.class);
Tailable getTailableAnnotation() {
return AnnotatedElementUtils.findMergedAnnotation(method, Tailable.class);
}

/**
* Returns the {@link org.springframework.data.mongodb.core.query.Meta} attributes to be applied.
*
*
* @return never {@literal null}.
* @since 1.6
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void shouldFindByLastNameAndSort() {
}

@Test // DATAMONGO-1444
public void shouldUseInfiniteStream() throws Exception {
public void shouldUseTailableCursor() throws Exception {

StepVerifier
.create(template.dropCollection(Capped.class) //
Expand All @@ -195,7 +195,7 @@ public void shouldUseInfiniteStream() throws Exception {
}

@Test // DATAMONGO-1444
public void shouldUseInfiniteStreamWithProjection() throws Exception {
public void shouldUseTailableCursorWithProjection() throws Exception {

StepVerifier
.create(template.dropCollection(Capped.class) //
Expand Down Expand Up @@ -329,10 +329,10 @@ interface ReactivePersonRepository extends ReactiveMongoRepository<Person, Strin

interface ReactiveCappedCollectionRepository extends Repository<Capped, String> {

@InfiniteStream
@Tailable
Flux<Capped> findByKey(String key);

@InfiniteStream
@Tailable
Flux<CappedProjection> findProjectionByKey(String key);
}

Expand Down
9 changes: 5 additions & 4 deletions src/main/asciidoc/reference/reactive-mongo-repositories.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,18 @@ public interface PersonRepository extends ReactiveMongoRepository<Person, String
----

[[mongo.reactive.repositories.infinite-streams]]
== Infinite Streams
== Infinite Streams with Tailable Cursors

By default, MongoDB will automatically close a cursor when the client has exhausted all results in the cursor. Closing a cursors turns a Stream into a finite stream. However, for capped collections you may use a https://docs.mongodb.com/manual/core/tailable-cursors/[Tailable Cursor] that remains open after the client exhausts the results in the initial cursor. Using Tailable Cursors with a reactive approach allows construction of infinite streams. A Tailable Cursor remains open until it's closed. It emits data as data arrives in a capped collection. Using Tailable Cursors with Collections is not possible as its result would never complete.
By default, MongoDB will automatically close a cursor when the client has exhausted all results in the cursor. Closing a cursors turns a Stream into a finite stream. However, for capped collections you may use a https://docs.mongodb.com/manual/core/tailable-cursors/[Tailable Cursor] that remains open after the client exhausts the results in the initial cursor. Using tailable Cursors with a reactive approach allows construction of infinite streams. A tailable Cursor remains open until it's closed. It emits data as data arrives in a capped collection. Using Tailable Cursors with Collections is not possible as its result would never complete.

Spring Data MongoDB Reactive Repository support supports infinite streams by annotating a query method with `@InfiniteStream`. This works for methods returning `Flux` or `Observable` wrapper types.
Spring Data MongoDB Reactive Repository support supports infinite streams by annotating a query method with `@TailableCursor`. This works for methods returning `Flux` or `Observable` wrapper types.

[source,java]
----

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

@InfiniteStream
@Tailable
Flux<Person> findByFirstname(String firstname);

}
Expand All @@ -208,6 +208,7 @@ Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();


// …

// Later: Dispose the stream
Expand Down