Skip to content

Commit f82a197

Browse files
authored
2.x: fix buffer(time, maxSize) duplicating buffers on time-size race (#5427)
1 parent 73a85c1 commit f82a197

File tree

4 files changed

+94
-23
lines changed

4 files changed

+94
-23
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -460,12 +460,12 @@ public void onNext(T t) {
460460
if (b.size() < maxSize) {
461461
return;
462462
}
463-
}
464463

465-
if (restartTimerOnMaxSize) {
466464
buffer = null;
467465
producerIndex++;
466+
}
468467

468+
if (restartTimerOnMaxSize) {
469469
timer.dispose();
470470
}
471471

@@ -480,17 +480,12 @@ public void onNext(T t) {
480480
return;
481481
}
482482

483+
synchronized (this) {
484+
buffer = b;
485+
consumerIndex++;
486+
}
483487
if (restartTimerOnMaxSize) {
484-
synchronized (this) {
485-
buffer = b;
486-
consumerIndex++;
487-
}
488-
489488
timer = w.schedulePeriodically(this, timespan, timespan, unit);
490-
} else {
491-
synchronized (this) {
492-
buffer = b;
493-
}
494489
}
495490
}
496491

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -458,12 +458,11 @@ public void onNext(T t) {
458458
if (b.size() < maxSize) {
459459
return;
460460
}
461-
}
462-
463-
if (restartTimerOnMaxSize) {
464461
buffer = null;
465462
producerIndex++;
463+
}
466464

465+
if (restartTimerOnMaxSize) {
467466
timer.dispose();
468467
}
469468

@@ -478,17 +477,12 @@ public void onNext(T t) {
478477
return;
479478
}
480479

480+
synchronized (this) {
481+
buffer = b;
482+
consumerIndex++;
483+
}
481484
if (restartTimerOnMaxSize) {
482-
synchronized (this) {
483-
buffer = b;
484-
consumerIndex++;
485-
}
486-
487485
timer = w.schedulePeriodically(this, timespan, timespan, unit);
488-
} else {
489-
synchronized (this) {
490-
buffer = b;
491-
}
492486
}
493487
}
494488

src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -1973,4 +1973,45 @@ public void skipBackpressure() {
19731973
.test()
19741974
.assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10));
19751975
}
1976+
1977+
@Test
1978+
public void withTimeAndSizeCapacityRace() {
1979+
for (int i = 0; i < 1000; i++) {
1980+
final TestScheduler scheduler = new TestScheduler();
1981+
1982+
final PublishProcessor<Object> ps = PublishProcessor.create();
1983+
1984+
TestSubscriber<List<Object>> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test();
1985+
1986+
ps.onNext(1);
1987+
ps.onNext(2);
1988+
ps.onNext(3);
1989+
ps.onNext(4);
1990+
1991+
Runnable r1 = new Runnable() {
1992+
@Override
1993+
public void run() {
1994+
ps.onNext(5);
1995+
}
1996+
};
1997+
1998+
Runnable r2 = new Runnable() {
1999+
@Override
2000+
public void run() {
2001+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
2002+
}
2003+
};
2004+
2005+
TestHelper.race(r1, r2);
2006+
2007+
ps.onComplete();
2008+
2009+
int items = 0;
2010+
for (List<Object> o : ts.values()) {
2011+
items += o.size();
2012+
}
2013+
2014+
assertEquals("Round: " + i, 5, items);
2015+
}
2016+
}
19762017
}

src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -1398,4 +1398,45 @@ public void bufferTimedExactBoundedError() {
13981398
to
13991399
.assertFailure(TestException.class);
14001400
}
1401+
1402+
@Test
1403+
public void withTimeAndSizeCapacityRace() {
1404+
for (int i = 0; i < 1000; i++) {
1405+
final TestScheduler scheduler = new TestScheduler();
1406+
1407+
final PublishSubject<Object> ps = PublishSubject.create();
1408+
1409+
TestObserver<List<Object>> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test();
1410+
1411+
ps.onNext(1);
1412+
ps.onNext(2);
1413+
ps.onNext(3);
1414+
ps.onNext(4);
1415+
1416+
Runnable r1 = new Runnable() {
1417+
@Override
1418+
public void run() {
1419+
ps.onNext(5);
1420+
}
1421+
};
1422+
1423+
Runnable r2 = new Runnable() {
1424+
@Override
1425+
public void run() {
1426+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
1427+
}
1428+
};
1429+
1430+
TestHelper.race(r1, r2);
1431+
1432+
ps.onComplete();
1433+
1434+
int items = 0;
1435+
for (List<Object> o : ts.values()) {
1436+
items += o.size();
1437+
}
1438+
1439+
assertEquals("Round: " + i, 5, items);
1440+
}
1441+
}
14011442
}

0 commit comments

Comments
 (0)