Skip to content

Commit 56d5586

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Fix MaybeTimber by using scheduler and unit (#4529)
* 2.x: Fix MaybeTimber by using scheduler and unit * Address issues * Use Star imports
1 parent 1236bf3 commit 56d5586

File tree

6 files changed

+227
-29
lines changed

6 files changed

+227
-29
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableTimer.java

+37-13
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
package io.reactivex.internal.operators.completable;
1515

1616
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicReference;
1718

1819
import io.reactivex.*;
19-
import io.reactivex.internal.disposables.SequentialDisposable;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.disposables.DisposableHelper;
2022

23+
/**
24+
* Signals an {@code onCompleted} event after the specified delay
25+
*/
2126
public final class CompletableTimer extends Completable {
2227

2328
final long delay;
@@ -30,20 +35,39 @@ public CompletableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
3035
this.scheduler = scheduler;
3136
}
3237

33-
34-
3538
@Override
3639
protected void subscribeActual(final CompletableObserver s) {
37-
SequentialDisposable sd = new SequentialDisposable();
38-
s.onSubscribe(sd);
39-
if (!sd.isDisposed()) {
40-
sd.replace(scheduler.scheduleDirect(new Runnable() {
41-
@Override
42-
public void run() {
43-
s.onComplete();
44-
}
45-
}, delay, unit));
46-
}
40+
TimerDisposable parent = new TimerDisposable(s);
41+
s.onSubscribe(parent);
42+
parent.setFuture(scheduler.scheduleDirect(parent, delay, unit));
4743
}
4844

45+
static final class TimerDisposable extends AtomicReference<Disposable> implements Disposable, Runnable {
46+
/** */
47+
private static final long serialVersionUID = 3167244060586201109L;
48+
final CompletableObserver actual;
49+
50+
TimerDisposable(final CompletableObserver actual) {
51+
this.actual = actual;
52+
}
53+
54+
@Override
55+
public void run() {
56+
actual.onComplete();
57+
}
58+
59+
@Override
60+
public void dispose() {
61+
DisposableHelper.dispose(this);
62+
}
63+
64+
@Override
65+
public boolean isDisposed() {
66+
return DisposableHelper.isDisposed(get());
67+
}
68+
69+
void setFuture(Disposable d) {
70+
DisposableHelper.replace(this, d);
71+
}
72+
}
4973
}

Diff for: src/main/java/io/reactivex/internal/operators/maybe/MaybeTimer.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.internal.disposables.DisposableHelper;
2222

