Skip to content

Using publish/share/etc leads to upstream operator running on unexpected scheduler #6144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
damianw opened this issue Aug 7, 2018 · 5 comments · Fixed by #6145
Closed

Using publish/share/etc leads to upstream operator running on unexpected scheduler #6144

damianw opened this issue Aug 7, 2018 · 5 comments · Fixed by #6145

Comments

@damianw
Copy link

damianw commented Aug 7, 2018

There appears to be a bug, or at the very least an undocumented and unexpected case, where FlowablePublish causes upstream operators to be applied on the subscription scheduler rather than the upstream observation scheduler (due to polling). This is reproducible at least in 2.1.x and 2.2.0.

Sample (Kotlin)

    val scheduler1 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler1")
    })

    val scheduler2 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler2")
    })

    val flowable: Flowable<Long> = Flowable.interval(1L, TimeUnit.MILLISECONDS)
            .onBackpressureLatest()
            .take(10_000)
            .observeOn(scheduler1)
            .map {
                val threadName = Thread.currentThread().name
                if (threadName != "scheduler1") {
                    throw AssertionError("Wrong thread: $threadName")
                }
                it
            }
            .share()

    flowable
            .observeOn(scheduler2)
            .doOnNext { Thread.sleep(10L) }
            .subscribe {
                println("onNext on ${Thread.currentThread().name}: $it")
            }

Expected Behavior

It should be expected that the .map operator is applied on the scheduler1 scheduler since it is the scheduler supplied to observeOn immediately upstream.

Actual Behavior

After about ~100 emissions, the map operator is applied on the scheduler2 scheduler. I'm not an expert in the details, but it seems to have something to do with backpressure, buffering, and polling for items upon subscription?

Output

onNext on scheduler2: 0
onNext on scheduler2: 1
onNext on scheduler2: 2
onNext on scheduler2: 3
onNext on scheduler2: 4
onNext on scheduler2: 5
onNext on scheduler2: 6
onNext on scheduler2: 7
onNext on scheduler2: 8
onNext on scheduler2: 9
onNext on scheduler2: 10
onNext on scheduler2: 11
onNext on scheduler2: 12
onNext on scheduler2: 13
onNext on scheduler2: 14
onNext on scheduler2: 15
onNext on scheduler2: 16
onNext on scheduler2: 17
onNext on scheduler2: 18
onNext on scheduler2: 19
onNext on scheduler2: 20
onNext on scheduler2: 21
onNext on scheduler2: 22
onNext on scheduler2: 23
onNext on scheduler2: 24
onNext on scheduler2: 25
onNext on scheduler2: 26
onNext on scheduler2: 27
onNext on scheduler2: 28
onNext on scheduler2: 29
onNext on scheduler2: 30
onNext on scheduler2: 31
onNext on scheduler2: 32
onNext on scheduler2: 33
onNext on scheduler2: 34
onNext on scheduler2: 35
onNext on scheduler2: 36
onNext on scheduler2: 37
onNext on scheduler2: 38
onNext on scheduler2: 39
onNext on scheduler2: 40
onNext on scheduler2: 41
onNext on scheduler2: 42
onNext on scheduler2: 43
onNext on scheduler2: 44
onNext on scheduler2: 45
onNext on scheduler2: 46
onNext on scheduler2: 47
onNext on scheduler2: 48
onNext on scheduler2: 49
onNext on scheduler2: 50
onNext on scheduler2: 51
onNext on scheduler2: 52
onNext on scheduler2: 53
onNext on scheduler2: 54
onNext on scheduler2: 55
onNext on scheduler2: 56
onNext on scheduler2: 57
onNext on scheduler2: 58
onNext on scheduler2: 59
onNext on scheduler2: 60
onNext on scheduler2: 61
onNext on scheduler2: 62
onNext on scheduler2: 63
onNext on scheduler2: 64
onNext on scheduler2: 65
onNext on scheduler2: 66
onNext on scheduler2: 67
onNext on scheduler2: 68
onNext on scheduler2: 69
onNext on scheduler2: 70
onNext on scheduler2: 71
onNext on scheduler2: 72
onNext on scheduler2: 73
onNext on scheduler2: 74
onNext on scheduler2: 75
onNext on scheduler2: 76
onNext on scheduler2: 77
onNext on scheduler2: 78
onNext on scheduler2: 79
onNext on scheduler2: 80
onNext on scheduler2: 81
onNext on scheduler2: 82
onNext on scheduler2: 83
onNext on scheduler2: 84
onNext on scheduler2: 85
onNext on scheduler2: 86
onNext on scheduler2: 87
onNext on scheduler2: 88
onNext on scheduler2: 89
onNext on scheduler2: 90
onNext on scheduler2: 91
onNext on scheduler2: 92
onNext on scheduler2: 93
onNext on scheduler2: 94
onNext on scheduler2: 95
io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread: scheduler2
	at RxTestKt$main$flowable$1.apply(RxTest.kt:22)
	at RxTestKt$main$flowable$1.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.poll(FlowableMap.java:81)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:510)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more
