Skip to content

Commit 6261eca

Browse files
committed
2.x: fix groupBy cancellation with evicting map factory (ReactiveX#5933)
1 parent 2770e4d commit 6261eca

File tree

2 files changed

+134
-20
lines changed

2 files changed

+134
-20
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
5757

5858
final Map<Object, GroupedUnicast<K, V>> groups;
5959
final Queue<GroupedUnicast<K, V>> evictedGroups;
60+
final AtomicBoolean cancelled = new AtomicBoolean();
6061

6162
try {
6263
if (mapFactory == null) {
6364
evictedGroups = null;
6465
groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
6566
} else {
6667
evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
67-
Consumer<Object> evictionAction = (Consumer) new EvictionAction<K, V>(evictedGroups);
68+
Consumer<Object> evictionAction = (Consumer<Object>)(Consumer<?>) new EvictionAction<K, V>(evictedGroups, cancelled);
6869
groups = (Map) mapFactory.apply(evictionAction);
6970
}
7071
} catch (Exception e) {
@@ -74,7 +75,7 @@ protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
7475
return;
7576
}
7677
GroupBySubscriber<T, K, V> subscriber =
77-
new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups);
78+
new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups, cancelled);
7879
source.subscribe(subscriber);
7980
}
8081

@@ -97,7 +98,7 @@ public static final class GroupBySubscriber<T, K, V>
9798

9899
Subscription s;
99100

100-
final AtomicBoolean cancelled = new AtomicBoolean();
101+
final AtomicBoolean cancelled;
101102

102103
final AtomicLong requested = new AtomicLong();
103104

@@ -108,9 +109,11 @@ public static final class GroupBySubscriber<T, K, V>
108109

109110
boolean outputFused;
110111

112+
111113
public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector,
112114
Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
113-
Map<Object, GroupedUnicast<K, V>> groups, Queue<GroupedUnicast<K, V>> evictedGroups) {
115+
Map<Object, GroupedUnicast<K, V>> groups, Queue<GroupedUnicast<K, V>> evictedGroups,
116+
AtomicBoolean cancelled) {
114117
this.actual = actual;
115118
this.keySelector = keySelector;
116119
this.valueSelector = valueSelector;
@@ -119,6 +122,7 @@ public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Funct
119122
this.groups = groups;
120123
this.evictedGroups = evictedGroups;
121124
this.queue = new SpscLinkedArrayQueue<GroupedFlowable<K, V>>(bufferSize);
125+
this.cancelled = cancelled;
122126
}
123127

