Skip to content

Commit a1b9628

Browse files
authored
2.x: Flowable.take to route post-cancel errors to plugin error handler (#5978)
1 parent dc94f56 commit a1b9628

File tree

4 files changed

+56
-2
lines changed

4 files changed

+56
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableTake.java

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.reactivex.*;
2121
import io.reactivex.internal.subscriptions.*;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class FlowableTake<T> extends AbstractFlowableWithUpstream<T, T> {
2425
final long limit;
@@ -75,6 +76,8 @@ public void onError(Throwable t) {
7576
done = true;
7677
subscription.cancel();
7778
actual.onError(t);
79+
} else {
80+
RxJavaPlugins.onError(t);
7881
}
7982
}
8083
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -207,4 +207,19 @@ public void run() {
207207
ts.assertResult(1, 2, 3, 4, 5);
208208
}
209209
}
210+
211+
@Test
212+
public void errorAfterLimitReached() {
213+
List<Throwable> errors = TestHelper.trackPluginErrors();
214+
try {
215+
Flowable.error(new TestException())
216+
.limit(0)
217+
.test()
218+
.assertResult();
219+
220+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
221+
} finally {
222+
RxJavaPlugins.reset();
223+
}
224+
}
210225
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeTest.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

19-
import java.util.Arrays;
20+
import java.util.*;
2021
import java.util.concurrent.*;
2122
import java.util.concurrent.atomic.*;
2223

@@ -28,6 +29,7 @@
2829
import io.reactivex.exceptions.TestException;
2930
import io.reactivex.functions.*;
3031
import io.reactivex.internal.subscriptions.BooleanSubscription;
32+
import io.reactivex.plugins.RxJavaPlugins;
3133
import io.reactivex.processors.PublishProcessor;
3234
import io.reactivex.schedulers.Schedulers;
3335
import io.reactivex.subscribers.TestSubscriber;
@@ -495,4 +497,19 @@ public void run() {
495497
ts.assertResult(1, 2);
496498
}
497499
}
500+
501+
@Test
502+
public void errorAfterLimitReached() {
503+
List<Throwable> errors = TestHelper.trackPluginErrors();
504+
try {
505+
Flowable.error(new TestException())
506+
.take(0)
507+
.test()
508+
.assertResult();
509+
510+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
511+
} finally {
512+
RxJavaPlugins.reset();
513+
}
514+
}
498515
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,24 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

19-
import java.util.Arrays;
20+
import java.util.*;
2021
import java.util.concurrent.CountDownLatch;
2122
import java.util.concurrent.atomic.*;
2223

2324
import org.junit.*;
2425
import org.mockito.InOrder;
2526

2627
import io.reactivex.*;
28+
import io.reactivex.Observable;
29+
import io.reactivex.Observer;
2730
import io.reactivex.disposables.*;
2831
import io.reactivex.exceptions.TestException;
2932
import io.reactivex.functions.*;
3033
import io.reactivex.observers.TestObserver;
34+
import io.reactivex.plugins.RxJavaPlugins;
3135
import io.reactivex.schedulers.Schedulers;
3236
import io.reactivex.subjects.PublishSubject;
3337

@@ -389,4 +393,19 @@ public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
389393
}
390394
});
391395
}
396+
397+
@Test
398+
public void errorAfterLimitReached() {
399+
List<Throwable> errors = TestHelper.trackPluginErrors();
400+
try {
401+
Observable.error(new TestException())
402+
.take(0)
403+
.test()
404+
.assertResult();
405+
406+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
407+
} finally {
408+
RxJavaPlugins.reset();
409+
}
410+
}
392411
}

0 commit comments

Comments
 (0)