Skip to content

Commit 6fefdc6

Browse files
davidmotenakarnokd
authored andcommitted
2.x fix group-by eviction so that source is cancelled and reduce volatile reads (#5933) (#5947)
1 parent 43ceedf commit 6fefdc6

File tree

2 files changed

+146
-11
lines changed

2 files changed

+146
-11
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

+24-11
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public static final class GroupBySubscriber<T, K, V>
104104
final AtomicInteger groupCount = new AtomicInteger(1);
105105

106106
Throwable error;
107-
volatile boolean done;
107+
volatile boolean finished;
108+
boolean done;
108109

109110
boolean outputFused;
110111

@@ -178,12 +179,7 @@ public void onNext(T t) {
178179

179180
group.onNext(v);
180181

181-
if (evictedGroups != null) {
182-
GroupedUnicast<K, V> evictedGroup;
183-
while ((evictedGroup = evictedGroups.poll()) != null) {
184-
evictedGroup.onComplete();
185-
}
186-
}
182+
completeEvictions();
187183

188184
if (newGroup) {
189185
q.offer(group);
@@ -197,6 +193,7 @@ public void onError(Throwable t) {
197193
RxJavaPlugins.onError(t);
198194
return;
199195
}
196+
done = true;
200197
for (GroupedUnicast<K, V> g : groups.values()) {
201198
g.onError(t);
202199
}
@@ -205,7 +202,7 @@ public void onError(Throwable t) {
205202
evictedGroups.clear();
206203
}
207204
error = t;
208-
done = true;
205+
finished = true;
209206
drain();
210207
}
211208

@@ -220,6 +217,7 @@ public void onComplete() {
220217
evictedGroups.clear();
221218
}
222219
done = true;
220+
finished = true;
223221
drain();
224222
}
225223
}
@@ -237,12 +235,27 @@ public void cancel() {
237235
// cancelling the main source means we don't want any more groups
238236
// but running groups still require new values
239237
if (cancelled.compareAndSet(false, true)) {
238+
completeEvictions();
240239
if (groupCount.decrementAndGet() == 0) {
241240
s.cancel();
242241
}
243242
}
244243
}
245244

