Skip to content

Commit 4577f1a

Browse files
bobvanderlindenakarnokd
authored andcommitted
2.x: use generic type instead of Object in combineLatest and zip (#4211)
1 parent 487a0ba commit 4577f1a

File tree

3 files changed

+28
-26
lines changed

3 files changed

+28
-26
lines changed

src/main/java/io/reactivex/Observable.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -93,22 +93,22 @@ static int bufferSize() {
9393
}
9494

9595
@SchedulerSupport(SchedulerSupport.NONE)
96-
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize, ObservableConsumable<? extends T>... sources) {
96+
public static <T, R> Observable<R> combineLatest(Function<? super T[], ? extends R> combiner, boolean delayError, int bufferSize, ObservableConsumable<? extends T>... sources) {
9797
return combineLatest(sources, combiner, delayError, bufferSize);
9898
}
9999

100100
@SchedulerSupport(SchedulerSupport.NONE)
101-
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super Object[], ? extends R> combiner) {
101+
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super T[], ? extends R> combiner) {
102102
return combineLatest(sources, combiner, false, bufferSize());
103103
}
104104

105105
@SchedulerSupport(SchedulerSupport.NONE)
106-
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
106+
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super T[], ? extends R> combiner, boolean delayError) {
107107
return combineLatest(sources, combiner, delayError, bufferSize());
108108
}
109109

110110
@SchedulerSupport(SchedulerSupport.NONE)
111-
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
111+
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super T[], ? extends R> combiner, boolean delayError, int bufferSize) {
112112
Objects.requireNonNull(sources, "sources is null");
113113
Objects.requireNonNull(combiner, "combiner is null");
114114
validateBufferSize(bufferSize);
@@ -119,17 +119,17 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableCo
119119
}
120120

121121
@SchedulerSupport(SchedulerSupport.NONE)
122-
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
122+
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super T[], ? extends R> combiner) {
123123
return combineLatest(sources, combiner, false, bufferSize());
124124
}
125125

126126
@SchedulerSupport(SchedulerSupport.NONE)
127-
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
127+
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super T[], ? extends R> combiner, boolean delayError) {
128128
return combineLatest(sources, combiner, delayError, bufferSize());
129129
}
130130

