Skip to content

Commit 1a362f0

Browse files
committed
ConnectableObservable and autoConnect
1 parent 6bbbb17 commit 1a362f0

File tree

2 files changed

+182
-0
lines changed

2 files changed

+182
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.function.Consumer;
18+
19+
import org.reactivestreams.*;
20+
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.observables.ConnectableObservable;
23+
24+
/**
25+
* Wraps a ConnectableObservable and calls its connect() method once
26+
* the specified number of Subscribers have subscribed.
27+
*
28+
* @param <T> the value type of the chain
29+
*/
30+
public final class PublisherAutoConnect<T> implements Publisher<T> {
31+
final ConnectableObservable<? extends T> source;
32+
final int numberOfSubscribers;
33+
final Consumer<? super Disposable> connection;
34+
final AtomicInteger clients;
35+
36+
public PublisherAutoConnect(ConnectableObservable<? extends T> source,
37+
int numberOfSubscribers,
38+
Consumer<? super Disposable> connection) {
39+
if (numberOfSubscribers <= 0) {
40+
throw new IllegalArgumentException("numberOfSubscribers > 0 required");
41+
}
42+
this.source = source;
43+
this.numberOfSubscribers = numberOfSubscribers;
44+
this.connection = connection;
45+
this.clients = new AtomicInteger();
46+
}
47+
48+
@Override
49+
public void subscribe(Subscriber<? super T> child) {
50+
source.unsafeSubscribe(child);
51+
if (clients.incrementAndGet() == numberOfSubscribers) {
52+
source.connect(connection);
53+
}
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.observables;
15+
16+
import java.util.function.Consumer;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.Observable;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.operators.PublisherAutoConnect;
23+
24+
/**
25+
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
26+
* emitting items when it is subscribed to, but only when its {@link #connect} method is called. In this way you
27+
* can wait for all intended {@link Subscriber}s to {@link Observable#subscribe} to the {@code Observable}
28+
* before the {@code Observable} begins emitting items.
29+
* <p>
30+
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
31+
*
32+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators">RxJava Wiki:
33+
* Connectable Observable Operators</a>
34+
* @param <T>
35+
* the type of items emitted by the {@code ConnectableObservable}
36+
*/
37+
public abstract class ConnectableObservable<T> extends Observable<T> {
38+
39+
protected ConnectableObservable(Publisher<T> onSubscribe) {
40+
super(onSubscribe);
41+
}
42+
43+
/**
44+
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
45+
* {@link Observable} to its {@link Subscriber}s.
46+
*
47+
* @param connection
48+
* the action that receives the connection subscription before the subscription to source happens
49+
* allowing the caller to synchronously disconnect a synchronous source
50+
* @see <a href="http://reactivex.io/documentation/operators/connect.html">ReactiveX documentation: Connect</a>
51+
*/
52+
public abstract void connect(Consumer<? super Disposable> connection);
53+
54+
/**
55+
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
56+
* {@link Observable} to its {@link Subscriber}s.
57+
* <p>
58+
* To disconnect from a synchronous source, use the {@link #connect(rx.functions.Action1)} method.
59+
*
60+
* @return the subscription representing the connection
61+
* @see <a href="http://reactivex.io/documentation/operators/connect.html">ReactiveX documentation: Connect</a>
62+
*/
63+
public final Disposable connect() {
64+
Disposable[] connection = new Disposable[1];
65+
connect(d -> connection[0] = d);
66+
return connection[0];
67+
}
68+
69+
/**
70+
* Returns an {@code Observable} that stays connected to this {@code ConnectableObservable} as long as there
71+
* is at least one subscription to this {@code ConnectableObservable}.
72+
*
73+
* @return a {@link Observable}
74+
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
75+
*/
76+
public Observable<T> refCount() {
77+
// TODO implement RefCount
78+
// return create(new PublisherRefCount<T>(this));
79+
throw new UnsupportedOperationException();
80+
}
81+
82+
/**
83+
* Returns an Observable that automatically connects to this ConnectableObservable
84+
* when the first Subscriber subscribes.
85+
*
86+
* @return an Observable that automatically connects to this ConnectableObservable
87+
* when the first Subscriber subscribes
88+
*/
89+
public Observable<T> autoConnect() {
90+
return autoConnect(1);
91+
}
92+
/**
93+
* Returns an Observable that automatically connects to this ConnectableObservable
94+
* when the specified number of Subscribers subscribe to it.
95+
*
96+
* @param numberOfSubscribers the number of subscribers to await before calling connect
97+
* on the ConnectableObservable. A non-positive value indicates
98+
* an immediate connection.
99+
* @return an Observable that automatically connects to this ConnectableObservable
100+
* when the specified number of Subscribers subscribe to it
101+
*/
102+
public Observable<T> autoConnect(int numberOfSubscribers) {
103+
return autoConnect(numberOfSubscribers, c -> { });
104+
}
105+
106+
/**
107+
* Returns an Observable that automatically connects to this ConnectableObservable
108+
* when the specified number of Subscribers subscribe to it and calls the
109+
* specified callback with the Subscription associated with the established connection.
110+
*
111+
* @param numberOfSubscribers the number of subscribers to await before calling connect
112+
* on the ConnectableObservable. A non-positive value indicates
113+
* an immediate connection.
114+
* @param connection the callback Action1 that will receive the Subscription representing the
115+
* established connection
116+
* @return an Observable that automatically connects to this ConnectableObservable
117+
* when the specified number of Subscribers subscribe to it and calls the
118+
* specified callback with the Subscription associated with the established connection
119+
*/
120+
public Observable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection) {
121+
if (numberOfSubscribers <= 0) {
122+
this.connect(connection);
123+
return this;
124+
}
125+
return create(new PublisherAutoConnect<>(this, numberOfSubscribers, connection));
126+
}
127+
}

0 commit comments

Comments
 (0)