diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml
index 44e5c1a692..3e905f1c92 100644
--- a/.github/workflows/gradle-wrapper-validation.yml
+++ b/.github/workflows/gradle-wrapper-validation.yml
@@ -9,5 +9,5 @@ jobs:
name: "Validation"
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0
diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml
index 650cbe7c05..cb7231a7bf 100644
--- a/.github/workflows/gradle_branch.yml
+++ b/.github/workflows/gradle_branch.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml
index 33c781fca9..771a1afe0f 100644
--- a/.github/workflows/gradle_jdk11.yml
+++ b/.github/workflows/gradle_jdk11.yml
@@ -12,19 +12,22 @@ on:
permissions:
contents: read
+env:
+ BUILD_WITH_11: true
+
jobs:
build:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Set up JDK 11
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -35,5 +38,5 @@ jobs:
run: ./gradlew -PjavaCompatibility=9 jar
- name: Build RxJava
run: ./gradlew build --stacktrace
- - name: Generate Javadoc
- run: ./gradlew javadoc --stacktrace
+# - name: Generate Javadoc
+# run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml
index 712139379f..f76409cd46 100644
--- a/.github/workflows/gradle_pr.yml
+++ b/.github/workflows/gradle_pr.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml
index a341c27ffc..b08c3fe08e 100644
--- a/.github/workflows/gradle_release.yml
+++ b/.github/workflows/gradle_release.yml
@@ -22,14 +22,14 @@ jobs:
env:
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -43,9 +43,18 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
- - name: Upload release
- run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
+# - name: Upload release
+# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+# env:
+# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
+# # ------------------------------------------------------------------------------
+# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
+# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
+# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
+# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
+ - name: Publish release
+ run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
@@ -53,13 +62,6 @@ jobs:
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
- - name: Publish release
- run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace
- env:
- # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
- # ------------------------------------------------------------------------------
- ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
- ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Push Javadoc
run: ./push_javadoc.sh
env:
diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml
index 5a66712f73..5dca06b523 100644
--- a/.github/workflows/gradle_snapshot.yml
+++ b/.github/workflows/gradle_snapshot.yml
@@ -21,14 +21,14 @@ jobs:
# ------------------------------------------------------------------------------
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -40,14 +40,14 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload Snapshot
- run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace
+ run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Push Javadoc
run: ./push_javadoc.sh
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml
index 2fe557cec1..1ebd21eb28 100644
--- a/.github/workflows/scorecard.yml
+++ b/.github/workflows/scorecard.yml
@@ -24,12 +24,12 @@ jobs:
steps:
- name: "Checkout code"
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: "Run analysis"
- uses: ossf/scorecard-action@f49aabe0b5af0936a0987cfb85d86b75731b0186 # v2.4.1
+ uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3
with:
results_file: results.sarif
results_format: sarif
@@ -46,7 +46,7 @@ jobs:
# Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
# format to the repository Actions tab.
- name: "Upload artifact"
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
+ uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: SARIF file
path: results.sarif
@@ -54,6 +54,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
- uses: github/codeql-action/upload-sarif@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15
+ uses: github/codeql-action/upload-sarif@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v3.29.5
with:
sarif_file: results.sarif
diff --git a/README.md b/README.md
index 1456d2b68c..5276c0bd37 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x)
-[](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+[](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
[](https://gitpod.io/#https://github.com/ReactiveX/RxJava)
[](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava)
@@ -48,7 +48,7 @@ The first step is to include RxJava 3 into your project, for example, as a Gradl
implementation "io.reactivex.rxjava3:rxjava:3.x.y"
```
-(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
)
### Hello World
@@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki
- Google Group: [RxJava](http://groups.google.com/d/forum/rxjava)
- Twitter: [@RxJava](http://twitter.com/RxJava)
- [GitHub Issues](https://github.com/ReactiveX/RxJava/issues)
-- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2)
+- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3)
- [Gitter.im](https://gitter.im/ReactiveX/RxJava)
## Versioning
@@ -571,11 +571,11 @@ and for Ivy:
### Snapshots
-Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
+Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/
```groovy
repositories {
- maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
+ maven { url 'https://central.sonatype.com/repository/maven-snapshots' }
}
dependencies {
@@ -583,7 +583,7 @@ dependencies {
}
```
-JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot
+JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot
## Build
diff --git a/build.gradle b/build.gradle
index 39703ec95e..8bcfddb717 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,12 +4,13 @@ plugins {
id("eclipse")
id("jacoco")
id("maven-publish")
- id("ru.vyarus.animalsniffer") version "2.0.0"
- id("me.champeau.gradle.jmh") version "0.5.3"
+ id("ru.vyarus.animalsniffer") version "2.0.1"
+ id("me.champeau.jmh") version "0.7.3"
id("com.github.hierynomus.license") version "0.16.1"
id("biz.aQute.bnd.builder") version "6.4.0"
- id("com.vanniktech.maven.publish") version "0.19.0"
- id("org.beryx.jar") version "1.2.0"
+ id("com.vanniktech.maven.publish") version "0.33.0"
+ id("org.beryx.jar") version "2.0.0"
+ id("signing")
}
ext {
@@ -18,7 +19,7 @@ ext {
testNgVersion = "7.5"
mockitoVersion = "4.11.0"
jmhLibVersion = "1.21"
- guavaVersion = "33.4.8-jre"
+ guavaVersion = "33.5.0-jre"
}
def releaseTag = System.getenv("BUILD_TAG")
@@ -49,7 +50,16 @@ dependencies {
testImplementation "com.google.guava:guava:$guavaVersion"
}
+def buildWith11 = System.getenv("BUILD_WITH_11")
java {
+ toolchain {
+ vendor = JvmVendorSpec.ADOPTIUM
+ if ("true".equals(buildWith11)) {
+ languageVersion = JavaLanguageVersion.of(11)
+ } else {
+ languageVersion = JavaLanguageVersion.of(8)
+ }
+ }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
@@ -86,12 +96,19 @@ animalsniffer {
annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer"
}
+moduleConfig {
+ moduleInfoPath = 'src/main/module/module-info.java'
+ multiReleaseVersion = 9
+ version = project.version
+}
+
jar {
from('.') {
include 'LICENSE'
include 'COPYRIGHT'
into('META-INF/')
}
+ exclude("module-info.class")
// Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227
bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$')
@@ -106,8 +123,6 @@ jar {
"Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava",
"Multi-Release": "true"
)
-
- moduleInfoPath = 'src/main/module/module-info.java'
}
license {
@@ -126,8 +141,8 @@ jmh {
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]
if (project.hasProperty("jmh")) {
- include = [".*" + project.jmh + ".*"]
- logger.info("JMH: {}", include)
+ includes = [".*" + project.jmh + ".*"]
+ logger.info("JMH: {}", includes)
}
}
@@ -166,8 +181,9 @@ jacocoTestReport {
dependsOn testNG
reports {
- xml.enabled = true
- html.enabled = true
+ xml.required.set(true)
+ csv.required.set(false)
+ html.required.set(true)
}
}
@@ -179,44 +195,25 @@ checkstyle {
"checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"),
"checkstyle.header.file" : project.file("config/license/HEADER_JAVA")
]
+ checkstyleMain.exclude '**/module-info.java'
}
if (project.hasProperty("releaseMode")) {
logger.lifecycle("ReleaseMode: {}", project.releaseMode)
- /*
- if ("branch" == project.releaseMode) {
-
- if (version.endsWith("-SNAPSHOT")) {
- publishing {
- repositories {
- maven {
- url = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
- }
- }
- }
-
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- }
- }
- */
if ("full" == project.releaseMode) {
signing {
if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) {
useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD"))
+ sign(publishing.publications)
}
}
- /*
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- */
+ }
+ mavenPublishing {
+ // or when publishing to https://central.sonatype.com/
+ publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL)
+
+ // signAllPublications()
}
}
diff --git a/docs/Additional-Reading.md b/docs/Additional-Reading.md
index 85e7d47077..4badd81308 100644
--- a/docs/Additional-Reading.md
+++ b/docs/Additional-Reading.md
@@ -3,7 +3,7 @@ A more complete and up-to-date list of resources can be found at the [reactivex.
# Introducing Reactive Programming
* [Introduction to Rx](http://www.introtorx.com/): a free, on-line book by Lee Campbell **(1.x)**
* [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) by Andre Staltz
-* [Mastering Observables](http://docs.couchbase.com/developer/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
+* [Mastering Observables](https://docs.huihoo.com/couchbase/developer-guide/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
* [Reactive Programming in Java 8 With RxJava](http://pluralsight.com/training/Courses/TableOfContents/reactive-programming-java-8-rxjava), a course designed by Russell Elledge **(1.x)**
* [33rd Degree Reactive Java](http://www.slideshare.net/tkowalcz/33rd-degree-reactive-java) by Tomasz Kowalczewski **(1.x)**
* [What Every Hipster Should Know About Functional Reactive Programming](http://www.infoq.com/presentations/game-functional-reactive-programming) - Bodil Stokke demos the creation of interactive game mechanics in RxJS
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index c7d437bbb4..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index c4a0a385a5..6e6ae7e3c6 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -139,9 +139,9 @@ public Observable extends Integer> apply(Integer v) {
}
});
- singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
+ singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
@@ -153,16 +153,16 @@ public Iterable extends Integer> apply(Integer v) {
}
});
- maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservable;
}
});
- maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
index 87ee5a07e4..4310ea2e95 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.concatMapMaybe(new Function>() {
+ flowableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
index 8ab19a00c6..699f76c074 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
index d0f3730b42..f81ed10ec3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
index 4f50938647..5a92bf20ff 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapSingle(new Function>() {
+ flowableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
index 83ad00e0f9..46ce694f6d 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
index e36b49c4d3..e96bbc3919 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
index 0da6941895..ef06ebfa66 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapSingle(new Function>() {
+ flowableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
index 48b20dc005..2229eed77a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
index 4528c90b50..cfde5183e5 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- concatMapToObservableEmpty = source.concatMap(new Function>() {
+ concatMapToObservableEmpty = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
index 204020abfe..75e7506724 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
index e2e34b24f5..4227791222 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.concatMapSingle(new Function>() {
+ observableDedicated = source.concatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
index b6daa57eb6..6a916a68f1 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
index 5d0327fa46..377a8bba93 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
index e2a7c43bea..248ca98112 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
index add0cd310c..880da95f5a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.flatMapSingle(new Function>() {
+ observableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
index 69b8e71f18..41964c3dbd 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
index 3930534eb8..6a4ea5c73b 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
index 30158d012d..f0c3285890 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
index 75aeb504f9..087f32c8e3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.switchMapSingle(new Function>() {
+ observableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
index 469d0dd48b..a7de73213a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
index daa9edd533..99e11259a6 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
@@ -24,23 +24,7 @@
*
* @param the source element type
*/
-public final class ObservableCache extends AbstractObservableWithUpstream
-implements Observer {
-
- /**
- * The subscription to the source should happen at most once.
- */
- final AtomicBoolean once;
-
- /**
- * The number of items per cached nodes.
- */
- final int capacityHint;
-
- /**
- * The current known array of observer state to notify.
- */
- final AtomicReference[]> observers;
+public final class ObservableCache extends AbstractObservableWithUpstream {
/**
* A shared instance of an empty array of observers to avoid creating
@@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head;
-
- /**
- * The current tail of the linked structure holding the items.
- */
- Node tail;
-
- /**
- * How many items have been put into the tail node so far.
+ * The subscription to the source should happen at most once.
*/
- int tailOffset;
+ final AtomicBoolean once;
/**
- * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null.
+ * Responsible caching events from the source and multicasting them to each downstream.
*/
- Throwable error;
+ final Multicaster multicaster;
/**
- * True if the source has terminated.
+ * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each
+ * points exclusively to the next node (if present). When a new downstream arrives, the subscription is
+ * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As
+ * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each
+ * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out
+ * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible
+ * for collection.
*/
- volatile boolean done;
+ final Node head;
/**
* Constructs an empty, non-connected cache.
* @param source the source to subscribe to for the first incoming observer
* @param capacityHint the number of items expected (reduce allocation frequency)
*/
- @SuppressWarnings("unchecked")
public ObservableCache(Observable source, int capacityHint) {
super(source);
- this.capacityHint = capacityHint;
this.once = new AtomicBoolean();
Node n = new Node<>(capacityHint);
this.head = n;
- this.tail = n;
- this.observers = new AtomicReference<>(EMPTY);
+ this.multicaster = new Multicaster<>(capacityHint, n);
}
@Override
protected void subscribeActual(Observer super T> t) {
- CacheDisposable consumer = new CacheDisposable<>(t, this);
+ CacheDisposable consumer = new CacheDisposable<>(t, multicaster, head);
t.onSubscribe(consumer);
- add(consumer);
+ multicaster.add(consumer);
if (!once.get() && once.compareAndSet(false, true)) {
- source.subscribe(this);
+ source.subscribe(multicaster);
} else {
- replay(consumer);
+ multicaster.replay(consumer);
}
}
@@ -127,7 +99,7 @@ protected void subscribeActual(Observer super T> t) {
* @return true if the cache has observers
*/
/* public */ boolean hasObservers() {
- return observers.get().length != 0;
+ return multicaster.get().length != 0;
}
/**
@@ -135,194 +107,241 @@ protected void subscribeActual(Observer super T> t) {
* @return the number of currently cached event count
*/
/* public */ long cachedEventCount() {
- return size;
+ return multicaster.size;
}
- /**
- * Atomically adds the consumer to the {@link #observers} copy-on-write array
- * if the source has not yet terminated.
- * @param consumer the consumer to add
- */
- void add(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- if (current == TERMINATED) {
- return;
- }
- int n = current.length;
+ static final class Multicaster extends AtomicReference[]> implements Observer {
- @SuppressWarnings("unchecked")
- CacheDisposable[] next = new CacheDisposable[n + 1];
- System.arraycopy(current, 0, next, 0, n);
- next[n] = consumer;
+ /** */
+ private static final long serialVersionUID = 8514643269016498691L;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * The number of items per cached nodes.
+ */
+ final int capacityHint;
- /**
- * Atomically removes the consumer from the {@link #observers} copy-on-write array.
- * @param consumer the consumer to remove
- */
- @SuppressWarnings("unchecked")
- void remove(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- int n = current.length;
- if (n == 0) {
- return;
- }
+ /**
+ * The total number of elements in the list available for reads.
+ */
+ volatile long size;
- int j = -1;
- for (int i = 0; i < n; i++) {
- if (current[i] == consumer) {
- j = i;
- break;
- }
- }
+ /**
+ * The current tail of the linked structure holding the items.
+ */
+ Node tail;
- if (j < 0) {
- return;
- }
- CacheDisposable[] next;
+ /**
+ * How many items have been put into the tail node so far.
+ */
+ int tailOffset;
- if (n == 1) {
- next = EMPTY;
- } else {
- next = new CacheDisposable[n - 1];
- System.arraycopy(current, 0, next, 0, j);
- System.arraycopy(current, j + 1, next, j, n - j - 1);
- }
+ /**
+ * If the observers are {@link #TERMINATED}, this holds the terminal error if not null.
+ */
+ Throwable error;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * True if the source has terminated.
+ */
+ volatile boolean done;
- /**
- * Replays the contents of this cache to the given consumer based on its
- * current state and number of items requested by it.
- * @param consumer the consumer to continue replaying items to
- */
- void replay(CacheDisposable consumer) {
- // make sure there is only one replay going on at a time
- if (consumer.getAndIncrement() != 0) {
- return;
+ @SuppressWarnings("unchecked")
+ Multicaster(int capacityHint, final Node head) {
+ super(EMPTY);
+ this.tail = head;
+ this.capacityHint = capacityHint;
}
- // see if there were more replay request in the meantime
- int missed = 1;
- // read out state into locals upfront to avoid being re-read due to volatile reads
- long index = consumer.index;
- int offset = consumer.offset;
- Node node = consumer.node;
- Observer super T> downstream = consumer.downstream;
- int capacity = capacityHint;
-
- for (;;) {
- // if the consumer got disposed, clear the node and quit
- if (consumer.disposed) {
- consumer.node = null;
- return;
+ /**
+ * Atomically adds the consumer to the observers copy-on-write array
+ * if the source has not yet terminated.
+ * @param consumer the consumer to add
+ */
+ void add(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ if (current == TERMINATED) {
+ return;
+ }
+ int n = current.length;
+
+ @SuppressWarnings("unchecked")
+ CacheDisposable[] next = new CacheDisposable[n + 1];
+ System.arraycopy(current, 0, next, 0, n);
+ next[n] = consumer;
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
}
+ }
- // first see if the source has terminated, read order matters!
- boolean sourceDone = done;
- // and if the number of items is the same as this consumer has received
- boolean empty = size == index;
-
- // if the source is done and we have all items so far, terminate the consumer
- if (sourceDone && empty) {
- // release the node object to avoid leaks through retained consumers
- consumer.node = null;
- // if error is not null then the source failed
- Throwable ex = error;
- if (ex != null) {
- downstream.onError(ex);
+ /**
+ * Atomically removes the consumer from the observers copy-on-write array.
+ * @param consumer the consumer to remove
+ */
+ @SuppressWarnings("unchecked")
+ void remove(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ int n = current.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+ for (int i = 0; i < n; i++) {
+ if (current[i] == consumer) {
+ j = i;
+ break;
+ }
+ }
+
+ if (j < 0) {
+ return;
+ }
+ CacheDisposable[] next;
+
+ if (n == 1) {
+ next = EMPTY;
} else {
- downstream.onComplete();
+ next = new CacheDisposable[n - 1];
+ System.arraycopy(current, 0, next, 0, j);
+ System.arraycopy(current, j + 1, next, j, n - j - 1);
}
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Replays the contents of this cache to the given consumer based on its
+ * current state and number of items requested by it.
+ * @param consumer the consumer to continue replaying items to
+ */
+ void replay(CacheDisposable consumer) {
+ // make sure there is only one replay going on at a time
+ if (consumer.getAndIncrement() != 0) {
return;
}
- // there are still items not sent to the consumer
- if (!empty) {
- // if the offset in the current node has reached the node capacity
- if (offset == capacity) {
- // switch to the subsequent node
- node = node.next;
- // reset the in-node offset
- offset = 0;
+ // see if there were more replay request in the meantime
+ int missed = 1;
+ // read out state into locals upfront to avoid being re-read due to volatile reads
+ long index = consumer.index;
+ int offset = consumer.offset;
+ Node node = consumer.node;
+ Observer super T> downstream = consumer.downstream;
+ int capacity = capacityHint;
+
+ for (;;) {
+ // if the consumer got disposed, clear the node and quit
+ if (consumer.disposed) {
+ consumer.node = null;
+ return;
}
- // emit the cached item
- downstream.onNext(node.values[offset]);
-
- // move the node offset forward
- offset++;
- // move the total consumed item count forward
- index++;
+ // first see if the source has terminated, read order matters!
+ boolean sourceDone = done;
+ // and if the number of items is the same as this consumer has received
+ boolean empty = size == index;
+
+ // if the source is done and we have all items so far, terminate the consumer
+ if (sourceDone && empty) {
+ // release the node object to avoid leaks through retained consumers
+ consumer.node = null;
+ // if error is not null then the source failed
+ Throwable ex = error;
+ if (ex != null) {
+ downstream.onError(ex);
+ } else {
+ downstream.onComplete();
+ }
+ return;
+ }
- // retry for the next item/terminal event if any
- continue;
- }
+ // there are still items not sent to the consumer
+ if (!empty) {
+ // if the offset in the current node has reached the node capacity
+ if (offset == capacity) {
+ // switch to the subsequent node
+ node = node.next;
+ // reset the in-node offset
+ offset = 0;
+ }
+
+ // emit the cached item
+ downstream.onNext(node.values[offset]);
+
+ // move the node offset forward
+ offset++;
+ // move the total consumed item count forward
+ index++;
+
+ // retry for the next item/terminal event if any
+ continue;
+ }
- // commit the changed references back
- consumer.index = index;
- consumer.offset = offset;
- consumer.node = node;
- // release the changes and see if there were more replay request in the meantime
- missed = consumer.addAndGet(-missed);
- if (missed == 0) {
- break;
+ // commit the changed references back
+ consumer.index = index;
+ consumer.offset = offset;
+ consumer.node = node;
+ // release the changes and see if there were more replay request in the meantime
+ missed = consumer.addAndGet(-missed);
+ if (missed == 0) {
+ break;
+ }
}
}
- }
- @Override
- public void onSubscribe(Disposable d) {
- // we can't do much with the upstream disposable
- }
-
- @Override
- public void onNext(T t) {
- int tailOffset = this.tailOffset;
- // if the current tail node is full, create a fresh node
- if (tailOffset == capacityHint) {
- Node n = new Node<>(tailOffset);
- n.values[0] = t;
- this.tailOffset = 1;
- tail.next = n;
- tail = n;
- } else {
- tail.values[tailOffset] = t;
- this.tailOffset = tailOffset + 1;
+ @Override
+ public void onSubscribe(Disposable d) {
+ // we can't do much with the upstream disposable
}
- size++;
- for (CacheDisposable consumer : observers.get()) {
- replay(consumer);
+
+ @Override
+ public void onNext(T t) {
+ int tailOffset = this.tailOffset;
+ // if the current tail node is full, create a fresh node
+ if (tailOffset == capacityHint) {
+ Node n = new Node<>(tailOffset);
+ n.values[0] = t;
+ this.tailOffset = 1;
+ tail.next = n;
+ tail = n;
+ } else {
+ tail.values[tailOffset] = t;
+ this.tailOffset = tailOffset + 1;
+ }
+ size++;
+ for (CacheDisposable consumer : get()) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onError(Throwable t) {
- error = t;
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onError(Throwable t) {
+ error = t;
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onComplete() {
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete() {
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
}
@@ -338,7 +357,7 @@ static final class CacheDisposable extends AtomicInteger
final Observer super T> downstream;
- final ObservableCache parent;
+ final Multicaster parent;
Node node;
@@ -353,11 +372,12 @@ static final class CacheDisposable extends AtomicInteger
* the parent cache object.
* @param downstream the actual consumer
* @param parent the parent that holds onto the cached items
+ * @param head the first node in the linked list
*/
- CacheDisposable(Observer super T> downstream, ObservableCache parent) {
+ CacheDisposable(Observer super T> downstream, Multicaster parent, Node head) {
this.downstream = downstream;
this.parent = parent;
- this.node = parent.head;
+ this.node = head;
}
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
index 1801cce1f2..7c01c23f90 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
index 4d5eb3335f..e103594ba5 100644
--- a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
+++ b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
@@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer
final List