245+
private void completeEvictions() {
246+
if (evictedGroups != null) {
247+
int count = 0;
248+
GroupedUnicast<K, V> evictedGroup;
249+
while ((evictedGroup = evictedGroups.poll()) != null) {
250+
evictedGroup.onComplete();
251+
count++;
252+
}
253+
if (count != 0) {
254+
groupCount.addAndGet(-count);
255+
}
256+
}
257+
}
258+
246259
public void cancel(K key) {
247260
Object mapKey = key != null ? key : NULL_KEY;
248261
groups.remove(mapKey);
@@ -279,7 +292,7 @@ void drainFused() {
279292
return;
280293
}
281294

282-
boolean d = done;
295+
boolean d = finished;
283296

284297
if (d && !delayError) {
285298
Throwable ex = error;
@@ -321,7 +334,7 @@ void drainNormal() {
321334
long e = 0L;
322335

323336
while (e != r) {
324-
boolean d = done;
337+
boolean d = finished;
325338

326339
GroupedFlowable<K, V> t = q.poll();
327340

@@ -340,7 +353,7 @@ void drainNormal() {
340353
e++;
341354
}
342355

343-
if (e == r && checkTerminated(done, q.isEmpty(), a, q)) {
356+
if (e == r && checkTerminated(finished, q.isEmpty(), a, q)) {
344357
return;
345358
}
346359

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

+122
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.mockito.Mockito;
2727
import org.reactivestreams.*;
2828

29+
import com.google.common.base.Ticker;
2930
import com.google.common.cache.*;
3031

3132
import io.reactivex.*;
@@ -1947,6 +1948,127 @@ public void run() throws Exception {
19471948
}
19481949
};
19491950
}
1951+
1952+
private static final class TestTicker extends Ticker {
1953+
long tick = 0;
1954+
1955+
@Override
1956+
public long read() {
1957+
return tick;
1958+
}
1959+
}
1960+
1961+
@Test
1962+
public void testGroupByEvictionCancellationOfSource5933() {
1963+
PublishProcessor<Integer> source = PublishProcessor.create();
1964+
final TestTicker testTicker = new TestTicker();
1965+
1966+
Function<Consumer<Object>, Map<Integer, Object>> mapFactory = new Function<Consumer<Object>, Map<Integer, Object>>() {
1967+
@Override
1968+
public Map<Integer, Object> apply(final Consumer<Object> action) throws Exception {
1969+
return CacheBuilder.newBuilder() //
1970+
.expireAfterAccess(5, TimeUnit.SECONDS).removalListener(new RemovalListener<Object, Object>() {
1971+
@Override
1972+
public void onRemoval(RemovalNotification<Object, Object> notification) {
1973+
try {
1974+
action.accept(notification.getValue());
1975+
} catch (Exception ex) {
1976+
throw new RuntimeException(ex);
1977+
}
1978+
}
1979+
}).ticker(testTicker) //
1980+
.<Integer, Object>build().asMap();
1981+
}
1982+
};
1983+
1984+
final List<String> list = new CopyOnWriteArrayList<String>();
1985+
Flowable<Integer> stream = source //
1986+
.doOnCancel(new Action() {
1987+
@Override
1988+
public void run() throws Exception {
1989+
list.add("Source canceled");
1990+
}
1991+
})
1992+
.<Integer, Integer>groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), false,
1993+
Flowable.bufferSize(), mapFactory) //
1994+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Integer>>() {
1995+
@Override
1996+
public Publisher<? extends Integer> apply(GroupedFlowable<Integer, Integer> group)
1997+
throws Exception {
1998+
return group //
1999+
.doOnComplete(new Action() {
2000+
@Override
2001+
public void run() throws Exception {
2002+
list.add("Group completed");
2003+
}
2004+
}).doOnCancel(new Action() {
2005+
@Override
2006+
public void run() throws Exception {
2007+
list.add("Group canceled");
2008+
}
2009+
});
2010+
}
2011+
});
2012+
TestSubscriber<Integer> ts = stream //
2013+
.doOnCancel(new Action() {
2014+
@Override
2015+
public void run() throws Exception {
2016+
list.add("Outer group by canceled");
2017+
}
2018+
}).test();
2019+
2020+
// Send 3 in the same group and wait for them to be seen
2021+
source.onNext(1);
2022+
source.onNext(1);
2023+
source.onNext(1);
2024+
ts.awaitCount(3);
2025+
2026+
// Advance time far enough to evict the group.
2027+
// NOTE -- Comment this line out to make the test "pass".
2028+
testTicker.tick = TimeUnit.SECONDS.toNanos(6);
2029+
2030+
// Send more data in the group (triggering eviction and recreation)
2031+
source.onNext(1);
2032+
2033+
// Wait for the last 2 and then cancel the subscription
2034+
ts.awaitCount(4);
2035+
ts.cancel();
2036+
2037+
// Observe the result. Note that right now the result differs depending on whether eviction occurred or
2038+
// not. The observed sequence in that case is: Group completed, Outer group by canceled., Group canceled.
2039+
// The addition of the "Group completed" is actually fine, but the fact that the cancel doesn't reach the
2040+
// source seems like a bug. Commenting out the setting of "tick" above will produce the "expected" sequence.
2041+
System.out.println(list);
2042+
assertTrue(list.contains("Source canceled"));
2043+
assertEquals(Arrays.asList(
2044+
"Group completed", // this is here when eviction occurs
2045+
"Outer group by canceled",
2046+
"Group canceled",
2047+
"Source canceled" // This is *not* here when eviction occurs
2048+
), list);
2049+
}
2050+
2051+
@Test
2052+
public void testCancellationOfUpstreamWhenGroupedFlowableCompletes() {
2053+
final AtomicBoolean cancelled = new AtomicBoolean();
2054+
Flowable.just(1).repeat().doOnCancel(new Action() {
2055+
@Override
2056+
public void run() throws Exception {
2057+
cancelled.set(true);
2058+
}
2059+
})
2060+
.groupBy(Functions.<Integer>identity(), Functions.<Integer>identity()) //
2061+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Object>>() {
2062+
@Override
2063+
public Publisher<? extends Object> apply(GroupedFlowable<Integer, Integer> g) throws Exception {
2064+
return g.first(0).toFlowable();
2065+
}
2066+
})
2067+
.take(4) //
2068+
.test() //
2069+
.assertComplete();
2070+
assertTrue(cancelled.get());
2071+
}
19502072

19512073
//not thread safe
19522074
private static final class SingleThreadEvictingHashMap<K,V> implements Map<K,V> {

0 commit comments

Comments
 (0)