Skip to content

Commit b19671f

Browse files
committed
2.x: fix groupBy cancellation with evicting map factory (ReactiveX#5933)
1 parent 2770e4d commit b19671f

File tree

2 files changed

+115
-13
lines changed

2 files changed

+115
-13
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

+6
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,12 @@ public void cancel() {
237237
// cancelling the main source means we don't want any more groups
238238
// but running groups still require new values
239239
if (cancelled.compareAndSet(false, true)) {
240+
if (evictedGroups != null) {
241+
GroupedUnicast<K, V> evictedGroup;
242+
while ((evictedGroup = evictedGroups.poll()) != null) {
243+
evictedGroup.onComplete();
244+
}
245+
}
240246
if (groupCount.decrementAndGet() == 0) {
241247
s.cancel();
242248
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

+109-13
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,66 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import static org.junit.Assert.*;
17-
import static org.mockito.ArgumentMatchers.*;
18-
import static org.mockito.Mockito.*;
16+
import static org.junit.Assert.assertArrayEquals;
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.assertNotNull;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyInt;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.verify;
1925

2026
import java.io.IOException;
21-
import java.util.*;
22-
import java.util.concurrent.*;
23-
import java.util.concurrent.atomic.*;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Collection;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ConcurrentLinkedQueue;
36+
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicReference;
2442

2543
import org.junit.Test;
2644
import org.mockito.Mockito;
27-
import org.reactivestreams.*;
28-
29-
import com.google.common.cache.*;
30-
31-
import io.reactivex.*;
45+
import org.reactivestreams.Publisher;
46+
import org.reactivestreams.Subscriber;
47+
import org.reactivestreams.Subscription;
48+
49+
import com.google.common.base.Ticker;
50+
import com.google.common.cache.CacheBuilder;
51+
import com.google.common.cache.RemovalListener;
52+
import com.google.common.cache.RemovalNotification;
53+
54+
import io.reactivex.BackpressureStrategy;
55+
import io.reactivex.Flowable;
56+
import io.reactivex.Notification;
57+
import io.reactivex.Scheduler;
58+
import io.reactivex.TestHelper;
3259
import io.reactivex.exceptions.TestException;
3360
import io.reactivex.flowables.GroupedFlowable;
34-
import io.reactivex.functions.*;
61+
import io.reactivex.functions.Action;
62+
import io.reactivex.functions.Consumer;
63+
import io.reactivex.functions.Function;
64+
import io.reactivex.functions.Predicate;
3565
import io.reactivex.internal.functions.Functions;
3666
import io.reactivex.internal.fuseable.QueueFuseable;
3767
import io.reactivex.internal.subscriptions.BooleanSubscription;
68+
import io.reactivex.processors.FlowableProcessor;
3869
import io.reactivex.processors.PublishProcessor;
3970
import io.reactivex.schedulers.Schedulers;
71+
import io.reactivex.schedulers.TestScheduler;
4072
import io.reactivex.subjects.PublishSubject;
41-
import io.reactivex.subscribers.*;
73+
import io.reactivex.subscribers.DefaultSubscriber;
74+
import io.reactivex.subscribers.SubscriberFusion;
75+
import io.reactivex.subscribers.TestSubscriber;
4276

4377
public class FlowableGroupByTest {
4478

@@ -1932,6 +1966,45 @@ public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) throws Exce
19321966
ts.assertNoValues()
19331967
.assertError(ex);
19341968
}
1969+
1970+
@Test
1971+
public void testGroupByEvictionCancellationIssue5933() {
1972+
FlowableProcessor<Integer> source = PublishProcessor.create();
1973+
final TestScheduler scheduler = new TestScheduler();
1974+
Function<Consumer<Object>, Map<Integer, Object>> mapFactory = createEvictingMapFactorySynchronousOnlyDelayed(2, scheduler, 1, TimeUnit.SECONDS);
1975+
final AtomicInteger completed = new AtomicInteger();
1976+
1977+
Flowable<Integer> stream = source
1978+
.groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), false, Flowable.bufferSize(), mapFactory)
1979+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Integer>>() {
1980+
@Override
1981+
public Publisher<? extends Integer> apply(GroupedFlowable<Integer, Integer> group)
1982+
throws Exception {
1983+
return group
1984+
.doOnComplete(new Action() {
1985+
@Override
1986+
public void run() throws Exception {
1987+
completed.incrementAndGet();
1988+
}
1989+
});
1990+
}
1991+
});
1992+
TestSubscriber<Integer> subscriber = stream.test();
1993+
1994+
source.onNext(1);
1995+
source.onNext(2);
1996+
//this emission will force a completion (eventually)
1997+
source.onNext(3);
1998+
assertEquals(0, completed.get());
1999+
2000+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
2001+
2002+
assertEquals(0, completed.get());
2003+
2004+
subscriber.cancel();
2005+
2006+
assertEquals(1, completed.get());
2007+
}
19352008

19362009
private static Function<GroupedFlowable<Integer, Integer>, Publisher<? extends Integer>> addCompletedKey(
19372010
final List<Integer> completed) {
@@ -2086,4 +2159,27 @@ public void accept(Object object) {
20862159
}};
20872160
return evictingMapFactory;
20882161
}
2162+
2163+
private static Function<Consumer<Object>, Map<Integer, Object>> createEvictingMapFactorySynchronousOnlyDelayed(final int maxSize, final Scheduler scheduler, final int delay, final TimeUnit unit) {
2164+
Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = //
2165+
new Function<Consumer<Object>, Map<Integer, Object>>() {
2166+
2167+
@Override
2168+
public Map<Integer, Object> apply(final Consumer<Object> notify) throws Exception {
2169+
return new SingleThreadEvictingHashMap<Integer,Object>(maxSize, new Consumer<Object>() {
2170+
@Override
2171+
public void accept(final Object object) {
2172+
scheduler.scheduleDirect(new Runnable() {
2173+
@Override
2174+
public void run() {
2175+
try {
2176+
notify.accept(object);
2177+
} catch (Exception e) {
2178+
throw new RuntimeException(e);
2179+
}}
2180+
});
2181+
}});
2182+
}};
2183+
return evictingMapFactory;
2184+
}
20892185
}

0 commit comments

Comments
 (0)