124128
@Override
@@ -237,6 +241,12 @@ public void cancel() {
237241
// cancelling the main source means we don't want any more groups
238242
// but running groups still require new values
239243
if (cancelled.compareAndSet(false, true)) {
244+
if (evictedGroups != null) {
245+
GroupedUnicast<K, V> evictedGroup;
246+
while ((evictedGroup = evictedGroups.poll()) != null) {
247+
evictedGroup.onComplete();
248+
}
249+
}
240250
if (groupCount.decrementAndGet() == 0) {
241251
s.cancel();
242252
}
@@ -420,14 +430,22 @@ public boolean isEmpty() {
420430
static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K,V>> {
421431

422432
final Queue<GroupedUnicast<K, V>> evictedGroups;
433+
final AtomicBoolean cancelled;
423434

424-
EvictionAction(Queue<GroupedUnicast<K, V>> evictedGroups) {
435+
EvictionAction(Queue<GroupedUnicast<K, V>> evictedGroups, AtomicBoolean cancelled) {
425436
this.evictedGroups = evictedGroups;
437+
this.cancelled = cancelled;
426438
}
427439

428440
@Override
429-
public void accept(GroupedUnicast<K,V> value) {
430-
evictedGroups.offer(value);
441+
public void accept(GroupedUnicast<K,V> group) {
442+
if (!cancelled.get()) {
443+
evictedGroups.offer(group);
444+
} else {
445+
// note that this call to `onComplete` is safe because
446+
// GroupedUnicast is designed to allow concurrent access
447+
group.onComplete();
448+
}
431449
}
432450
}
433451

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

+109-13
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,66 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import static org.junit.Assert.*;
17-
import static org.mockito.ArgumentMatchers.*;
18-
import static org.mockito.Mockito.*;
16+
import static org.junit.Assert.assertArrayEquals;
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.assertNotNull;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyInt;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.verify;
1925

2026
import java.io.IOException;
21-
import java.util.*;
22-
import java.util.concurrent.*;
23-
import java.util.concurrent.atomic.*;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Collection;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ConcurrentLinkedQueue;
36+
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicReference;
2442

2543
import org.junit.Test;
2644
import org.mockito.Mockito;
27-
import org.reactivestreams.*;
28-
29-
import com.google.common.cache.*;
30-
31-
import io.reactivex.*;
45+
import org.reactivestreams.Publisher;
46+
import org.reactivestreams.Subscriber;
47+
import org.reactivestreams.Subscription;
48+
49+
import com.google.common.base.Ticker;
50+
import com.google.common.cache.CacheBuilder;
51+
import com.google.common.cache.RemovalListener;
52+
import com.google.common.cache.RemovalNotification;
53+
54+
import io.reactivex.BackpressureStrategy;
55+
import io.reactivex.Flowable;
56+
import io.reactivex.Notification;
57+
import io.reactivex.Scheduler;
58+
import io.reactivex.TestHelper;
3259
import io.reactivex.exceptions.TestException;
3360
import io.reactivex.flowables.GroupedFlowable;
34-
import io.reactivex.functions.*;
61+
import io.reactivex.functions.Action;
62+
import io.reactivex.functions.Consumer;
63+
import io.reactivex.functions.Function;
64+
import io.reactivex.functions.Predicate;
3565
import io.reactivex.internal.functions.Functions;
3666
import io.reactivex.internal.fuseable.QueueFuseable;
3767
import io.reactivex.internal.subscriptions.BooleanSubscription;
68+
import io.reactivex.processors.FlowableProcessor;
3869
import io.reactivex.processors.PublishProcessor;
3970
import io.reactivex.schedulers.Schedulers;
71+
import io.reactivex.schedulers.TestScheduler;
4072
import io.reactivex.subjects.PublishSubject;
41-
import io.reactivex.subscribers.*;
73+
import io.reactivex.subscribers.DefaultSubscriber;
74+
import io.reactivex.subscribers.SubscriberFusion;
75+
import io.reactivex.subscribers.TestSubscriber;
4276

4377
public class FlowableGroupByTest {
4478

@@ -1932,6 +1966,45 @@ public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) throws Exce
19321966
ts.assertNoValues()
19331967
.assertError(ex);
19341968
}
1969+
1970+
@Test
1971+
public void testGroupByEvictionCancellationIssue5933() {
1972+
FlowableProcessor<Integer> source = PublishProcessor.create();
1973+
final TestScheduler scheduler = new TestScheduler();
1974+
Function<Consumer<Object>, Map<Integer, Object>> mapFactory = createEvictingMapFactorySynchronousOnlyDelayed(2, scheduler, 1, TimeUnit.SECONDS);
1975+
final AtomicInteger completed = new AtomicInteger();
1976+
1977+
Flowable<Integer> stream = source
1978+
.groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), false, Flowable.bufferSize(), mapFactory)
1979+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Integer>>() {
1980+
@Override
1981+
public Publisher<? extends Integer> apply(GroupedFlowable<Integer, Integer> group)
1982+
throws Exception {
1983+
return group
1984+
.doOnComplete(new Action() {
1985+
@Override
1986+
public void run() throws Exception {
1987+
completed.incrementAndGet();
1988+
}
1989+
});
1990+
}
1991+
});
1992+
TestSubscriber<Integer> subscriber = stream.test();
1993+
1994+
source.onNext(1);
1995+
source.onNext(2);
1996+
//this emission will force a completion (eventually)
1997+
source.onNext(3);
1998+
assertEquals(0, completed.get());
1999+
2000+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
2001+
2002+
assertEquals(0, completed.get());
2003+
2004+
subscriber.cancel();
2005+
2006+
assertEquals(1, completed.get());
2007+
}
19352008

19362009
private static Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Integer>> addCompletedKey(
19372010
final List<Integer> completed) {
@@ -2086,4 +2159,27 @@ public void accept(Object object) {
20862159
}};
20872160
return evictingMapFactory;
20882161
}
2162+
2163+
private static Function<Consumer<Object>, Map<Integer, Object>> createEvictingMapFactorySynchronousOnlyDelayed(final int maxSize, final Scheduler scheduler, final int delay, final TimeUnit unit) {
2164+
Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = //
2165+
new Function<Consumer<Object>, Map<Integer, Object>>() {
2166+
2167+
@Override
2168+
public Map<Integer, Object> apply(final Consumer<Object> notify) throws Exception {
2169+
return new SingleThreadEvictingHashMap<Integer,Object>(maxSize, new Consumer<Object>() {
2170+
@Override
2171+
public void accept(final Object object) {
2172+
scheduler.scheduleDirect(new Runnable() {
2173+
@Override
2174+
public void run() {
2175+
try {
2176+
notify.accept(object);
2177+
} catch (Exception e) {
2178+
throw new RuntimeException(e);
2179+
}}
2180+
});
2181+
}});
2182+
}};
2183+
return evictingMapFactory;
2184+
}
20892185
}

0 commit comments

Comments
 (0)