Skip to content

Commit 5387e20

Browse files
authored
2.x: improve Flowable.timeout() (#5661)
* 2.x: improve Flowable.timeout() * Remove the now unused FullArbiter(Subscriber) * Don't read the volatile twice
1 parent bffc7f2 commit 5387e20

File tree

9 files changed

+692
-748
lines changed

9 files changed

+692
-748
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java

+226-182
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java

+160-138
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/subscribers/FullArbiterSubscriber.java

-57
This file was deleted.

src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java

-232
This file was deleted.

src/main/java/io/reactivex/schedulers/TestScheduler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,14 @@ public void triggerActions() {
102102
}
103103

104104
private void triggerActions(long targetTimeInNanoseconds) {
105-
while (!queue.isEmpty()) {
105+
for (;;) {
106106
TimedRunnable current = queue.peek();
107-
if (current.time > targetTimeInNanoseconds) {
107+
if (current == null || current.time > targetTimeInNanoseconds) {
108108
break;
109109
}
110110
// if scheduled time is 0 (immediate) use current virtual time
111111
time = current.time == 0 ? time : current.time;
112-
queue.remove();
112+
queue.remove(current);
113113

114114
// Only execute if not unsubscribed
115115
if (!current.scheduler.disposed) {

src/test/java/io/reactivex/flowable/FlowableNullTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2328,7 +2328,7 @@ public void timeoutFirstItemNull() {
23282328

23292329
@Test(expected = NullPointerException.class)
23302330
public void timeoutFirstItemReturnsNull() {
2331-
just1.timeout(just1, new Function<Integer, Publisher<Object>>() {
2331+
just1.timeout(Flowable.never(), new Function<Integer, Publisher<Object>>() {
23322332
@Override
23332333
public Publisher<Object> apply(Integer v) {
23342334
return null;

0 commit comments

Comments
 (0)