Skip to content

Commit bb3260e

Browse files
pestradaakarnokd
authored andcommitted
add delayError to Maybe.delay (#6864)
1 parent 49f1a6d commit bb3260e

File tree

4 files changed

+102
-13
lines changed

4 files changed

+102
-13
lines changed

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+59-6
Original file line numberDiff line numberDiff line change
@@ -2672,6 +2672,7 @@ public final Single<T> defaultIfEmpty(@NonNull T defaultItem) {
26722672
/**
26732673
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
26742674
* specified delay.
2675+
* An error signal will not be delayed.
26752676
* <p>
26762677
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
26772678
* <dl>
@@ -2682,17 +2683,68 @@ public final Single<T> defaultIfEmpty(@NonNull T defaultItem) {
26822683
* @param time
26832684
* the delay to shift the source by
26842685
* @param unit
2685-
* the {@link TimeUnit} in which {@code period} is defined
2686+
* the {@link TimeUnit} in which {@code time} is defined
26862687
* @return the new {@code Maybe} instance
26872688
* @throws NullPointerException if {@code unit} is {@code null}
26882689
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2689-
* @see #delay(long, TimeUnit, Scheduler)
2690+
* @see #delay(long, TimeUnit, Scheduler, boolean)
26902691
*/
26912692
@CheckReturnValue
26922693
@SchedulerSupport(SchedulerSupport.COMPUTATION)
26932694
@NonNull
26942695
public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
2695-
return delay(time, unit, Schedulers.computation());
2696+
return delay(time, unit, Schedulers.computation(), false);
2697+
}
2698+
2699+
/**
2700+
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
2701+
* specified delay.
2702+
* <p>
2703+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
2704+
* <dl>
2705+
* <dt><b>Scheduler:</b></dt>
2706+
* <dd>This version of {@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
2707+
* </dl>
2708+
*
2709+
* @param time the delay to shift the source by
2710+
* @param unit the {@link TimeUnit} in which {@code time} is defined
2711+
* @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed.
2712+
* @return the new {@code Maybe} instance
2713+
* @throws NullPointerException if {@code unit} is {@code null}
2714+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2715+
* @see #delay(long, TimeUnit, Scheduler, boolean)
2716+
*/
2717+
@CheckReturnValue
2718+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
2719+
@NonNull
2720+
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, boolean delayError) {
2721+
return delay(time, unit, Schedulers.computation(), delayError);
2722+
}
2723+
2724+
/**
2725+
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
2726+
* specified delay.
2727+
* An error signal will not be delayed.
2728+
* <p>
2729+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
2730+
* <dl>
2731+
* <dt><b>Scheduler:</b></dt>
2732+
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
2733+
* </dl>
2734+
*
2735+
* @param time the delay to shift the source by
2736+
* @param unit the {@link TimeUnit} in which {@code time} is defined
2737+
* @param scheduler the {@code Scheduler} to use for delaying
2738+
* @return the new {@code Maybe} instance
2739+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
2740+
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2741+
* @see #delay(long, TimeUnit, Scheduler, boolean)
2742+
*/
2743+
@CheckReturnValue
2744+
@SchedulerSupport(SchedulerSupport.CUSTOM)
2745+
@NonNull
2746+
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
2747+
return delay(time, unit, scheduler, false);
26962748
}
26972749

26982750
/**
@@ -2708,20 +2760,21 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
27082760
* @param time
27092761
* the delay to shift the source by
27102762
* @param unit
2711-
* the time unit of {@code delay}
2763+
* the {@link TimeUnit} in which {@code time} is defined
27122764
* @param scheduler
27132765
* the {@code Scheduler} to use for delaying
2766+
* @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed.
27142767
* @return the new {@code Maybe} instance
27152768
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
27162769
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
27172770
*/
27182771
@CheckReturnValue
27192772
@NonNull
27202773
@SchedulerSupport(SchedulerSupport.CUSTOM)
2721-
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
2774+
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) {
27222775
Objects.requireNonNull(unit, "unit is null");
27232776
Objects.requireNonNull(scheduler, "scheduler is null");
2724-
return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler));
2777+
return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler, delayError));
27252778
}
27262779

27272780
/**

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelay.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,19 @@ public final class MaybeDelay<T> extends AbstractMaybeWithUpstream<T, T> {
3333

3434
final Scheduler scheduler;
3535

36-
public MaybeDelay(MaybeSource<T> source, long delay, TimeUnit unit, Scheduler scheduler) {
36+
final boolean delayError;
37+
38+
public MaybeDelay(MaybeSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
3739
super(source);
3840
this.delay = delay;
3941
this.unit = unit;
4042
this.scheduler = scheduler;
43+
this.delayError = delayError;
4144
}
4245

4346
@Override
4447
protected void subscribeActual(MaybeObserver<? super T> observer) {
45-
source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler));
48+
source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler, delayError));
4649
}
4750

4851
static final class DelayMaybeObserver<T>
@@ -59,15 +62,18 @@ static final class DelayMaybeObserver<T>
5962

6063
final Scheduler scheduler;
6164

65+
final boolean delayError;
66+
6267
T value;
6368

6469
Throwable error;
6570

66-
DelayMaybeObserver(MaybeObserver<? super T> actual, long delay, TimeUnit unit, Scheduler scheduler) {
71+
DelayMaybeObserver(MaybeObserver<? super T> actual, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
6772
this.downstream = actual;
6873
this.delay = delay;
6974
this.unit = unit;
7075
this.scheduler = scheduler;
76+
this.delayError = delayError;
7177
}
7278

7379
@Override
@@ -105,21 +111,21 @@ public void onSubscribe(Disposable d) {
105111
@Override
106112
public void onSuccess(T value) {
107113
this.value = value;
108-
schedule();
114+
schedule(delay);
109115
}
110116

111117
@Override
112118
public void onError(Throwable e) {
113119
this.error = e;
114-
schedule();
120+
schedule(delayError ? delay : 0);
115121
}
116122

117123
@Override
118124
public void onComplete() {
119-
schedule();
125+
schedule(delay);
120126
}
121127

122-
void schedule() {
128+
void schedule(long delay) {
123129
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit));
124130
}
125131
}

src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,32 @@ public Maybe<Object> apply(Maybe<Object> f) throws Exception {
9696
}
9797
});
9898
}
99+
100+
@Test
101+
public void delayedErrorOnSuccess() {
102+
final TestScheduler scheduler = new TestScheduler();
103+
final TestObserver<Integer> observer = Maybe.just(1)
104+
.delay(5, TimeUnit.SECONDS, scheduler, true)
105+
.test();
106+
107+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
108+
observer.assertNoValues();
109+
110+
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
111+
observer.assertValue(1);
112+
}
113+
114+
@Test
115+
public void delayedErrorOnError() {
116+
final TestScheduler scheduler = new TestScheduler();
117+
final TestObserver<?> observer = Maybe.error(new TestException())
118+
.delay(5, TimeUnit.SECONDS, scheduler, true)
119+
.test();
120+
121+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
122+
observer.assertNoErrors();
123+
124+
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
125+
observer.assertError(TestException.class);
126+
}
99127
}

src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ public void checkParallelFlowable() {
289289
// negative time is considered as zero time
290290
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class));
291291
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class));
292+
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE));
293+
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));
292294

293295
// zero repeat is allowed
294296
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE));

0 commit comments

Comments
 (0)