Skip to content

Commit 0ed3572

Browse files
authored
3.x: Add S/C retryUntil + marbles (#6869)
1 parent 320a675 commit 0ed3572

File tree

6 files changed

+175
-0
lines changed

6 files changed

+175
-0
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

+20
Original file line numberDiff line numberDiff line change
@@ -2317,6 +2317,26 @@ public final Completable retry(@NonNull Predicate<? super Throwable> predicate)
23172317
return fromPublisher(toFlowable().retry(predicate));
23182318
}
23192319

2320+
/**
2321+
* Retries until the given stop function returns {@code true}.
2322+
* <p>
2323+
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retryUntil.png" alt="">
2324+
* <dl>
2325+
* <dt><b>Scheduler:</b></dt>
2326+
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
2327+
* </dl>
2328+
* @param stop the function that should return {@code true} to stop retrying
2329+
* @return the new {@code Completable} instance
2330+
* @throws NullPointerException if {@code stop} is {@code null}
2331+
*/
2332+
@CheckReturnValue
2333+
@NonNull
2334+
@SchedulerSupport(SchedulerSupport.NONE)
2335+
public final Completable retryUntil(@NonNull BooleanSupplier stop) {
2336+
Objects.requireNonNull(stop, "stop is null");
2337+
return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
2338+
}
2339+
23202340
/**
23212341
* Returns a {@code Completable} which given a {@link Publisher} and when this {@code Completable} emits an error, delivers
23222342
* that error through a {@link Flowable} and the {@code Publisher} should signal a value indicating a retry in response

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+2
Original file line numberDiff line numberDiff line change
@@ -4433,6 +4433,8 @@ public final Maybe<T> retry(@NonNull Predicate<? super Throwable> predicate) {
44334433

44344434
/**
44354435
* Retries until the given stop function returns {@code true}.
4436+
* <p>
4437+
* <img width="640" height="285" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.retryUntil.png" alt="">
44364438
* <dl>
44374439
* <dt><b>Scheduler:</b></dt>
44384440
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/rxjava3/core/Single.java

+20
Original file line numberDiff line numberDiff line change
@@ -3747,6 +3747,26 @@ public final Single<T> retry(@NonNull Predicate<? super Throwable> predicate) {
37473747
return toSingle(toFlowable().retry(predicate));
37483748
}
37493749

3750+
/**
3751+
* Retries until the given stop function returns {@code true}.
3752+
* <p>
3753+
* <img width="640" height="364" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.retryUntil.png" alt="">
3754+
* <dl>
3755+
* <dt><b>Scheduler:</b></dt>
3756+
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
3757+
* </dl>
3758+
* @param stop the function that should return {@code true} to stop retrying
3759+
* @return the new {@code Single} instance
3760+
* @throws NullPointerException if {@code stop} is {@code null}
3761+
*/
3762+
@CheckReturnValue
3763+
@NonNull
3764+
@SchedulerSupport(SchedulerSupport.NONE)
3765+
public final Single<T> retryUntil(@NonNull BooleanSupplier stop) {
3766+
Objects.requireNonNull(stop, "stop is null");
3767+
return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
3768+
}
3769+
37503770
/**
37513771
* Re-subscribes to the current {@code Single} if and when the {@link Publisher} returned by the handler
37523772
* function signals a value.

src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java

+39
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.Test;
2121

2222
import io.reactivex.rxjava3.core.*;
23+
import io.reactivex.rxjava3.exceptions.TestException;
2324
import io.reactivex.rxjava3.functions.*;
2425
import io.reactivex.rxjava3.internal.functions.Functions;
2526

@@ -113,4 +114,42 @@ public void retryTimesPredicateWithZeroRetries() {
113114

114115
assertEquals(1, numberOfSubscribeCalls.get());
115116
}
117+
118+
@Test
119+
public void untilTrueEmpty() {
120+
Completable.complete()
121+
.retryUntil(() -> true)
122+
.test()
123+
.assertResult();
124+
}
125+
126+
@Test
127+
public void untilFalseEmpty() {
128+
Completable.complete()
129+
.retryUntil(() -> false)
130+
.test()
131+
.assertResult();
132+
}
133+
134+
@Test
135+
public void untilTrueError() {
136+
Completable.error(new TestException())
137+
.retryUntil(() -> true)
138+
.test()
139+
.assertFailure(TestException.class);
140+
}
141+
142+
@Test
143+
public void untilFalseError() {
144+
AtomicInteger counter = new AtomicInteger();
145+
Completable.defer(() -> {
146+
if (counter.getAndIncrement() == 0) {
147+
return Completable.error(new TestException());
148+
}
149+
return Completable.complete();
150+
})
151+
.retryUntil(() -> false)
152+
.test()
153+
.assertResult();
154+
}
116155
}

src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java

+55
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.junit.Test;
2222

2323
import io.reactivex.rxjava3.core.*;
24+
import io.reactivex.rxjava3.exceptions.TestException;
2425
import io.reactivex.rxjava3.functions.Predicate;
2526
import io.reactivex.rxjava3.internal.functions.Functions;
2627

@@ -120,4 +121,58 @@ public void retryTimesPredicateWithZeroRetries() {
120121

121122
assertEquals(1, numberOfSubscribeCalls.get());
122123
}
124+
125+
@Test
126+
public void untilTrueJust() {
127+
Maybe.just(1)
128+
.retryUntil(() -> true)
129+
.test()
130+
.assertResult(1);
131+
}
132+
133+
@Test
134+
public void untilFalseJust() {
135+
Maybe.just(1)
136+
.retryUntil(() -> false)
137+
.test()
138+
.assertResult(1);
139+
}
140+
141+
@Test
142+
public void untilTrueEmpty() {
143+
Maybe.empty()
144+
.retryUntil(() -> true)
145+
.test()
146+
.assertResult();
147+
}
148+
149+
@Test
150+
public void untilFalseEmpty() {
151+
Maybe.empty()
152+
.retryUntil(() -> false)
153+
.test()
154+
.assertResult();
155+
}
156+
157+
@Test
158+
public void untilTrueError() {
159+
Maybe.error(new TestException())
160+
.retryUntil(() -> true)
161+
.test()
162+
.assertFailure(TestException.class);
163+
}
164+
165+
@Test
166+
public void untilFalseError() {
167+
AtomicInteger counter = new AtomicInteger();
168+
Maybe.defer(() -> {
169+
if (counter.getAndIncrement() == 0) {
170+
return Maybe.error(new TestException());
171+
}
172+
return Maybe.just(1);
173+
})
174+
.retryUntil(() -> false)
175+
.test()
176+
.assertResult(1);
177+
}
123178
}

src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java

+39
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.junit.Test;
2222

2323
import io.reactivex.rxjava3.core.*;
24+
import io.reactivex.rxjava3.exceptions.TestException;
2425
import io.reactivex.rxjava3.functions.Predicate;
2526
import io.reactivex.rxjava3.internal.functions.Functions;
2627

@@ -120,4 +121,42 @@ public void retryTimesPredicateWithZeroRetries() {
120121

121122
assertEquals(1, numberOfSubscribeCalls.get());
122123
}
124+
125+
@Test
126+
public void untilTrueJust() {
127+
Single.just(1)
128+
.retryUntil(() -> true)
129+
.test()
130+
.assertResult(1);
131+
}
132+
133+
@Test
134+
public void untilFalseJust() {
135+
Single.just(1)
136+
.retryUntil(() -> false)
137+
.test()
138+
.assertResult(1);
139+
}
140+
141+
@Test
142+
public void untilTrueError() {
143+
Single.error(new TestException())
144+
.retryUntil(() -> true)
145+
.test()
146+
.assertFailure(TestException.class);
147+
}
148+
149+
@Test
150+
public void untilFalseError() {
151+
AtomicInteger counter = new AtomicInteger();
152+
Single.defer(() -> {
153+
if (counter.getAndIncrement() == 0) {
154+
return Single.error(new TestException());
155+
}
156+
return Single.just(1);
157+
})
158+
.retryUntil(() -> false)
159+
.test()
160+
.assertResult(1);
161+
}
123162
}

0 commit comments

Comments
 (0)