Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observeOn.concatMap may not invoke the mapper function on the desired thread #6447

Closed
akarnokd opened this issue Mar 28, 2019 · 2 comments
Closed

Comments

@akarnokd
Copy link
Member

akarnokd commented Mar 28, 2019

There are two effects in play here: fusion and trampolining.

Fusion will take the observeOn queue and just pull on it when the subscription happens. Trampolining will use the last interacting thread (the subscription thread or the termination thread) to pull on the internal queue (dedicated or fused) and when there is an item, it will run the mapper on that thread.

Flowable.range(1, 5)
    .observeOn(Schedulers.computation())
    .concatMap(v -> Flowable.just(Thread.currentThread().toString()))
    .blockingSubscribe(System.out::println);

This will likely print computation, main, main, main, main.

Workarounds:

  • Use hide() between observeOn and concatMap to break fusion.
  • Use subscribeOn after concatMap to move the trampoline off the main thread.
  • Use defer+subscribeOn in the mapper function to calculate the actual Flowable on a desired thread, not the mapper thread.

Appeared on StackOverflow.

@kojilin
Copy link
Contributor

kojilin commented Mar 29, 2019

Also concatMapSingle, concatMapMaybe and concatMapCompletable have similar issue.

@akarnokd
Copy link
Member Author

Closing via #6538.

@kojilin If you want, you can post PRs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants