Skip to content

Commit 6126752

Browse files
artem-zinnatullinakarnokd
authored andcommitted
Add timeout and unit to TimeoutException message (#6234)
* Add timeout and unit to TimeoutException message * New timeout message, add tests
1 parent 1ea1e2a commit 6126752

17 files changed

+125
-55
lines changed

src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.internal.util.*;
2121

22+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
23+
2224
/**
2325
* A combined Observer that awaits the success or error signal via a CountDownLatch.
2426
* @param <T> the value type
@@ -148,7 +150,7 @@ public Throwable blockingGetError(long timeout, TimeUnit unit) {
148150
BlockingHelper.verifyNonBlocking();
149151
if (!await(timeout, unit)) {
150152
dispose();
151-
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
153+
throw ExceptionHelper.wrapOrThrow(new TimeoutException(timeoutMessage(timeout, unit)));
152154
}
153155
} catch (InterruptedException ex) {
154156
dispose();

src/main/java/io/reactivex/internal/observers/FutureObserver.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.reactivex.internal.util.BlockingHelper;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

26+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
27+
2628
/**
2729
* An Observer + Future that expects exactly one upstream value and provides it
2830
* via the (blocking) Future API.
@@ -92,7 +94,7 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
9294
if (getCount() != 0) {
9395
BlockingHelper.verifyNonBlocking();
9496
if (!await(timeout, unit)) {
95-
throw new TimeoutException();
97+
throw new TimeoutException(timeoutMessage(timeout, unit));
9698
}
9799
}
98100

src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.reactivex.internal.util.BlockingHelper;
2323
import io.reactivex.plugins.RxJavaPlugins;
2424

25+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
26+
2527
/**
2628
* An Observer + Future that expects exactly one upstream value and provides it
2729
* via the (blocking) Future API.
@@ -91,7 +93,7 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
9193
if (getCount() != 0) {
9294
BlockingHelper.verifyNonBlocking();
9395
if (!await(timeout, unit)) {
94-
throw new TimeoutException();
96+
throw new TimeoutException(timeoutMessage(timeout, unit));
9597
}
9698
}
9799

src/main/java/io/reactivex/internal/operators/completable/CompletableTimeout.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.reactivex.disposables.*;
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

23+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
24+
2325
public final class CompletableTimeout extends Completable {
2426

2527
final CompletableSource source;
@@ -104,7 +106,7 @@ public void run() {
104106
if (once.compareAndSet(false, true)) {
105107
set.clear();
106108
if (other == null) {
107-
downstream.onError(new TimeoutException());
109+
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
108110
} else {
109111
other.subscribe(new DisposeObserver());
110112
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.reactivex.internal.subscriptions.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

26+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
27+
2628
public final class FlowableTimeoutTimed<T> extends AbstractFlowableWithUpstream<T, T> {
2729
final long timeout;
2830
final TimeUnit unit;
@@ -134,7 +136,7 @@ public void onTimeout(long idx) {
134136
if (compareAndSet(idx, Long.MAX_VALUE)) {
135137
SubscriptionHelper.cancel(upstream);
136138

137-
downstream.onError(new TimeoutException());
139+
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
138140

139141
worker.dispose();
140142
}

src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

24+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
25+
2426
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
2527
final long timeout;
2628
final TimeUnit unit;
@@ -129,7 +131,7 @@ public void onTimeout(long idx) {
129131
if (compareAndSet(idx, Long.MAX_VALUE)) {
130132
DisposableHelper.dispose(upstream);
131133

132-
downstream.onError(new TimeoutException());
134+
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
133135

134136
worker.dispose();
135137
}

src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.reactivex.internal.disposables.DisposableHelper;
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

24+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
25+
2426
public final class SingleTimeout<T> extends Single<T> {
2527

2628
final SingleSource<T> source;
@@ -45,7 +47,7 @@ public SingleTimeout(SingleSource<T> source, long timeout, TimeUnit unit, Schedu
4547
@Override
4648
protected void subscribeActual(final SingleObserver<? super T> observer) {
4749

48-
TimeoutMainObserver<T> parent = new TimeoutMainObserver<T>(observer, other);
50+
TimeoutMainObserver<T> parent = new TimeoutMainObserver<T>(observer, other, timeout, unit);
4951
observer.onSubscribe(parent);
5052

5153
DisposableHelper.replace(parent.task, scheduler.scheduleDirect(parent, timeout, unit));
@@ -66,6 +68,10 @@ static final class TimeoutMainObserver<T> extends AtomicReference<Disposable>
6668

6769
SingleSource<? extends T> other;
6870

71+
final long timeout;
72+
73+
final TimeUnit unit;
74+
6975
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable>
7076
implements SingleObserver<T> {
7177

@@ -92,9 +98,11 @@ public void onError(Throwable e) {
9298
}
9399
}
94100

95-
TimeoutMainObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other) {
101+
TimeoutMainObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other, long timeout, TimeUnit unit) {
96102
this.downstream = actual;
97103
this.other = other;
104+
this.timeout = timeout;
105+
this.unit = unit;
98106
this.task = new AtomicReference<Disposable>();
99107
if (other != null) {
100108
this.fallback = new TimeoutFallbackObserver<T>(actual);
@@ -112,7 +120,7 @@ public void run() {
112120
}
113121
SingleSource<? extends T> other = this.other;
114122
if (other == null) {
115-
downstream.onError(new TimeoutException());
123+
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
116124
} else {
117125
this.other = null;
118126
other.subscribe(fallback);

src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.reactivex.internal.util.BlockingHelper;
2525
import io.reactivex.plugins.RxJavaPlugins;
2626

27+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
28+
2729
/**
2830
* A Subscriber + Future that expects exactly one upstream value and provides it
2931
* via the (blocking) Future API.
@@ -93,7 +95,7 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
9395
if (getCount() != 0) {
9496
BlockingHelper.verifyNonBlocking();
9597
if (!await(timeout, unit)) {
96-
throw new TimeoutException();
98+
throw new TimeoutException(timeoutMessage(timeout, unit));
9799
}
98100
}
99101

src/main/java/io/reactivex/internal/util/ExceptionHelper.java

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.util;
1515

1616
import java.util.*;
17+
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.atomic.AtomicReference;
1819

1920
import io.reactivex.exceptions.CompositeException;
@@ -121,6 +122,14 @@ public static <E extends Throwable> Exception throwIfThrowable(Throwable e) thro
121122
throw (E)e;
122123
}
123124

125+
public static String timeoutMessage(long timeout, TimeUnit unit) {
126+
return "The source did not signal an event for "
127+
+ timeout
128+
+ " "
129+
+ unit.toString().toLowerCase()
130+
+ " and has been terminated.";
131+
}
132+
124133
static final class Termination extends Throwable {
125134

126135
private static final long serialVersionUID = -4649703670690200604L;

src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
package io.reactivex.internal.observers;
1515

16+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
1617
import static org.junit.Assert.*;
1718

1819
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.TimeoutException;
1921

2022
import org.junit.Test;
2123

@@ -132,4 +134,17 @@ public void run() {
132134

133135
assertTrue(bmo.blockingGetError(1, TimeUnit.MINUTES) instanceof TestException);
134136
}
137+
138+
@Test
139+
public void blockingGetErrorTimedOut() {
140+
final BlockingMultiObserver<Integer> bmo = new BlockingMultiObserver<Integer>();
141+
142+
try {
143+
assertNull(bmo.blockingGetError(1, TimeUnit.NANOSECONDS));
144+
fail("Should have thrown");
145+
} catch (RuntimeException expected) {
146+
assertEquals(TimeoutException.class, expected.getCause().getClass());
147+
assertEquals(timeoutMessage(1, TimeUnit.NANOSECONDS), expected.getCause().getMessage());
148+
}
149+
}
135150
}

src/test/java/io/reactivex/internal/observers/FutureObserverTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.observers;
1515

16+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
1617
import static org.junit.Assert.*;
1718

1819
import java.util.*;
@@ -352,4 +353,14 @@ public void run() {
352353

353354
assertEquals(1, fo.get().intValue());
354355
}
356+
357+
@Test
358+
public void getTimedOut() throws Exception {
359+
try {
360+
fo.get(1, TimeUnit.NANOSECONDS);
361+
fail("Should have thrown");
362+
} catch (TimeoutException expected) {
363+
assertEquals(timeoutMessage(1, TimeUnit.NANOSECONDS), expected.getMessage());
364+
}
365+
}
355366
}

src/test/java/io/reactivex/internal/observers/FutureSingleObserverTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.observers;
1515

16+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
1617
import static org.junit.Assert.*;
1718

1819
import java.util.concurrent.*;
@@ -89,8 +90,8 @@ public void timeout() throws Exception {
8990
try {
9091
f.get(100, TimeUnit.MILLISECONDS);
9192
fail("Should have thrown");
92-
} catch (TimeoutException ex) {
93-
// expected
93+
} catch (TimeoutException expected) {
94+
assertEquals(timeoutMessage(100, TimeUnit.MILLISECONDS), expected.getMessage());
9495
}
9596
}
9697

src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.completable;
1515

16+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
1617
import static org.junit.Assert.*;
1718

1819
import java.util.List;
@@ -40,7 +41,7 @@ public void timeoutException() throws Exception {
4041
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
4142
.test()
4243
.awaitDone(5, TimeUnit.SECONDS)
43-
.assertFailure(TimeoutException.class);
44+
.assertFailureAndMessage(TimeoutException.class, timeoutMessage(100, TimeUnit.MILLISECONDS));
4445
}
4546

4647
@Test

src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java

+15-20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
1617
import static org.junit.Assert.*;
1718
import static org.mockito.ArgumentMatchers.*;
1819
import static org.mockito.Mockito.*;
@@ -82,26 +83,24 @@ public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
8283

8384
@Test
8485
public void shouldTimeoutIfOnNextNotWithinTimeout() {
85-
Subscriber<String> subscriber = TestHelper.mockSubscriber();
86-
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
86+
TestSubscriber<String> subscriber = new TestSubscriber<String>();
8787

88-
withTimeout.subscribe(ts);
88+
withTimeout.subscribe(subscriber);
8989

9090
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
91-
verify(subscriber).onError(any(TimeoutException.class));
92-
ts.dispose();
91+
subscriber.assertFailureAndMessage(TimeoutException.class, timeoutMessage(TIMEOUT, TIME_UNIT));
9392
}
9493

9594
@Test
9695
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
97-
Subscriber<String> subscriber = TestHelper.mockSubscriber();
96+
TestSubscriber<String> subscriber = new TestSubscriber<String>();
9897
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
9998
withTimeout.subscribe(subscriber);
10099
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
101100
underlyingSubject.onNext("One");
102-
verify(subscriber).onNext("One");
101+
subscriber.assertValue("One");
103102
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
104-
verify(subscriber).onError(any(TimeoutException.class));
103+
subscriber.assertFailureAndMessage(TimeoutException.class, timeoutMessage(TIMEOUT, TIME_UNIT), "One");
105104
ts.dispose();
106105
}
107106

@@ -235,8 +234,7 @@ public void shouldTimeoutIfSynchronizedFlowableEmitFirstOnNextNotWithinTimeout()
235234
final CountDownLatch exit = new CountDownLatch(1);
236235
final CountDownLatch timeoutSetuped = new CountDownLatch(1);
237236

238-
final Subscriber<String> subscriber = TestHelper.mockSubscriber();
239-
final TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
237+
final TestSubscriber<String> subscriber = new TestSubscriber<String>();
240238

241239
new Thread(new Runnable() {
242240

@@ -258,16 +256,14 @@ public void subscribe(Subscriber<? super String> subscriber) {
258256
}
259257

260258
}).timeout(1, TimeUnit.SECONDS, testScheduler)
261-
.subscribe(ts);
259+
.subscribe(subscriber);
262260
}
263261
}).start();
264262

265263
timeoutSetuped.await();
266264
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
267265

268-
InOrder inOrder = inOrder(subscriber);
269-
inOrder.verify(subscriber, times(1)).onError(isA(TimeoutException.class));
270-
inOrder.verifyNoMoreInteractions();
266+
subscriber.assertFailureAndMessage(TimeoutException.class, timeoutMessage(1, TimeUnit.SECONDS));
271267

272268
exit.countDown(); // exit the thread
273269
}
@@ -287,15 +283,12 @@ public void subscribe(Subscriber<? super String> subscriber) {
287283
TestScheduler testScheduler = new TestScheduler();
288284
Flowable<String> observableWithTimeout = never.timeout(1000, TimeUnit.MILLISECONDS, testScheduler);
289285

290-
Subscriber<String> subscriber = TestHelper.mockSubscriber();
291-
TestSubscriber<String> ts = new TestSubscriber<String>(subscriber);
292-
observableWithTimeout.subscribe(ts);
286+
TestSubscriber<String> subscriber = new TestSubscriber<String>();
287+
observableWithTimeout.subscribe(subscriber);
293288

294289
testScheduler.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
295290

296-
InOrder inOrder = inOrder(subscriber);
297-
inOrder.verify(subscriber).onError(isA(TimeoutException.class));
298-
inOrder.verifyNoMoreInteractions();
291+
subscriber.assertFailureAndMessage(TimeoutException.class, timeoutMessage(1000, TimeUnit.MILLISECONDS));
299292

300293
verify(s, times(1)).cancel();
301294
}
@@ -548,11 +541,13 @@ public void run() {
548541
if (ts.valueCount() != 0) {
549542
if (ts.errorCount() != 0) {
550543
ts.assertFailure(TimeoutException.class, 1);
544+
ts.assertErrorMessage(timeoutMessage(1, TimeUnit.SECONDS));
551545
} else {
552546
ts.assertValuesOnly(1);
553547
}
554548
} else {
555549
ts.assertFailure(TimeoutException.class);
550+
ts.assertErrorMessage(timeoutMessage(1, TimeUnit.SECONDS));
556551
}
557552
}
558553
}

0 commit comments

Comments
 (0)