NPE in SpscLinkedArrayQueue.clear due to concurrent invocation #6673

Closed
akarnokd opened this issue Oct 16, 2019 · 1 comment · Fixed by #6676

@akarnokd
Member

Reported in #5927 (comment)

Fatal Exception: java.lang.NullPointerException: Attempt to invoke virtual method 'java.lang.Object java.util.concurrent.atomic.AtomicReferenceArray.get(int)' on a null object reference
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.lvElement + 251(SpscLinkedArrayQueue.java:251)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.poll + 138(SpscLinkedArrayQueue.java:138)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.clear + 186(SpscLinkedArrayQueue.java:186)
       at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.cancel + 154(FlowableOnBackpressureBuffer.java:154)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableConditionalSubscriber.cancel + 157(BasicFuseableConditionalSubscriber.java:157)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableElementAtSingle$ElementAtSubscriber.onNext + 85(FlowableElementAtSingle.java:85)
       at io.reactivex.internal.operators.flowable.FlowableDistinctUntilChanged$DistinctUntilChangedSubscriber.tryOnNext + 101(FlowableDistinctUntilChanged.java:101)
       at io.reactivex.internal.operators.flowable.FlowableMap$MapConditionalSubscriber.tryOnNext + 128(FlowableMap.java:128)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync + 649(FlowableObserveOn.java:649)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run + 176(FlowableObserveOn.java:176)

We just upgraded our project from 1.x, and did not see these errors in testing, but are seeing them in production. We're working to figure out the exact cause, and will provide more information as we have it.

Here are some additional stack traces, in case you notice any pattern:

Fatal Exception: java.lang.NullPointerException: Attempt to invoke virtual method 'java.lang.Object java.util.concurrent.atomic.AtomicReferenceArray.get(int)' on a null object reference
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.lvElement + 251(SpscLinkedArrayQueue.java:251)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.newBufferPoll + 155(SpscLinkedArrayQueue.java:155)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.poll + 145(SpscLinkedArrayQueue.java:145)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.clear + 186(SpscLinkedArrayQueue.java:186)
       at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.cancel + 154(FlowableOnBackpressureBuffer.java:154)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableConditionalSubscriber.cancel + 157(BasicFuseableConditionalSubscriber.java:157)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.cancel + 168(FlowableConcatMapMaybe.java:168)
       at io.reactivex.internal.operators.flowable.FlowableElementAtSingle$ElementAtSubscriber.onNext + 85(FlowableElementAtSingle.java:85)
       at io.reactivex.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.drain + 284(FlowableConcatMapMaybe.java:284)
       at io.reactivex.internal.operators.mixed.FlowableConcatMapMaybe$ConcatMapMaybeSubscriber.onNext + 137(FlowableConcatMapMaybe.java:137)
       at io.reactivex.internal.operators.flowable.FlowableDistinctUntilChanged$DistinctUntilChangedSubscriber.tryOnNext + 101(FlowableDistinctUntilChanged.java:101)
       at io.reactivex.internal.operators.flowable.FlowableMap$MapConditionalSubscriber.tryOnNext + 128(FlowableMap.java:128)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync + 649(FlowableObserveOn.java:649)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run + 176(FlowableObserveOn.java:176)

Fatal Exception: java.lang.NullPointerException: Attempt to invoke virtual method 'java.lang.Object java.util.concurrent.atomic.AtomicReferenceArray.get(int)' on a null object reference
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.lvElement + 251(SpscLinkedArrayQueue.java:251)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.newBufferPoll + 155(SpscLinkedArrayQueue.java:155)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.poll + 145(SpscLinkedArrayQueue.java:145)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.clear + 186(SpscLinkedArrayQueue.java:186)
       at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.clear + 264(FlowableOnBackpressureBuffer.java:264)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.clear + 236(FlowableObserveOn.java:236)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated + 188(FlowableObserveOn.java:188)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync + 399(FlowableObserveOn.java:399)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run + 176(FlowableObserveOn.java:176)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run + 66(ScheduledRunnable.java:66)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call + 57(ScheduledRunnable.java:57)

Fatal Exception: java.lang.NullPointerException: Attempt to invoke virtual method 'java.lang.Object java.util.concurrent.atomic.AtomicReferenceArray.get(int)' on a null object reference
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.lvElement + 251(SpscLinkedArrayQueue.java:251)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.poll + 138(SpscLinkedArrayQueue.java:138)
       at io.reactivex.internal.queue.SpscLinkedArrayQueue.clear + 186(SpscLinkedArrayQueue.java:186)
       at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.cancel + 154(FlowableOnBackpressureBuffer.java:154)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel + 154(FlowableObserveOn.java:154)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableSkipWhile$SkipWhileSubscriber.cancel + 93(FlowableSkipWhile.java:93)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel + 158(BasicFuseableSubscriber.java:158)
       at io.reactivex.internal.operators.flowable.FlowableElementAtSingle$ElementAtSubscriber.onNext + 85(FlowableElementAtSingle.java:85)
       at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext + 74(FlowableFilter.java:74)
       at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.onNext + 52(FlowableFilter.java:52)
       at io.reactivex.internal.operators.flowable.FlowableSkipWhile$SkipWhileSubscriber.onNext + 56(FlowableSkipWhile.java:56)
       at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext + 68(FlowableMap.java:68)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync + 407(FlowableObserveOn.java:407)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run + 176(FlowableObserveOn.java:176)

The full chain for the last stack trace is:

Single.just(...)
  .delaySubscription(
    Completable.amb(
      listOf(
        Completable.timer(...),
        FlowableProcessor
          .map { ... }
          .onBackpressureBuffer()
          .compose { it.observeOn(...) }
          .map { ... }
          .observeOn(...)
          .map { ... }
          .skipWhile { ... }
          .filter { ... }
          .firstOrError()
          .ignoreElement()
      )
    )
      .toObservable<Any>()
  )
  .subscribeOn(...)
@DavidDTA
Contributor

As far as I can tell, the chain for the second stack trace is:

FlowableProcessor
  .map { ... }
  .onBackpressureBuffer()
  .compose { it.observeOn(...) }
  .map { ... }
  .observeOn(...)
  .map { ... }
  .distinctUntilChanged()
  .concatMapMaybe { ... }
  .firstOrError()

which is a lot simpler.

In both cases, the FlowableProcessor is subscribed to FlowableProcessor.onBackpressureBuffer().scan { ... }
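
For readers unfamiliar with the failure mode named in the title: SpscLinkedArrayQueue is a single-consumer queue, so poll() and clear() must not run on two threads at once. One common way to enforce that is a work-in-progress counter that lets only one caller at a time act as the consumer. The following is a minimal sketch of that idea, not RxJava's code and not necessarily what #6676 changed. The class GuardedClear, its fields, and the ArrayDeque stand-in are all hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical illustration only; not RxJava's implementation.
final class GuardedClear {
    // Stand-in for a single-consumer queue: poll() and clear() must never
    // run concurrently. A real operator would use a concurrent SPSC queue
    // so that a producer thread can offer() safely.
    final ArrayDeque<Integer> queue = new ArrayDeque<>();

    // Work-in-progress counter: the caller that moves it from 0 to 1
    // owns the consumer side of the queue.
    final AtomicInteger wip = new AtomicInteger();

    volatile boolean cancelled;

    // May be called from any thread, including from inside the drain loop.
    // Returns true only if this call cleared the queue itself.
    boolean cancel() {
        cancelled = true;
        if (wip.getAndIncrement() == 0) {
            queue.clear(); // no drain is running, so clearing here is safe
            return true;
        }
        return false; // the running drain loop will see 'cancelled' and clear
    }

    // Emits queued items to 'downstream' until the queue is empty or cancelled.
    void drain(IntConsumer downstream) {
        if (wip.getAndIncrement() != 0) {
            return; // another drain() or cancel() already owns the queue
        }
        int missed = 1;
        for (;;) {
            for (;;) {
                if (cancelled) {
                    queue.clear(); // cleared by the owner, never by a second thread
                    return;        // wip stays non-zero: terminal state
                }
                Integer v = queue.poll();
                if (v == null) {
                    break;
                }
                downstream.accept(v);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

In this sketch, a cancel() issued from inside the downstream callback (the shape seen in the stack traces, where onNext leads to cancel) does not touch the queue; the drain loop that already owns it performs the clear on its next iteration.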
