-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Using publish/share/etc leads to upstream operator running on unexpected scheduler #6144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
This a bug due to operator fusion breaking the thread-confinement expected after |
@akarnokd I've noticed that val scheduler1 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
Thread(runnable, "scheduler1")
})
val scheduler2 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
Thread(runnable, "scheduler2")
})
val flowable: Flowable<Long> = Flowable.interval(1L, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.take(10_000)
.observeOn(scheduler1)
.map {
val threadName = Thread.currentThread().name
if (threadName != "scheduler1") {
throw AssertionError("Wrong thread on map #1: $threadName")
}
it
}
.hide()
.share()
.map {
val threadName = Thread.currentThread().name
if (threadName != "scheduler1") {
throw AssertionError("Wrong thread on map #2: $threadName")
}
it
}
flowable
.observeOn(scheduler2)
.doOnNext { Thread.sleep(10L) }
.subscribe {
println("onNext on ${Thread.currentThread().name}: $it")
} Output:
I have two questions about this:
|
No, this is a different effect. Due to backpressure and buffering, the
If you want that |
Thanks for the reply, @akarnokd. That makes sense - I will keep that in mind. Are there any existing mechanisms that allow something like |
No. It is an async boudary in which case all bets are off. You have to ensure the proper thread via observeOn. |
There appears to be a bug, or at the very least an undocumented and unexpected case, where
FlowablePublish
causes upstream operators to be applied on the subscription scheduler rather than the upstream observation scheduler (due topoll
ing). This is reproducible at least in2.1.x
and2.2.0
.Sample (Kotlin)
Expected Behavior
It should be expected that the
.map
operator is applied on thescheduler1
scheduler since it is the scheduler supplied toobserveOn
immediately upstream.Actual Behavior
After about ~100 emissions, the
map
operator is applied on thescheduler2
scheduler. I'm not an expert in the details, but it seems to have something to do with backpressure, buffering, and polling for items upon subscription?Output
The text was updated successfully, but these errors were encountered: