You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
`ScalarCallable` is also `Callable` and thus its value can be extracted practically anytime. For convenience (and for sense), `ScalarCallable` overrides and hides the superclass' `throws Exception` clause - throwing during assembly time is likely unreasonable for scalars.
530
530
531
-
Since Reactive-Streams doesn't allow `null`s in the value flow, we have the opportunity to define `ScalarCallable`s and `Callable`s returning `null` should be considered as an empty source - allowing operators to dispatch on the type `Callable` first then branch on the nullness of `call()`.
531
+
Since ReactiveStreams doesn't allow `null`s in the value flow, we have the opportunity to define `ScalarCallable`s and `Callable`s returning `null` should be considered as an empty source - allowing operators to dispatch on the type `Callable` first then branch on the nullness of `call()`.
532
532
533
533
Interoperating with other libraries, at this level is possible. Reactor-Core uses the same pattern and the two libraries can work with each other's `Publisher+Callable` types. Unfortunately, this means subscription-time only fusion as `ScalarCallable`s live locally in each library.
534
534
535
535
##### Micro-fusion
536
536
537
-
Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. It's property is that, in a way, subverts the standard Reactive-Streams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol.
537
+
Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. It's property is that, in a way, subverts the standard ReactiveStreams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol.
538
538
539
539
Currently, two main kinds of micro-fusion opportunities are available.
Copy file name to clipboardexpand all lines: README.md
+2-2
Original file line number
Diff line number
Diff line change
@@ -10,7 +10,7 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern)
10
10
11
11
#### Version 2.x ([Javadoc](http://reactivex.io/RxJava/2.x/javadoc/))
12
12
13
-
- single dependency: [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm)
13
+
- single dependency: [ReactiveStreams](https://github.com/reactive-streams/reactive-streams-jvm)
14
14
- continued support for Java 6+ & [Android](https://github.com/ReactiveX/RxAndroid) 2.3+
15
15
- performance gains through design changes learned through the 1.x cycle and through [Reactive-Streams-Commons](https://github.com/reactor/reactive-streams-commons) research project.
16
16
- Java 8 lambda-friendly API
@@ -72,7 +72,7 @@ Flowable.just("Hello world")
72
72
73
73
RxJava 2 features several base classes you can discover operators on:
74
74
75
-
-[`io.reactivex.Flowable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html): 0..N flows, supporting Reactive-Streams and backpressure
75
+
-[`io.reactivex.Flowable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html): 0..N flows, supporting ReactiveStreams and backpressure
76
76
-[`io.reactivex.Observable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html): 0..N flows, no backpressure,
77
77
-[`io.reactivex.Single`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Single.html): a flow of exactly 1 item or an error,
78
78
-[`io.reactivex.Completable`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Completable.html): a flow without items but only a completion or error signal,
*(If you have been following [my blog](http://akarnokd.blogspot.hu/) about RxJava internals, writing operators is maybe only 2 times harder than 1.x; some things have moved around, some tools popped up while others have been dropped but there is a relatively straight mapping from 1.x concepts and approaches to 2.x concepts and approaches.)*
42
42
43
-
In this article, I'll describe the how-to's from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conforms the Reactive-Streams specification as well as RxJava 2.x's own extensions and additional expectations/requirements.
43
+
In this article, I'll describe the how-to's from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conforms the ReactiveStreams specification as well as RxJava 2.x's own extensions and additional expectations/requirements.
44
44
45
45
Since **Reactor 3** has the same architecture as **RxJava 2** (no accident, I architected and contributed 80% of **Reactor 3** as well) the same principles outlined in this page applies to writing operators for **Reactor 3**. Note however that they chose different naming and locations for their utility and support classes so you may have to search for the equivalent components.
46
46
@@ -66,7 +66,7 @@ When dealing with backpressure in `Flowable` operators, one needs a way to accou
66
66
67
67
The naive approach for accounting would be to simply call `AtomicLong.getAndAdd()` with new requests and `AtomicLong.addAndGet()` for decrementing based on how many elements were emitted.
68
68
69
-
The problem with this is that the Reactive-Streams specification declares `Long.MAX_VALUE` as the upper bound for outstanding requests (interprets it as the unbounded mode) but adding two large longs may overflow the `long` into a negative value. In addition, if for some reason, there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs.
69
+
The problem with this is that the ReactiveStreams specification declares `Long.MAX_VALUE` as the upper bound for outstanding requests (interprets it as the unbounded mode) but adding two large longs may overflow the `long` into a negative value. In addition, if for some reason, there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs.
70
70
71
71
Therefore, both addition and subtraction have to be capped at `Long.MAX_VALUE` and `0` respectively. Since there is no dedicated `AtomicLong` method for it, we have to use a Compare-And-Set loop. (Usually, requesting happens relatively rarely compared to emission amounts thus the lack of dedicated machine code instruction is not a performance bottleneck.)
72
72
@@ -225,7 +225,7 @@ This simplified queue API gets rid of the unused parts (iterator, collections AP
225
225
226
226
## Deferred actions
227
227
228
-
The Reactive-Streams has a strict requirement that calling `onSubscribe()` must happen before any calls to the rest of the `onXXX` methods and by nature, any calls to `Subscription.request()` and `Subscription.cancel()`. The same logic applies to the design of `Observable`, `Single`, `Completable` and `Maybe` with their connection type of `Disposable`.
228
+
The ReactiveStreams has a strict requirement that calling `onSubscribe()` must happen before any calls to the rest of the `onXXX` methods and by nature, any calls to `Subscription.request()` and `Subscription.cancel()`. The same logic applies to the design of `Observable`, `Single`, `Completable` and `Maybe` with their connection type of `Disposable`.
229
229
230
230
Often though, such call to `onSubscribe` may happen later than the respective `cancel()` needs to happen. For example, the user may want to call `cancel()` before the respective `Subscription` actually becomes available in `subscribeOn`. Other operators may need to call `onSubscribe` before they connect to other sources but at that time, there is no direct way for relaying a `cancel` call to an unavailable upstream `Subscription`.
231
231
@@ -286,7 +286,7 @@ The same pattern applies to `Subscription` with its `cancel()` method and with h
286
286
287
287
### Deferred requesting
288
288
289
-
With `Flowable`s (and Reactive-Streams `Publisher`s) the `request()` calls need to be deferred as well. In one form (the simpler one), the respective late `Subscription` will eventually arrive and we need to relay all previous and all subsequent request amount to its `request()` method.
289
+
With `Flowable`s (and ReactiveStreams `Publisher`s) the `request()` calls need to be deferred as well. In one form (the simpler one), the respective late `Subscription` will eventually arrive and we need to relay all previous and all subsequent request amount to its `request()` method.
290
290
291
291
In 1.x, this behavior was implicitly provided by `rx.Subscriber` but at a high cost that had to be payed by all instances whether or not they needed this feature.
292
292
@@ -561,7 +561,7 @@ On the fast path, when we try to leave it, it is possible a concurrent call to `
561
561
562
562
## FlowableSubscriber
563
563
564
-
Version2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Subscriber` from Reactive-Streams. It has the same methods with the same parameter types but different textual rules attached to it, a set of relaxations to the Reactive-Streams specification to enable better performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of `Flowable`s.
564
+
Version2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Subscriber` from ReactiveStreams. It has the same methods with the same parameter types but different textual rules attached to it, a set of relaxations to the ReactiveStreams specification to enable better performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of `Flowable`s.
565
565
566
566
The rule relaxations are as follows:
567
567
@@ -588,7 +588,7 @@ The other base reactive consumers, `Observer`, `SingleObserver`, `MaybeObserver`
588
588
589
589
# Backpressure and cancellation
590
590
591
-
Backpressure (or flow control) in Reactive-Streams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike the name suggest, there is no physical pressure preventing the upstream from calling `onNext` but the protocol to honor the request amount.
591
+
Backpressure (or flow control) in ReactiveStreams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike the name suggest, there is no physical pressure preventing the upstream from calling `onNext` but the protocol to honor the request amount.
592
592
593
593
## Replenishing
594
594
@@ -1425,15 +1425,15 @@ public final class FlowableMyOperator extends Flowable<Integer> {
1425
1425
}
1426
1426
```
1427
1427
1428
-
When taking other reactive types as inputs in these operators, it is recommended one defines the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially with `Flowable` operators and other Reactive-Streams `Publisher`s). To recap, these are the class-interface pairs:
1428
+
When taking other reactive types as inputs in these operators, it is recommended one defines the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially with `Flowable` operators and other ReactiveStreams `Publisher`s). To recap, these are the class-interface pairs:
RxJava 2.x locks down `Flowable.subscribe` (and the same methods in the other types) in order to provide runtime hooks into the various flows, therefore, implementors are given the `subscribeActual()` to be overridden. When it is invoked, all relevant hooks and wrappers have been applied. Implementors should avoid throwing unchecked exceptions as the library generally can't deliver it to the respective `Subscriber` due to lifecycle restrictions of the Reactive-Streams specification and sends it to the global error consumer via `RxJavaPlugins.onError`.
1436
+
RxJava 2.x locks down `Flowable.subscribe` (and the same methods in the other types) in order to provide runtime hooks into the various flows, therefore, implementors are given the `subscribeActual()` to be overridden. When it is invoked, all relevant hooks and wrappers have been applied. Implementors should avoid throwing unchecked exceptions as the library generally can't deliver it to the respective `Subscriber` due to lifecycle restrictions of the ReactiveStreams specification and sends it to the global error consumer via `RxJavaPlugins.onError`.
1437
1437
1438
1438
Unlike in 1.x, In the example above, the incoming `Subscriber` is simply used directly for subscribing again (but still at most once) without any kind of wrapping. In 1.x, one needs to call `Subscribers.wrap` to avoid double calls to `onStart` and cause unexpected double initialization or double-requesting.
1439
1439
@@ -1516,7 +1516,7 @@ public final class MyOperator implements FlowableOperator<Integer, Integer> {
1516
1516
1517
1517
You may recognize that implementing operators via extension or lifting looks quite similar. In both cases, one usually implements a `FlowableSubscriber` (`Observer`, etc) that takes a downstream `Subscriber`, implements the business logic in the `onXXX` methods and somehow (manually or as part of `lift()`'s lifecycle) gets subscribed to an upstream source.
1518
1518
1519
-
The benefit of applying the Reactive-Streams design to all base reactive types is that each consumer type is now an interface and can be applied to operators that have to extend some class. This was a pain in 1.x because `Subscriber` and `SingleSubscriber` are classes themselves, plus `Subscriber.request()` is a protected-final method and an operator's `Subscriber` can't implement the `Producer` interface at the same time. In 2.x there is no such problem and one can have both `Subscriber`, `Subscription` or even `Observer` together in the same consumer type.
1519
+
The benefit of applying the ReactiveStreams design to all base reactive types is that each consumer type is now an interface and can be applied to operators that have to extend some class. This was a pain in 1.x because `Subscriber` and `SingleSubscriber` are classes themselves, plus `Subscriber.request()` is a protected-final method and an operator's `Subscriber` can't implement the `Producer` interface at the same time. In 2.x there is no such problem and one can have both `Subscriber`, `Subscription` or even `Observer` together in the same consumer type.
1520
1520
1521
1521
# Operator fusion
1522
1522
@@ -1569,12 +1569,12 @@ This is the level of the **Rx.NET** library (even up to 3.x) that supports compo
1569
1569
This is what **RxJava 1.x** is categorized, it supports composition, backpressure and synchronous cancellation along with the ability to lift an operator into a sequence.
1570
1570
1571
1571
#### Generation 3
1572
-
This is the level of the Reactive-Streams based libraries such as **Reactor 2** and **Akka-Stream**. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return anything from `subscribe()`). This is incompatible with RxJava 1.x and thus 2.x had to be rewritten from scratch.
1572
+
This is the level of the ReactiveStreams based libraries such as **Reactor 2** and **Akka-Stream**. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return anything from `subscribe()`). This is incompatible with RxJava 1.x and thus 2.x had to be rewritten from scratch.
1573
1573
1574
1574
#### Generation 4
1575
-
This level expands upon the Reactive-Streams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and works without them). **Reactor 3** and **RxJava 2** are at this level. The material around **Akka-Stream** mentions operator-fusion as well, however, **Akka-Stream** is not a native Reactive-Streams implementation (requires a materializer to get a `Publisher` out) and as such it is only Gen 3.
1575
+
This level expands upon the ReactiveStreams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and works without them). **Reactor 3** and **RxJava 2** are at this level. The material around **Akka-Stream** mentions operator-fusion as well, however, **Akka-Stream** is not a native ReactiveStreams implementation (requires a materializer to get a `Publisher` out) and as such it is only Gen 3.
1576
1576
1577
-
There are discussions among the 4th generation library providers to have the elements of operator-fusion standardized in Reactive-Streams 2.0 specification (or in a neighboring extension) and have **RxJava 3** and **Reactor 4** work together on that aspect as well.
1577
+
There are discussions among the 4th generation library providers to have the elements of operator-fusion standardized in ReactiveStreams 2.0 specification (or in a neighboring extension) and have **RxJava 3** and **Reactor 4** work together on that aspect as well.
1578
1578
1579
1579
## Components
1580
1580
@@ -1629,7 +1629,7 @@ The reason for the two separate interfaces is that if a source is constant, like
1629
1629
1630
1630
`Callable` denotes sources, such as `fromCallable` that indicates the single value has to be calculated at runtime of the flow. Bythis logic, you can see that `ScalarCallable` is a `Callable` on its own right because the constant can be "calculated" as late as the runtime phase of the flow.
1631
1631
1632
-
SinceReactive-Streams forbids using `null`s as emission values, we can use `null` in `(Scalar)Callable` marked sources to indicate there is no value to be emitted, thus one can't mistake an user's `null` with the empty indicator `null`.For example, this is how `empty()` is implemented:
1632
+
SinceReactiveStreams forbids using `null`s as emission values, we can use `null` in `(Scalar)Callable` marked sources to indicate there is no value to be emitted, thus one can't mistake an user's `null` with the empty indicator `null`.For example, this is how `empty()` is implemented:
0 commit comments