Skip to content

Commit 28d1352

Browse files
authored
2.x: fix scan(seed, f) to emit accumulated values asap (#5090)
1 parent 64d2d15 commit 28d1352

File tree

3 files changed

+412
-234
lines changed

3 files changed

+412
-234
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java

+154-20
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@
1313
package io.reactivex.internal.operators.flowable;
1414

1515
import java.util.concurrent.Callable;
16+
import java.util.concurrent.atomic.*;
1617

1718
import org.reactivestreams.*;
1819

1920
import io.reactivex.exceptions.Exceptions;
2021
import io.reactivex.functions.BiFunction;
2122
import io.reactivex.internal.functions.ObjectHelper;
22-
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
23-
import io.reactivex.internal.subscriptions.EmptySubscription;
23+
import io.reactivex.internal.fuseable.SimplePlainQueue;
24+
import io.reactivex.internal.queue.SpscArrayQueue;
25+
import io.reactivex.internal.subscriptions.*;
26+
import io.reactivex.internal.util.BackpressureHelper;
2427
import io.reactivex.plugins.RxJavaPlugins;
2528

2629
public final class FlowableScanSeed<T, R> extends AbstractFlowableWithUpstream<T, R> {
@@ -45,20 +48,57 @@ protected void subscribeActual(Subscriber<? super R> s) {
4548
return;
4649
}
4750

48-
source.subscribe(new ScanSeedSubscriber<T, R>(s, accumulator, r));
51+
source.subscribe(new ScanSeedSubscriber<T, R>(s, accumulator, r, bufferSize()));
4952
}
5053

51-
static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber<T, R> {
54+
static final class ScanSeedSubscriber<T, R>
55+
extends AtomicInteger
56+
implements Subscriber<T>, Subscription {
5257
private static final long serialVersionUID = -1776795561228106469L;
5358

59+
final Subscriber<? super R> actual;
60+
5461
final BiFunction<R, ? super T, R> accumulator;
5562

56-
boolean done;
63+
final SimplePlainQueue<R> queue;
64+
65+
final AtomicLong requested;
66+
67+
final int prefetch;
68+
69+
final int limit;
70+
71+
volatile boolean cancelled;
72+
73+
volatile boolean done;
74+
Throwable error;
75+
76+
Subscription s;
77+
78+
R value;
5779

58-
ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value) {
59-
super(actual);
80+
int consumed;
81+
82+
ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value, int prefetch) {
83+
this.actual = actual;
6084
this.accumulator = accumulator;
6185
this.value = value;
86+
this.prefetch = prefetch;
87+
this.limit = prefetch - (prefetch >> 2);
88+
this.queue = new SpscArrayQueue<R>(prefetch);
89+
this.queue.offer(value);
90+
this.requested = new AtomicLong();
91+
}
92+
93+
@Override
94+
public void onSubscribe(Subscription s) {
95+
if (SubscriptionHelper.validate(this.s, s)) {
96+
this.s = s;
97+
98+
actual.onSubscribe(this);
99+
100+
s.request(prefetch - 1);
101+
}
62102
}
63103

64104
@Override
@@ -68,21 +108,18 @@ public void onNext(T t) {
68108
}
69109

70110
R v = value;
71-
72-
R u;
73-
74111
try {
75-
u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");
76-
} catch (Throwable e) {
77-
Exceptions.throwIfFatal(e);
112+
v = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");
113+
} catch (Throwable ex) {
114+
Exceptions.throwIfFatal(ex);
78115
s.cancel();
79-
onError(e);
116+
onError(ex);
80117
return;
81118
}
82119

83-
value = u;
84-
produced++;
85-
actual.onNext(v);
120+
value = v;
121+
queue.offer(v);
122+
drain();
86123
}
87124

88125
@Override
@@ -91,9 +128,9 @@ public void onError(Throwable t) {
91128
RxJavaPlugins.onError(t);
92129
return;
93130
}
131+
error = t;
94132
done = true;
95-
value = null;
96-
actual.onError(t);
133+
drain();
97134
}
98135

99136
@Override
@@ -102,7 +139,104 @@ public void onComplete() {
102139
return;
103140
}
104141
done = true;
105-
complete(value);
142+
drain();
143+
}
144+
145+
@Override
146+
public void cancel() {
147+
cancelled = true;
148+
s.cancel();
149+
if (getAndIncrement() == 0) {
150+
queue.clear();
151+
}
152+
}
153+
154+
@Override
155+
public void request(long n) {
156+
if (SubscriptionHelper.validate(n)) {
157+
BackpressureHelper.add(requested, n);
158+
drain();
159+
}
160+
}
161+
162+
void drain() {
163+
if (getAndIncrement() != 0) {
164+
return;
165+
}
166+
167+
int missed = 1;
168+
Subscriber<? super R> a = actual;
169+
SimplePlainQueue<R> q = queue;
170+
int lim = limit;
171+
int c = consumed;
172+
173+
for (;;) {
174+
175+
long r = requested.get();
176+
long e = 0L;
177+
178+
while (e != r) {
179+
if (cancelled) {
180+
q.clear();
181+
return;
182+
}
183+
boolean d = done;
184+
185+
if (d) {
186+
Throwable ex = error;
187+
if (ex != null) {
188+
q.clear();
189+
a.onError(ex);
190+
return;
191+
}
192+
}
193+
194+
R v = q.poll();
195+
boolean empty = v == null;
196+
197+
if (d && empty) {
198+
a.onComplete();
199+
return;
200+
}
201+
202+
if (empty) {
203+
break;
204+
}
205+
206+
a.onNext(v);
207+
208+
e++;
209+
if (++c == lim) {
210+
c = 0;
211+
s.request(lim);
212+
}
213+
}
214+
215+
if (e == r) {
216+
if (done) {
217+
Throwable ex = error;
218+
if (ex != null) {
219+
q.clear();
220+
a.onError(ex);
221+
return;
222+
}
223+
if (q.isEmpty()) {
224+
a.onComplete();
225+
return;
226+
}
227+
}
228+
}
229+
230+
if (e != 0L) {
231+
BackpressureHelper.produced(requested, e);
232+
}
233+
234+
consumed = c;
235+
missed = addAndGet(-missed);
236+
if (missed == 0) {
237+
break;
238+
}
239+
}
106240
}
107241
}
108242
}

0 commit comments

Comments
 (0)