Exception in thread "scheduler2" io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread: scheduler2
	at RxTestKt$main$flowable$1.apply(RxTest.kt:22)
	at RxTestKt$main$flowable$1.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.poll(FlowableMap.java:81)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:510)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more
@akarnokd akarnokd added this to the 2.2 backlog milestone Aug 7, 2018
@akarnokd
Copy link
Member

akarnokd commented Aug 7, 2018

This a bug due to operator fusion breaking the thread-confinement expected after observeOn. You can work around it by applying hide() before share(). I'll post a fix shortly.

@damianw
Copy link
Author

damianw commented Aug 13, 2018

@akarnokd I've noticed that .hide() does not always solve the problem - adding a second map (and/or probably other operators) after the share causes the problem to reappear:

    val scheduler1 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler1")
    })

    val scheduler2 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler2")
    })

    val flowable: Flowable<Long> = Flowable.interval(1L, TimeUnit.MILLISECONDS)
            .onBackpressureLatest()
            .take(10_000)
            .observeOn(scheduler1)
            .map {
                val threadName = Thread.currentThread().name
                if (threadName != "scheduler1") {
                    throw AssertionError("Wrong thread on map #1: $threadName")
                }
                it
            }
            .hide()
            .share()
            .map {
                val threadName = Thread.currentThread().name
                if (threadName != "scheduler1") {
                    throw AssertionError("Wrong thread on map #2: $threadName")
                }
                it
            }

    flowable
            .observeOn(scheduler2)
            .doOnNext { Thread.sleep(10L) }
            .subscribe {
                println("onNext on ${Thread.currentThread().name}: $it")
            }

Output:

onNext on scheduler2: 0
onNext on scheduler2: 1
...
onNext on scheduler2: 94
onNext on scheduler2: 95
io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread on map #2: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread on map #2: scheduler2
	at RxTestKt$main$flowable$2.apply(RxTest.kt:31)
	at RxTestKt$main$flowable$2.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:64)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.onNext(FlowableRefCount.java:193)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:545)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more
Exception in thread "scheduler2" io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread on map #2: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread on map #2: scheduler2
	at RxTestKt$main$flowable$2.apply(RxTest.kt:31)
	at RxTestKt$main$flowable$2.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:64)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.onNext(FlowableRefCount.java:193)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:545)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more

I have two questions about this:

  1. Does 2.x: Fix boundary fusion of concatMap and publish operator #6145 also fix this situation?
  2. Is there any alternative workaround that can also fix this situation?

@akarnokd
Copy link
Member

Does #6145 also fix this situation?

No, this is a different effect. Due to backpressure and buffering, the publish may service its consumer on a different thread than the generator upstream to it.

Is there any alternative workaround that can also fix this situation?

If you want that map to execute on the desired thread, you have to apply observeOn(scheduler1) before it.

@damianw
Copy link
Author

damianw commented Aug 13, 2018

Thanks for the reply, @akarnokd. That makes sense - I will keep that in mind. Are there any existing mechanisms that allow something like publish to preserve the observation scheduling?

@akarnokd
Copy link
Member

No. It is an async boudary in which case all bets are off. You have to ensure the proper thread via observeOn.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants