Observable.map unsubscribe question #4242

Closed
davidmoten opened this issue Jul 25, 2016 · 9 comments
Comments

@davidmoten
Collaborator

davidmoten commented Jul 25, 2016

Just looking at OnSubscribeMap and I noticed a possibly undesirable unsubscribe() call in MapSubscriber (L72):

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

If an exception occurs we eagerly unsubscribe from the source before emitting the error. I'm not sure we have a policy on this yet, but my first impression is that a lengthy unsubscribe action could delay the emission of the error, which might not be expected. I wonder if we should delete this unsubscribe() call?
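To make the ordering concern concrete, here is a minimal sketch in plain Java. It does not use RxJava: the class name EagerUnsubscribeOrder, the unsubscribeAction runnable and the event strings are all hypothetical stand-ins. It only shows that whatever the unsubscribe action does runs to completion before the error reaches downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from RxJava.
final class EagerUnsubscribeOrder {

    /** Records, in order, what happens when the mapper function throws. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in for the upstream's unsubscribe action (which could be slow).
        Runnable unsubscribeAction = () -> events.add("unsubscribe");

        try {
            // Stand-in for mapper.call(t) failing.
            throw new IllegalStateException("mapper failed");
        } catch (RuntimeException ex) {
            unsubscribeAction.run();                    // 1. eager unsubscribe
            events.add("onError: " + ex.getMessage());  // 2. only then the error
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [unsubscribe, onError: mapper failed]
    }
}
```

If the unsubscribe action blocked for a second, the error would arrive a second later, which is the delay the question is about.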

@davidmoten
Collaborator Author

Another question about OnSubscribeMap is the defensive use of done. Is this necessary, or can I remove it (and thus reduce allocation)? I know a boolean carries only one bit of information, but it requires a byte and with padding can take up to 8 bytes.

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

@akarnokd
Member

It has to tell the upstream to stop emitting. Most unsubscribe actions are simple enough that this doesn't take long.

The defensive done is required as some sources don't expect unsubscription before sending out the terminal event and would cause double termination in many cases.

Don't change anything in map.
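The double termination described above can be reproduced with a small plain-Java sketch. Everything here is an assumption made for illustration: SimpleObserver and MapObserver are hypothetical stand-ins (not the RxJava Subscriber or OnSubscribeMap), the useDoneFlag switch exists only to compare both behaviours, and the real code additionally routes a second error to RxJavaHooks.onError, which is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins; these are NOT the RxJava types.
final class DefensiveDoneSketch {

    interface SimpleObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Map stage that turns a mapper failure into onError. */
    static final class MapObserver<T, R> implements SimpleObserver<T> {
        final SimpleObserver<R> actual;
        final Function<T, R> mapper;
        final Runnable unsubscribe;   // stand-in for unsubscribing from upstream
        final boolean useDoneFlag;    // demo switch, not part of the real operator
        boolean done;

        MapObserver(SimpleObserver<R> actual, Function<T, R> mapper,
                    Runnable unsubscribe, boolean useDoneFlag) {
            this.actual = actual;
            this.mapper = mapper;
            this.unsubscribe = unsubscribe;
            this.useDoneFlag = useDoneFlag;
        }

        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.apply(t);
            } catch (RuntimeException ex) {
                unsubscribe.run();    // tell upstream to stop emitting
                onError(ex);          // then emit the terminal event
                return;
            }
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (useDoneFlag) {
                if (done) {
                    return;
                }
                done = true;
            }
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (useDoneFlag && done) {
                return;
            }
            actual.onCompleted();
        }
    }

    /** Drives the map stage from a source that never checks for unsubscription. */
    static List<String> run(boolean useDoneFlag) {
        List<String> events = new ArrayList<>();
        SimpleObserver<Integer> downstream = new SimpleObserver<Integer>() {
            @Override public void onNext(Integer v) { events.add("onNext(" + v + ")"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        MapObserver<Integer, Integer> map = new MapObserver<Integer, Integer>(
                downstream, x -> 10 / x, () -> events.add("unsubscribe"), useDoneFlag);

        map.onNext(5);       // maps to 2
        map.onNext(0);       // mapper throws ArithmeticException
        map.onCompleted();   // source completes anyway, ignoring the unsubscribe
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [onNext(2), unsubscribe, onError, onCompleted]
        System.out.println(run(true));  // [onNext(2), unsubscribe, onError]
    }
}
```

Without the flag, downstream receives both onError and onCompleted; with it, only the first terminal event gets through.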

@davidmoten
Collaborator Author

Righto. As a general rule, then, is it fair to say that when an operator maps an onNext emission to a terminal event, we should follow this pattern (unsubscribe eagerly and use a defensive done flag)?

@akarnokd
Member

Yes. I found a bunch of operators violating this rule before (around the same time map and filter were fixed) but there could be others.

@davidmoten
Collaborator Author

I thought I'd add this rule to the https://github.com/ReactiveX/RxJava/wiki/Implementing-custom-operators-(draft) page but I couldn't edit it. I was thinking of adding the following just above Further Reading.

When an operator maps an onNext emission to a terminal event, it should unsubscribe its subscriber to upstream (usually called the parent) before emitting the terminal event. In addition, because upstream may (legally) do something like this:

child.onNext(blah);
//  no check for unsubscribed here
child.onCompleted();

we should ensure that the operator complies with the Observable contract and only emits one terminal event so we use a defensive done flag:

boolean done = false;

@Override
public void onError(Throwable e) {
    // ignore any terminal event after the first
    if (done) {
        return;
    }
    done = true;
    ...
}

@Override
public void onCompleted() {
    // ignore any terminal event after the first
    if (done) {
        return;
    }
    done = true;
    ...
}

An example of this pattern is seen in OnSubscribeMap.

Would you like to add this info to the wiki page @akarnokd?
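As a further illustration, the same rule can be sketched for an operator other than map. The example below is a hypothetical take-while style stage written in plain Java; SimpleObserver and TakeWhileObserver are made-up names, not RxJava classes. It maps an onNext emission to onCompleted once the predicate fails. The extra done check in onNext goes beyond the rule as stated and is an assumption added here so that values from a source that ignores unsubscription are dropped as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins; these are NOT the RxJava types.
final class TakeWhileSketch {

    interface SimpleObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Completes downstream as soon as the predicate rejects a value. */
    static final class TakeWhileObserver<T> implements SimpleObserver<T> {
        final SimpleObserver<T> actual;
        final Predicate<T> predicate;
        final Runnable unsubscribe;   // stand-in for unsubscribing the parent
        boolean done;

        TakeWhileObserver(SimpleObserver<T> actual, Predicate<T> predicate,
                          Runnable unsubscribe) {
            this.actual = actual;
            this.predicate = predicate;
            this.unsubscribe = unsubscribe;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;               // assumption: also drop late values
            }
            if (!predicate.test(t)) {
                unsubscribe.run();    // unsubscribe eagerly...
                onCompleted();        // ...then emit the terminal event
                return;
            }
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            actual.onCompleted();
        }
    }

    /** Feeds 1..4 and a completion from a source that ignores unsubscription. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SimpleObserver<Integer> downstream = new SimpleObserver<Integer>() {
            @Override public void onNext(Integer v) { events.add("onNext(" + v + ")"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        TakeWhileObserver<Integer> op = new TakeWhileObserver<Integer>(
                downstream, v -> v < 3, () -> events.add("unsubscribe"));

        for (int i = 1; i <= 4; i++) {
            op.onNext(i);
        }
        op.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext(1), onNext(2), unsubscribe, onCompleted]
    }
}
```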

@akarnokd
Member

Done.

@davidmoten
Collaborator Author

Thanks. I'll have a look for operators not complying with this.

@DavidMGross
Collaborator

You might consider instead adding this information to the page at

http://reactivex.io/documentation/implement-operator.html
("Implementing Your Own Operators")

as most of the RxJava operator-oriented documentation has moved to the reactivex.io site.

On Tue, Jul 26, 2016 at 2:02 PM, Dave Moten wrote:

Closed #4242.


David M. Gross
PLP Consulting

@davidmoten
Collaborator Author

Good idea @DavidMGross, I'll make a PR
