Skip to content

Commit a856572

Browse files
authored
2.x: Subject/Processor improvements & small cleanup (#4437)
1 parent f97c50d commit a856572

38 files changed

+2321
-2213
lines changed

src/main/java/io/reactivex/Flowable.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -3757,25 +3757,29 @@ public static <T, D> Flowable<T> using(Callable<? extends D> resourceSupplier,
37573757
* the parameter name.
37583758
* @param value the value to validate
37593759
* @param paramName the parameter name of the value
3760+
* @return the value
37603761
* @throws IllegalArgumentException if bufferSize &lt;= 0
37613762
*/
3762-
private static void verifyPositive(int value, String paramName) {
3763+
protected static int verifyPositive(int value, String paramName) {
37633764
if (value <= 0) {
37643765
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
37653766
}
3767+
return value;
37663768
}
37673769

37683770
/**
37693771
* Validate that the given value is positive or report an IllegalArgumentException with
37703772
* the parameter name.
37713773
* @param value the value to validate
37723774
* @param paramName the parameter name of the value
3775+
* @return the value
37733776
* @throws IllegalArgumentException if bufferSize &lt;= 0
37743777
*/
3775-
private static void verifyPositive(long value, String paramName) {
3778+
protected static long verifyPositive(long value, String paramName) {
37763779
if (value <= 0L) {
37773780
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
37783781
}
3782+
return value;
37793783
}
37803784

37813785
/**

src/main/java/io/reactivex/Observable.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -3307,25 +3307,29 @@ public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier,
33073307
* the parameter name.
33083308
* @param value the value to validate
33093309
* @param paramName the parameter name of the value
3310+
* @return value
33103311
* @throws IllegalArgumentException if bufferSize &lt;= 0
33113312
*/
3312-
private static void verifyPositive(int value, String paramName) {
3313+
protected static int verifyPositive(int value, String paramName) {
33133314
if (value <= 0) {
33143315
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
33153316
}
3317+
return value;
33163318
}
33173319

33183320
/**
33193321
* Validate that the given value is positive or report an IllegalArgumentException with
33203322
* the parameter name.
33213323
* @param value the value to validate
33223324
* @param paramName the parameter name of the value
3325+
* @return value
33233326
* @throws IllegalArgumentException if bufferSize &lt;= 0
33243327
*/
3325-
private static void verifyPositive(long value, String paramName) {
3328+
protected static long verifyPositive(long value, String paramName) {
33263329
if (value <= 0L) {
33273330
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
33283331
}
3332+
return value;
33293333
}
33303334

33313335
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ void drain() {
241241
@SuppressWarnings("unchecked")
242242
TLeft left = (TLeft)val;
243243

244-
UnicastProcessor<TRight> up = new UnicastProcessor<TRight>();
244+
UnicastProcessor<TRight> up = UnicastProcessor.<TRight>create();
245245
int idx = leftIndex++;
246246
lefts.put(idx, up);
247247

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatWhen.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void subscribeActual(Subscriber<? super T> s) {
3939

4040
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
4141

42-
FlowableProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
42+
FlowableProcessor<Object> processor = UnicastProcessor.<Object>create(8).toSerialized();
4343

4444
Publisher<?> when;
4545

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryWhen.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public FlowableRetryWhen(Publisher<T> source,
3737
public void subscribeActual(Subscriber<? super T> s) {
3838
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
3939

40-
FlowableProcessor<Throwable> processor = new UnicastProcessor<Throwable>(8).toSerialized();
40+
FlowableProcessor<Throwable> processor = UnicastProcessor.<Throwable>create(8).toSerialized();
4141

4242
Publisher<?> when;
4343

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void onNext(T t) {
102102
if (i == 0) {
103103
getAndIncrement();
104104

105-
w = new UnicastProcessor<T>(bufferSize, this);
105+
w = UnicastProcessor.<T>create(bufferSize, this);
106106
window = w;
107107

108108
actual.onNext(w);
@@ -232,7 +232,7 @@ public void onNext(T t) {
232232
getAndIncrement();
233233

234234

235-
w = new UnicastProcessor<T>(bufferSize, this);
235+
w = UnicastProcessor.<T>create(bufferSize, this);
236236
window = w;
237237

238238
actual.onNext(w);
@@ -388,7 +388,7 @@ public void onNext(T t) {
388388
if (!cancelled) {
389389
getAndIncrement();
390390

391-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize, this);
391+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize, this);
392392

393393
windows.offer(w);
394394

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void onSubscribe(Subscription s) {
8686
return;
8787
}
8888

89-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
89+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
9090

9191
long r = requested();
9292
if (r != 0L) {
@@ -226,7 +226,7 @@ void drainLoop() {
226226
continue;
227227
}
228228

229-
w = new UnicastProcessor<T>(bufferSize);
229+
w = UnicastProcessor.<T>create(bufferSize);
230230

231231
long r = requested();
232232
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ void drainLoop() {
263263
}
264264

265265

266-
w = new UnicastProcessor<T>(bufferSize);
266+
w = UnicastProcessor.<T>create(bufferSize);
267267

268268
long r = requested();
269269
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void onSubscribe(Subscription s) {
104104
return;
105105
}
106106

107-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
107+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
108108

109109
long r = requested();
110110
if (r != 0L) {
@@ -262,7 +262,7 @@ void drainLoop() {
262262
return;
263263
}
264264

265-
w = new UnicastProcessor<T>(bufferSize);
265+
w = UnicastProcessor.<T>create(bufferSize);
266266

267267
long r = requested();
268268
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void onSubscribe(Subscription s) {
112112
}
113113
this.s = s;
114114

115-
window = new UnicastProcessor<T>(bufferSize);
115+
window = UnicastProcessor.<T>create(bufferSize);
116116

117117
Subscriber<? super Flowable<T>> a = actual;
118118
a.onSubscribe(this);
@@ -267,7 +267,7 @@ void drainLoop() {
267267
if (o == NEXT) {
268268
w.onComplete();
269269
if (!term) {
270-
w = new UnicastProcessor<T>(bufferSize);
270+
w = UnicastProcessor.<T>create(bufferSize);
271271
window = w;
272272

273273
long r = requested();
@@ -362,7 +362,7 @@ public void onSubscribe(Subscription s) {
362362
return;
363363
}
364364

365-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
365+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
366366
window = w;
367367

368368
long r = requested();
@@ -416,7 +416,7 @@ public void onNext(T t) {
416416
long r = requested();
417417

418418
if (r != 0L) {
419-
w = new UnicastProcessor<T>(bufferSize);
419+
w = UnicastProcessor.<T>create(bufferSize);
420420
window = w;
421421
actual.onNext(w);
422422
if (r != Long.MAX_VALUE) {
@@ -558,7 +558,7 @@ void drainLoop() {
558558
if (isHolder) {
559559
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
560560
if (producerIndex == consumerIndexHolder.index) {
561-
w = new UnicastProcessor<T>(bufferSize);
561+
w = UnicastProcessor.<T>create(bufferSize);
562562
window = w;
563563

564564
long r = requested();
@@ -591,7 +591,7 @@ void drainLoop() {
591591
long r = requested();
592592

593593
if (r != 0L) {
594-
w = new UnicastProcessor<T>(bufferSize);
594+
w = UnicastProcessor.<T>create(bufferSize);
595595
window = w;
596596
actual.onNext(w);
597597
if (r != Long.MAX_VALUE) {
@@ -699,7 +699,7 @@ public void onSubscribe(Subscription s) {
699699

700700
long r = requested();
701701
if (r != 0L) {
702-
final UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
702+
final UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
703703
windows.add(w);
704704

705705
actual.onNext(w);
@@ -864,7 +864,7 @@ void drainLoop() {
864864

865865
long r = requested();
866866
if (r != 0L) {
867-
final UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
867+
final UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
868868
ws.add(w);
869869
a.onNext(w);
870870
if (r != Long.MAX_VALUE) {
@@ -906,7 +906,7 @@ public void run() {
906906
@Override
907907
public void run() {
908908

909-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
909+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
910910

911911
SubjectWork<T> sw = new SubjectWork<T>(w, true);
912912
if (!cancelled) {

src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.internal.functions.ObjectHelper;
18-
import io.reactivex.internal.subscribers.observable.BaseQueueDisposable;
18+
import io.reactivex.internal.subscribers.observable.BasicQueueDisposable;
1919

2020
public final class ObservableFromArray<T> extends Observable<T> {
2121
final T[] array;
@@ -38,7 +38,7 @@ public void subscribeActual(Observer<? super T> s) {
3838
d.run();
3939
}
4040

41-
static final class FromArrayDisposable<T> extends BaseQueueDisposable<T> {
41+
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
4242

4343
final Observer<? super T> actual;
4444

src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.internal.disposables.EmptyDisposable;
2121
import io.reactivex.internal.functions.ObjectHelper;
22-
import io.reactivex.internal.subscribers.observable.BaseQueueDisposable;
22+
import io.reactivex.internal.subscribers.observable.BasicQueueDisposable;
2323

2424
public final class ObservableFromIterable<T> extends Observable<T> {
2525
final Iterable<? extends T> source;
@@ -58,7 +58,7 @@ public void subscribeActual(Observer<? super T> s) {
5858
}
5959
}
6060

61-
static final class FromIterableDisposable<T> extends BaseQueueDisposable<T> {
61+
static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {
6262

6363
final Observer<? super T> actual;
6464

src/main/java/io/reactivex/internal/subscribers/observable/BaseIntQueueDisposable.java src/main/java/io/reactivex/internal/subscribers/observable/BasicIntQueueDisposable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* that defaults all unnecessary Queue methods to throw UnsupportedOperationException.
2323
* @param <T> the output value type
2424
*/
25-
public abstract class BaseIntQueueDisposable<T>
25+
public abstract class BasicIntQueueDisposable<T>
2626
extends AtomicInteger
2727
implements QueueDisposable<T> {
2828

src/main/java/io/reactivex/internal/subscribers/observable/BaseQueueDisposable.java src/main/java/io/reactivex/internal/subscribers/observable/BasicQueueDisposable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* unnecessary Queue methods to throw UnsupportedOperationException.
2121
* @param <T> the output value type
2222
*/
23-
public abstract class BaseQueueDisposable<T> implements QueueDisposable<T> {
23+
public abstract class BasicQueueDisposable<T> implements QueueDisposable<T> {
2424

2525
@Override
2626
public final boolean offer(T e) {

0 commit comments

Comments
 (0)