2323
/**
24-
* Signals a 0L after the specified delay
24+
* Signals a {@code 0L} after the specified delay
2525
*/
2626
public final class MaybeTimer extends Maybe<Long> {
2727

@@ -38,20 +38,18 @@ public MaybeTimer(long delay, TimeUnit unit, Scheduler scheduler) {
3838
}
3939

4040
@Override
41-
protected void subscribeActual(MaybeObserver<? super Long> observer) {
41+
protected void subscribeActual(final MaybeObserver<? super Long> observer) {
4242
TimerDisposable parent = new TimerDisposable(observer);
4343
observer.onSubscribe(parent);
44-
45-
parent.setFuture(scheduler.scheduleDirect(parent));
44+
parent.setFuture(scheduler.scheduleDirect(parent, delay, unit));
4645
}
4746

4847
static final class TimerDisposable extends AtomicReference<Disposable> implements Disposable, Runnable {
49-
5048
/** */
5149
private static final long serialVersionUID = 2875964065294031672L;
5250
final MaybeObserver<? super Long> actual;
5351

54-
public TimerDisposable(MaybeObserver<? super Long> actual) {
52+
TimerDisposable(final MaybeObserver<? super Long> actual) {
5553
this.actual = actual;
5654
}
5755

Diff for: src/main/java/io/reactivex/internal/operators/single/SingleTimer.java

+36-10
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
package io.reactivex.internal.operators.single;
1515

1616
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicReference;
1718

1819
import io.reactivex.*;
19-
import io.reactivex.internal.disposables.SequentialDisposable;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.disposables.DisposableHelper;
2022

23+
/**
24+
* Signals a {@code 0L} after the specified delay
25+
*/
2126
public final class SingleTimer extends Single<Long> {
2227

2328
final long delay;
@@ -32,16 +37,37 @@ public SingleTimer(long delay, TimeUnit unit, Scheduler scheduler) {
3237

3338
@Override
3439
protected void subscribeActual(final SingleObserver<? super Long> s) {
35-
SequentialDisposable sd = new SequentialDisposable();
40+
TimerDisposable parent = new TimerDisposable(s);
41+
s.onSubscribe(parent);
42+
parent.setFuture(scheduler.scheduleDirect(parent, delay, unit));
43+
}
3644

37-
s.onSubscribe(sd);
45+
static final class TimerDisposable extends AtomicReference<Disposable> implements Disposable, Runnable {
46+
/** */
47+
private static final long serialVersionUID = 8465401857522493082L;
48+
final SingleObserver<? super Long> actual;
3849

39-
sd.replace(scheduler.scheduleDirect(new Runnable() {
40-
@Override
41-
public void run() {
42-
s.onSuccess(0L);
43-
}
44-
}, delay, unit));
45-
}
50+
TimerDisposable(final SingleObserver<? super Long> actual) {
51+
this.actual = actual;
52+
}
4653

54+
@Override
55+
public void run() {
56+
actual.onSuccess(0L);
57+
}
58+
59+
@Override
60+
public void dispose() {
61+
DisposableHelper.dispose(this);
62+
}
63+
64+
@Override
65+
public boolean isDisposed() {
66+
return DisposableHelper.isDisposed(get());
67+
}
68+
69+
void setFuture(Disposable d) {
70+
DisposableHelper.replace(this, d);
71+
}
72+
}
4773
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.completable;
15+
16+
import org.junit.Test;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import io.reactivex.Completable;
22+
import io.reactivex.functions.Action;
23+
import io.reactivex.schedulers.TestScheduler;
24+
25+
import static org.junit.Assert.assertEquals;
26+
27+
public class CompletableTimerTest {
28+
@Test
29+
public void timer() {
30+
final TestScheduler testScheduler = new TestScheduler();
31+
32+
final AtomicLong atomicLong = new AtomicLong();
33+
Completable.timer(2, TimeUnit.SECONDS, testScheduler).subscribe(new Action() {
34+
@Override
35+
public void run() throws Exception {
36+
atomicLong.incrementAndGet();
37+
}
38+
});
39+
40+
assertEquals(0, atomicLong.get());
41+
42+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
43+
44+
assertEquals(0, atomicLong.get());
45+
46+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
47+
48+
assertEquals(1, atomicLong.get());
49+
}
50+
}

Diff for: src/test/java/io/reactivex/maybe/MaybeTimerTest.java

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.maybe;
15+
16+
import org.junit.Test;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import io.reactivex.Maybe;
22+
import io.reactivex.functions.Consumer;
23+
import io.reactivex.schedulers.TestScheduler;
24+
25+
import static org.junit.Assert.assertEquals;
26+
27+
public class MaybeTimerTest {
28+
@Test
29+
public void timer() {
30+
final TestScheduler testScheduler = new TestScheduler();
31+
32+
final AtomicLong atomicLong = new AtomicLong();
33+
Maybe.timer(2, TimeUnit.SECONDS, testScheduler).subscribe(new Consumer<Long>() {
34+
@Override
35+
public void accept(final Long value) throws Exception {
36+
atomicLong.incrementAndGet();
37+
}
38+
});
39+
40+
assertEquals(0, atomicLong.get());
41+
42+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
43+
44+
assertEquals(0, atomicLong.get());
45+
46+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
47+
48+
assertEquals(1, atomicLong.get());
49+
}
50+
}
+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.single;
15+
16+
import org.junit.Test;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import io.reactivex.Single;
22+
import io.reactivex.functions.Consumer;
23+
import io.reactivex.schedulers.TestScheduler;
24+
25+
import static org.junit.Assert.assertEquals;
26+
27+
public class SingleTimerTest {
28+
@Test
29+
public void timer() {
30+
final TestScheduler testScheduler = new TestScheduler();
31+
32+
final AtomicLong atomicLong = new AtomicLong();
33+
Single.timer(2, TimeUnit.SECONDS, testScheduler).subscribe(new Consumer<Long>() {
34+
@Override
35+
public void accept(final Long value) throws Exception {
36+
atomicLong.incrementAndGet();
37+
}
38+
});
39+
40+
assertEquals(0, atomicLong.get());
41+
42+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
43+
44+
assertEquals(0, atomicLong.get());
45+
46+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
47+
48+
assertEquals(1, atomicLong.get());
49+
}
50+
}

0 commit comments

Comments
 (0)