131131
@SchedulerSupport(SchedulerSupport.NONE)
132-
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
132+
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super T[], ? extends R> combiner, boolean delayError, int bufferSize) {
133133
validateBufferSize(bufferSize);
134134
Objects.requireNonNull(combiner, "combiner is null");
135135
if (sources.length == 0) {
@@ -962,15 +962,15 @@ private static void validateBufferSize(int bufferSize) {
962962
}
963963

964964
@SchedulerSupport(SchedulerSupport.NONE)
965-
public static <T, R> Observable<R> zip(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
965+
public static <T, R> Observable<R> zip(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
966966
Objects.requireNonNull(zipper, "zipper is null");
967967
Objects.requireNonNull(sources, "sources is null");
968968
return new ObservableZip<T, R>(null, sources, zipper, bufferSize(), false);
969969
}
970970

971971
@SuppressWarnings({ "rawtypes", "unchecked" })
972972
@SchedulerSupport(SchedulerSupport.NONE)
973-
public static <T, R> Observable<R> zip(ObservableConsumable<? extends ObservableConsumable<? extends T>> sources, final Function<Object[], R> zipper) {
973+
public static <T, R> Observable<R> zip(ObservableConsumable<? extends ObservableConsumable<? extends T>> sources, final Function<T[], R> zipper) {
974974
Objects.requireNonNull(zipper, "zipper is null");
975975
Objects.requireNonNull(sources, "sources is null");
976976
// FIXME don't want to fiddle with manual type inference, this will be inlined later anyway
@@ -1073,7 +1073,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
10731073
}
10741074

10751075
@SchedulerSupport(SchedulerSupport.NONE)
1076-
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
1076+
public static <T, R> Observable<R> zipArray(Function<? super T[], ? extends R> zipper,
10771077
boolean delayError, int bufferSize, ObservableConsumable<? extends T>... sources) {
10781078
if (sources.length == 0) {
10791079
return empty();
@@ -1084,7 +1084,7 @@ public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends
10841084
}
10851085

10861086
@SchedulerSupport(SchedulerSupport.NONE)
1087-
public static <T, R> Observable<R> zipIterable(Function<? super Object[], ? extends R> zipper,
1087+
public static <T, R> Observable<R> zipIterable(Function<? super T[], ? extends R> zipper,
10881088
boolean delayError, int bufferSize,
10891089
Iterable<? extends ObservableConsumable<? extends T>> sources) {
10901090
Objects.requireNonNull(zipper, "zipper is null");

src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
public final class ObservableCombineLatest<T, R> extends Observable<R> {
3030
final ObservableConsumable<? extends T>[] sources;
3131
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
32-
final Function<? super Object[], ? extends R> combiner;
32+
final Function<? super T[], ? extends R> combiner;
3333
final int bufferSize;
3434
final boolean delayError;
3535

3636
public ObservableCombineLatest(ObservableConsumable<? extends T>[] sources,
3737
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
38-
Function<? super Object[], ? extends R> combiner, int bufferSize,
38+
Function<? super T[], ? extends R> combiner, int bufferSize,
3939
boolean delayError) {
4040
this.sources = sources;
4141
this.sourcesIterable = sourcesIterable;
@@ -77,11 +77,11 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp
7777
/** */
7878
private static final long serialVersionUID = 8567835998786448817L;
7979
final Observer<? super R> actual;
80-
final Function<? super Object[], ? extends R> combiner;
80+
final Function<? super T[], ? extends R> combiner;
8181
final int count;
8282
final CombinerSubscriber<T, R>[] subscribers;
8383
final int bufferSize;
84-
final Object[] latest;
84+
final T[] latest;
8585
final SpscLinkedArrayQueue<Object> queue;
8686
final boolean delayError;
8787

@@ -96,14 +96,14 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp
9696

9797
@SuppressWarnings("unchecked")
9898
public LatestCoordinator(Observer<? super R> actual,
99-
Function<? super Object[], ? extends R> combiner,
99+
Function<? super T[], ? extends R> combiner,
100100
int count, int bufferSize, boolean delayError) {
101101
this.actual = actual;
102102
this.combiner = combiner;
103103
this.count = count;
104104
this.bufferSize = bufferSize;
105105
this.delayError = delayError;
106-
this.latest = new Object[count];
106+
this.latest = (T[])new Object[count];
107107
this.subscribers = new CombinerSubscriber[count];
108108
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
109109
}
@@ -167,7 +167,7 @@ void combine(T value, int index) {
167167
return;
168168
}
169169
len = latest.length;
170-
Object o = latest[index];
170+
T o = latest[index];
171171
a = active;
172172
if (o == null) {
173173
active = ++a;
@@ -230,7 +230,9 @@ void drain() {
230230
}
231231

232232
q.poll();
233-
Object[] array = (Object[])q.poll();
233+
234+
@SuppressWarnings("unchecked")
235+
T[] array = (T[])q.poll();
234236

235237
if (array == null) {
236238
cancelled = true;

src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ public final class ObservableZip<T, R> extends Observable<R> {
2626

2727
final ObservableConsumable<? extends T>[] sources;
2828
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
29-
final Function<? super Object[], ? extends R> zipper;
29+
final Function<? super T[], ? extends R> zipper;
3030
final int bufferSize;
3131
final boolean delayError;
3232

3333
public ObservableZip(ObservableConsumable<? extends T>[] sources,
3434
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
35-
Function<? super Object[], ? extends R> zipper,
35+
Function<? super T[], ? extends R> zipper,
3636
int bufferSize,
3737
boolean delayError) {
3838
this.sources = sources;
@@ -74,21 +74,21 @@ static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposa
7474
/** */
7575
private static final long serialVersionUID = 2983708048395377667L;
7676
final Observer<? super R> actual;
77-
final Function<? super Object[], ? extends R> zipper;
77+
final Function<? super T[], ? extends R> zipper;
7878
final ZipSubscriber<T, R>[] subscribers;
79-
final Object[] row;
79+
final T[] row;
8080
final boolean delayError;
8181

8282
volatile boolean cancelled;
8383

8484
@SuppressWarnings("unchecked")
8585
public ZipCoordinator(Observer<? super R> actual,
86-
Function<? super Object[], ? extends R> zipper,
86+
Function<? super T[], ? extends R> zipper,
8787
int count, boolean delayError) {
8888
this.actual = actual;
8989
this.zipper = zipper;
9090
this.subscribers = new ZipSubscriber[count];
91-
this.row = new Object[count];
91+
this.row = (T[])new Object[count];
9292
this.delayError = delayError;
9393
}
9494

@@ -140,7 +140,7 @@ public void drain() {
140140

141141
final ZipSubscriber<T, R>[] zs = subscribers;
142142
final Observer<? super R> a = actual;
143-
final Object[] os = row;
143+
final T[] os = row;
144144
final boolean delayError = this.delayError;
145145

146146
for (;;) {

0 commit comments

Comments
 (0)