Skip to content

Commit 8700965

Browse files
authored
2.x: fix window(time, size) not completing windows on timeout (ReactiveX#5106)
1 parent d9e2df9 commit 8700965

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

+2
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ void drainLoop() {
496496
if (isHolder) {
497497
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
498498
if (producerIndex == consumerIndexHolder.index) {
499+
w.onComplete();
500+
499501
w = UnicastProcessor.<T>create(bufferSize);
500502
window = w;
501503

Diff for: src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java

+2
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,8 @@ void drainLoop() {
439439
if (isHolder) {
440440
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
441441
if (producerIndex == consumerIndexHolder.index) {
442+
w.onComplete();
443+
442444
w = UnicastSubject.create(bufferSize);
443445
window = w;
444446

Diff for: src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -686,4 +686,22 @@ public Flowable<Integer> apply(Flowable<Integer> v) throws Exception {
686686
.awaitDone(1, TimeUnit.SECONDS)
687687
.assertResult(1, 2);
688688
}
689+
690+
@Test
691+
public void sizeTimeTimeout() {
692+
TestScheduler scheduler = new TestScheduler();
693+
PublishProcessor<Integer> ps = PublishProcessor.<Integer>create();
694+
695+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 100)
696+
.test()
697+
.assertValueCount(1);
698+
699+
scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);
700+
701+
ts.assertValueCount(2)
702+
.assertNoErrors()
703+
.assertNotComplete();
704+
705+
ts.values().get(0).test().assertResult();
706+
}
689707
}

Diff for: src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -585,4 +585,22 @@ public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
585585
.awaitDone(1, TimeUnit.SECONDS)
586586
.assertResult(1, 2);
587587
}
588+
589+
@Test
590+
public void sizeTimeTimeout() {
591+
TestScheduler scheduler = new TestScheduler();
592+
Subject<Integer> ps = PublishSubject.<Integer>create();
593+
594+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 100)
595+
.test()
596+
.assertValueCount(1);
597+
598+
scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);
599+
600+
ts.assertValueCount(2)
601+
.assertNoErrors()
602+
.assertNotComplete();
603+
604+
ts.values().get(0).test().assertResult();
605+
}
588606
}

0 commit comments

Comments
 (0)