Skip to content

Commit a30fb4d

Browse files
Pratakarnokd
Prat
authored andcommitted
2.x: Add Completable.fromRunnable() (ReactiveX#4629)
1 parent 64f335c commit a30fb4d

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed

src/main/java/io/reactivex/Completable.java

+17
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,23 @@ public static Completable fromFuture(final Future<?> future) {
339339
return fromAction(Functions.futureAction(future));
340340
}
341341

342+
/**
343+
* Returns a Completable instance that runs the given Runnable for each subscriber and
344+
* emits either its exception or simply completes.
345+
* <dl>
346+
* <dt><b>Scheduler:</b></dt>
347+
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
348+
* </dl>
349+
* @param run the runnable to run for each subscriber
350+
* @return the new Completable instance
351+
* @throws NullPointerException if run is null
352+
*/
353+
@SchedulerSupport(SchedulerSupport.NONE)
354+
public static Completable fromRunnable(final Runnable run) {
355+
ObjectHelper.requireNonNull(run, "run is null");
356+
return RxJavaPlugins.onAssembly(new CompletableFromRunnable(run));
357+
}
358+
342359
/**
343360
* Returns a Completable instance that subscribes to the given Observable, ignores all values and
344361
* emits only the terminal event.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.CompletableObserver;
18+
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.disposables.Disposables;
20+
import io.reactivex.exceptions.Exceptions;
21+
22+
public final class CompletableFromRunnable extends Completable {
23+
24+
final Runnable runnable;
25+
26+
public CompletableFromRunnable(Runnable runnable) {
27+
this.runnable = runnable;
28+
}
29+
30+
@Override
31+
protected void subscribeActual(CompletableObserver s) {
32+
Disposable d = Disposables.empty();
33+
s.onSubscribe(d);
34+
try {
35+
runnable.run();
36+
} catch (Throwable e) {
37+
Exceptions.throwIfFatal(e);
38+
if (!d.isDisposed()) {
39+
s.onError(e);
40+
}
41+
return;
42+
}
43+
if (!d.isDisposed()) {
44+
s.onComplete();
45+
}
46+
}
47+
}

src/test/java/io/reactivex/completable/CompletableTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -4473,6 +4473,37 @@ public void run() {
44734473
}
44744474
}
44754475

4476+
@Test(expected = NullPointerException.class)
4477+
public void fromRunnableNull() {
4478+
Completable.fromRunnable(null);
4479+
}
4480+
4481+
@Test(timeout = 1000)
4482+
public void fromRunnableNormal() {
4483+
final AtomicInteger calls = new AtomicInteger();
4484+
4485+
Completable c = Completable.fromRunnable(new Runnable() {
4486+
@Override
4487+
public void run() {
4488+
calls.getAndIncrement();
4489+
}
4490+
});
4491+
4492+
c.blockingAwait();
4493+
4494+
Assert.assertEquals(1, calls.get());
4495+
}
4496+
4497+
@Test(timeout = 1000, expected = TestException.class)
4498+
public void fromRunnableThrows() {
4499+
Completable c = Completable.fromRunnable(new Runnable() {
4500+
@Override
4501+
public void run() { throw new TestException(); }
4502+
});
4503+
4504+
c.blockingAwait();
4505+
}
4506+
44764507
@Test(expected = NullPointerException.class)
44774508
public void doOnErrorNullValue() {
44784509
Completable.complete().doOnError(null);

0 commit comments

Comments
 (0)