26
26
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
27
27
*/
28
28
public abstract class InputWithIncrementingInteger {
29
+ final class DefaultSubscriberImpl extends DefaultSubscriber <Integer > {
30
+ @ Override
31
+ public void onComplete () {
32
+
33
+ }
34
+
35
+ @ Override
36
+ public void onError (Throwable e ) {
37
+
38
+ }
39
+
40
+ @ Override
41
+ public void onNext (Integer t ) {
42
+ bh .consume (t );
43
+ }
44
+ }
45
+
46
+ final class IncrementingIterable implements Iterable <Integer > {
47
+ private final class IncrementingIterator implements Iterator <Integer > {
48
+ int i ;
49
+
50
+ @ Override
51
+ public boolean hasNext () {
52
+ return i < size ;
53
+ }
54
+
55
+ @ Override
56
+ public Integer next () {
57
+ Blackhole .consumeCPU (10 );
58
+ return i ++;
59
+ }
60
+
61
+ @ Override
62
+ public void remove () {
63
+
64
+ }
65
+ }
66
+
67
+ private final int size ;
68
+
69
+ private IncrementingIterable (int size ) {
70
+ this .size = size ;
71
+ }
72
+
73
+ @ Override
74
+ public Iterator <Integer > iterator () {
75
+ return new IncrementingIterator ();
76
+ }
77
+ }
78
+
79
+ final class IncrementingPublisher implements Publisher <Integer > {
80
+ private final int size ;
81
+
82
+ IncrementingPublisher (int size ) {
83
+ this .size = size ;
84
+ }
85
+
86
+ @ Override
87
+ public void subscribe (Subscriber <? super Integer > s ) {
88
+ s .onSubscribe (EmptySubscription .INSTANCE );
89
+ for (int i = 0 ; i < size ; i ++) {
90
+ s .onNext (i );
91
+ }
92
+ s .onComplete ();
93
+ }
94
+ }
95
+
29
96
public Iterable <Integer > iterable ;
30
97
public Flowable <Integer > observable ;
31
98
public Flowable <Integer > firehose ;
@@ -39,42 +106,8 @@ public void setup(final Blackhole bh) {
39
106
final int size = getSize ();
40
107
observable = Flowable .range (0 , size );
41
108
42
- firehose = Flowable .unsafeCreate (new Publisher <Integer >() {
43
-
44
- @ Override
45
- public void subscribe (Subscriber <? super Integer > s ) {
46
- s .onSubscribe (EmptySubscription .INSTANCE );
47
- for (int i = 0 ; i < size ; i ++) {
48
- s .onNext (i );
49
- }
50
- s .onComplete ();
51
- }
52
-
53
- });
54
- iterable = new Iterable <Integer >() {
55
- @ Override
56
- public Iterator <Integer > iterator () {
57
- return new Iterator <Integer >() {
58
- int i ;
59
-
60
- @ Override
61
- public boolean hasNext () {
62
- return i < size ;
63
- }
64
-
65
- @ Override
66
- public Integer next () {
67
- Blackhole .consumeCPU (10 );
68
- return i ++;
69
- }
70
-
71
- @ Override
72
- public void remove () {
73
-
74
- }
75
- };
76
- }
77
- };
109
+ firehose = Flowable .unsafeCreate (new IncrementingPublisher (size ));
110
+ iterable = new IncrementingIterable (size );
78
111
79
112
}
80
113
@@ -83,24 +116,7 @@ public PerfSubscriber newLatchedObserver() {
83
116
}
84
117
85
118
public FlowableSubscriber <Integer > newSubscriber () {
86
- return new DefaultSubscriber <Integer >() {
87
-
88
- @ Override
89
- public void onComplete () {
90
-
91
- }
92
-
93
- @ Override
94
- public void onError (Throwable e ) {
95
-
96
- }
97
-
98
- @ Override
99
- public void onNext (Integer t ) {
100
- bh .consume (t );
101
- }
102
-
103
- };
119
+ return new DefaultSubscriberImpl ();
104
120
}
105
121
106
122
}
0 commit comments