diff --git a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
index ab4c517e65..ea93669ca3 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
@@ -234,11 +234,11 @@ public static <T> BehaviorProcessor<T> create() {
      */
     BehaviorProcessor(T defaultValue) {
         this();
-        this.value.lazySet(Objects.requireNonNull(defaultValue, "defaultValue is null"));
+        this.value.lazySet(defaultValue);
     }
 
     @Override
-    protected void subscribeActual(Subscriber<? super T> s) {
+    protected void subscribeActual(@NonNull Subscriber<? super T> s) {
         BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);
         s.onSubscribe(bs);
         if (add(bs)) {
@@ -258,7 +258,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
     }
 
     @Override
-    public void onSubscribe(Subscription s) {
+    public void onSubscribe(@NonNull Subscription s) {
         if (terminalEvent.get() != null) {
             s.cancel();
             return;
@@ -267,7 +267,7 @@ public void onSubscribe(Subscription s) {
     }
 
     @Override
-    public void onNext(T t) {
+    public void onNext(@NonNull T t) {
         ExceptionHelper.nullCheck(t, "onNext called with a null value.");
 
         if (terminalEvent.get() != null) {
@@ -281,7 +281,7 @@ public void onNext(T t) {
     }
 
     @Override
-    public void onError(Throwable t) {
+    public void onError(@NonNull Throwable t) {
         ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
         if (!terminalEvent.compareAndSet(null, t)) {
             RxJavaPlugins.onError(t);
@@ -316,14 +316,13 @@ public void onComplete() {
      * <p>History: 2.0.8 - experimental
      * @param t the item to emit, not null
      * @return true if the item was emitted to all Subscribers
+     * @throws NullPointerException if {@code t} is {@code null}
      * @since 2.2
      */
     @CheckReturnValue
-    public boolean offer(T t) {
-        if (t == null) {
-            onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
-            return true;
-        }
+    public boolean offer(@NonNull T t) {
+        ExceptionHelper.nullCheck(t, "offer called with a null value.");
+
         BehaviorSubscription<T>[] array = subscribers.get();
 
         for (BehaviorSubscription<T> s : array) {
diff --git a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
index d258ba1718..ea300afe34 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
@@ -258,7 +258,7 @@ public void startUnbounded() {
     }
 
     @Override
-    public void onSubscribe(Subscription s) {
+    public void onSubscribe(@NonNull Subscription s) {
         if (SubscriptionHelper.setOnce(upstream, s)) {
             if (s instanceof QueueSubscription) {
                 @SuppressWarnings("unchecked")
@@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) {
     }
 
     @Override
-    public void onNext(T t) {
+    public void onNext(@NonNull T t) {
         if (once.get()) {
             return;
         }
@@ -306,26 +306,29 @@ public void onNext(T t) {
     /**
      * Tries to offer an item into the internal queue and returns false
      * if the queue is full.
-     * @param t the item to offer, not null
+     * @param t the item to offer, not {@code null}
      * @return true if successful, false if the queue is full
+     * @throws NullPointerException if {@code t} is {@code null}
+     * @throws IllegalStateException if the processor is in fusion mode
      */
     @CheckReturnValue
-    public boolean offer(T t) {
+    public boolean offer(@NonNull T t) {
+        ExceptionHelper.nullCheck(t, "offer called with a null value.");
         if (once.get()) {
             return false;
         }
-        ExceptionHelper.nullCheck(t, "offer called with a null value.");
         if (fusionMode == QueueSubscription.NONE) {
             if (queue.offer(t)) {
                 drain();
                 return true;
             }
+            return false;
         }
-        return false;
+        throw new IllegalStateException("offer() should not be called in fusion mode!");
     }
 
     @Override
-    public void onError(Throwable t) {
+    public void onError(@NonNull Throwable t) {
         ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
         if (once.compareAndSet(false, true)) {
             error = t;
@@ -369,7 +372,7 @@ public Throwable getThrowable() {
     }
 
     @Override
-    protected void subscribeActual(Subscriber<? super T> s) {
+    protected void subscribeActual(@NonNull Subscriber<? super T> s) {
         MulticastSubscription<T> ms = new MulticastSubscription<>(s, this);
         s.onSubscribe(ms);
         if (add(ms)) {
diff --git a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
index 36bc4eff34..0a73cf1bf8 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
@@ -141,7 +141,7 @@ public static <T> PublishProcessor<T> create() {
     }
 
     @Override
-    protected void subscribeActual(Subscriber<? super T> t) {
+    protected void subscribeActual(@NonNull Subscriber<? super T> t) {
         PublishSubscription<T> ps = new PublishSubscription<>(t, this);
         t.onSubscribe(ps);
         if (add(ps)) {
@@ -226,7 +226,7 @@ void remove(PublishSubscription<T> ps) {
     }
 
     @Override
-    public void onSubscribe(Subscription s) {
+    public void onSubscribe(@NonNull Subscription s) {
         if (subscribers.get() == TERMINATED) {
             s.cancel();
             return;
@@ -236,7 +236,7 @@ public void onSubscribe(Subscription s) {
     }
 
     @Override
-    public void onNext(T t) {
+    public void onNext(@NonNull T t) {
         ExceptionHelper.nullCheck(t, "onNext called with a null value.");
         for (PublishSubscription<T> s : subscribers.get()) {
             s.onNext(t);
@@ -245,7 +245,7 @@ public void onNext(T t) {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void onError(Throwable t) {
+    public void onError(@NonNull Throwable t) {
         ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
         if (subscribers.get() == TERMINATED) {
             RxJavaPlugins.onError(t);
@@ -281,14 +281,13 @@ public void onComplete() {
      * <p>History: 2.0.8 - experimental
      * @param t the item to emit, not null
      * @return true if the item was emitted to all Subscribers
+     * @throws NullPointerException if {@code t} is {@code null}
      * @since 2.2
      */
     @CheckReturnValue
-    public boolean offer(T t) {
-        if (t == null) {
-            onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
-            return true;
-        }
+    public boolean offer(@NonNull T t) {
+        ExceptionHelper.nullCheck(t, "offer called with a null value.");
+
         PublishSubscription<T>[] array = subscribers.get();
 
         for (PublishSubscription<T> s : array) {
diff --git a/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java
index 0f04c4baea..d49ba5f565 100644
--- a/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java
+++ b/src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java
@@ -673,12 +673,14 @@ public void offer() {
 
         ts = pp.test(1);
 
-        assertTrue(pp.offer(null));
-
-        ts.assertFailure(NullPointerException.class, 2);
+        try {
+            pp.offer(null);
+            fail("Should have thrown NPE!");
+        } catch (NullPointerException expected) {
+            // expected
+        }
 
-        assertTrue(pp.hasThrowable());
-        assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
+        ts.assertValuesOnly(2);
     }
 
     @Test
diff --git a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java
index 4e71919b09..afa14b8deb 100644
--- a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java
+++ b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java
@@ -466,7 +466,12 @@ public void asyncFused() {
             up.onNext(i);
         }
 
-        assertFalse(mp.offer(10));
+        try {
+            mp.offer(10);
+            fail("Should have thrown IllegalStateException");
+        } catch (IllegalStateException expected) {
+            // expected
+        }
 
         up.onComplete();
 
diff --git a/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java
index 277ac40c78..7c413d9dc0 100644
--- a/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java
+++ b/src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java
@@ -597,12 +597,14 @@ public void offer() {
 
         ts = pp.test(0);
 
-        assertTrue(pp.offer(null));
-
-        ts.assertFailure(NullPointerException.class);
+        try {
+            pp.offer(null);
+            fail("Should have thrown NPE!");
+        } catch (NullPointerException expected) {
+            // expected
+        }
 
-        assertTrue(pp.hasThrowable());
-        assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
+        ts.assertEmpty();
     }
 
     @Test