diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index 44e5c1a692..3e905f1c92 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -9,5 +9,5 @@ jobs: name: "Validation" runs-on: ubuntu-latest steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 - uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0 diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml index 650cbe7c05..cb7231a7bf 100644 --- a/.github/workflows/gradle_branch.yml +++ b/.github/workflows/gradle_branch.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - name: Set up JDK 8 - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml index 33c781fca9..771a1afe0f 100644 --- a/.github/workflows/gradle_jdk11.yml +++ b/.github/workflows/gradle_jdk11.yml @@ -12,19 +12,22 @@ on: permissions: contents: read +env: + BUILD_WITH_11: true + jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 - name: Set up JDK 11 - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: distribution: 'zulu' java-version: '11' - name: Cache Gradle packages - uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -35,5 +38,5 @@ jobs: run: ./gradlew -PjavaCompatibility=9 jar - name: Build RxJava run: ./gradlew build --stacktrace - - name: Generate Javadoc - run: ./gradlew javadoc --stacktrace +# - name: Generate Javadoc +# run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml index 712139379f..f76409cd46 100644 --- a/.github/workflows/gradle_pr.yml +++ b/.github/workflows/gradle_pr.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - name: Set up JDK 8 - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml index a341c27ffc..b08c3fe08e 100644 --- a/.github/workflows/gradle_release.yml +++ b/.github/workflows/gradle_release.yml @@ -22,14 +22,14 @@ jobs: env: CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - name: Set up JDK 8 - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -43,9 +43,18 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload to Codecov - uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 - - name: Upload release - run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2 +# - name: Upload release +# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace +# env: +# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions +# # ------------------------------------------------------------------------------ +# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} +# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} +# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} +# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} + - name: Publish release + run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ @@ -53,13 +62,6 @@ jobs: ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} - - name: Publish release - run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace - env: - # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions - # ------------------------------------------------------------------------------ - ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} - ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Push Javadoc run: ./push_javadoc.sh env: diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml index 5a66712f73..5dca06b523 100644 --- a/.github/workflows/gradle_snapshot.yml +++ b/.github/workflows/gradle_snapshot.yml @@ -21,14 +21,14 @@ jobs: # ------------------------------------------------------------------------------ CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - name: Set up JDK 8 - uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -40,14 +40,14 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload Snapshot - run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace + run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Upload to Codecov - uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2 - name: Push Javadoc run: ./push_javadoc.sh # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 2fe557cec1..1ebd21eb28 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -24,12 +24,12 @@ jobs: steps: - name: "Checkout code" - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@f49aabe0b5af0936a0987cfb85d86b75731b0186 # v2.4.1 + uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3 with: results_file: results.sarif results_format: sarif @@ -46,7 +46,7 @@ jobs: # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF # format to the repository Actions tab. - name: "Upload artifact" - uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0 with: name: SARIF file path: results.sarif @@ -54,6 +54,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15 + uses: github/codeql-action/upload-sarif@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v3.29.5 with: sarif_file: results.sarif diff --git a/README.md b/README.md index 1456d2b68c..5276c0bd37 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![codecov.io](http://codecov.io/github/ReactiveX/RxJava/coverage.svg?branch=3.x)](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x) -[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava) +[![Maven Central](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava) [![Contribute with Gitpod](https://img.shields.io/badge/Contribute%20with-Gitpod-908a85?logo=gitpod)](https://gitpod.io/#https://github.com/ReactiveX/RxJava) [![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/ReactiveX/RxJava/badge)](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava) @@ -48,7 +48,7 @@ The first step is to include RxJava 3 into your project, for example, as a Gradl implementation "io.reactivex.rxjava3:rxjava:3.x.y" ``` -(Please replace `x` and `y` with the latest version numbers: [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava) +(Please replace `x` and `y` with the latest version numbers: [![Maven Central](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava) ) ### Hello World @@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki - Google Group: [RxJava](http://groups.google.com/d/forum/rxjava) - Twitter: [@RxJava](http://twitter.com/RxJava) - [GitHub Issues](https://github.com/ReactiveX/RxJava/issues) -- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) +- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3) - [Gitter.im](https://gitter.im/ReactiveX/RxJava) ## Versioning @@ -571,11 +571,11 @@ and for Ivy: ### Snapshots -Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/ +Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/ ```groovy repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { url 'https://central.sonatype.com/repository/maven-snapshots' } } dependencies { @@ -583,7 +583,7 @@ dependencies { } ``` -JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot +JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot ## Build diff --git a/build.gradle b/build.gradle index 39703ec95e..8bcfddb717 100644 --- a/build.gradle +++ b/build.gradle @@ -4,12 +4,13 @@ plugins { id("eclipse") id("jacoco") id("maven-publish") - id("ru.vyarus.animalsniffer") version "2.0.0" - id("me.champeau.gradle.jmh") version "0.5.3" + id("ru.vyarus.animalsniffer") version "2.0.1" + id("me.champeau.jmh") version "0.7.3" id("com.github.hierynomus.license") version "0.16.1" id("biz.aQute.bnd.builder") version "6.4.0" - id("com.vanniktech.maven.publish") version "0.19.0" - id("org.beryx.jar") version "1.2.0" + id("com.vanniktech.maven.publish") version "0.33.0" + id("org.beryx.jar") version "2.0.0" + id("signing") } ext { @@ -18,7 +19,7 @@ ext { testNgVersion = "7.5" mockitoVersion = "4.11.0" jmhLibVersion = "1.21" - guavaVersion = "33.4.8-jre" + guavaVersion = "33.5.0-jre" } def releaseTag = System.getenv("BUILD_TAG") @@ -49,7 +50,16 @@ dependencies { testImplementation "com.google.guava:guava:$guavaVersion" } +def buildWith11 = System.getenv("BUILD_WITH_11") java { + toolchain { + vendor = JvmVendorSpec.ADOPTIUM + if ("true".equals(buildWith11)) { + languageVersion = JavaLanguageVersion.of(11) + } else { + languageVersion = JavaLanguageVersion.of(8) + } + } sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 } @@ -86,12 +96,19 @@ animalsniffer { annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer" } +moduleConfig { + moduleInfoPath = 'src/main/module/module-info.java' + multiReleaseVersion = 9 + version = project.version +} + jar { from('.') { include 'LICENSE' include 'COPYRIGHT' into('META-INF/') } + exclude("module-info.class") // Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227 bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$') @@ -106,8 +123,6 @@ jar { "Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava", "Multi-Release": "true" ) - - moduleInfoPath = 'src/main/module/module-info.java' } license { @@ -126,8 +141,8 @@ jmh { jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"] if (project.hasProperty("jmh")) { - include = [".*" + project.jmh + ".*"] - logger.info("JMH: {}", include) + includes = [".*" + project.jmh + ".*"] + logger.info("JMH: {}", includes) } } @@ -166,8 +181,9 @@ jacocoTestReport { dependsOn testNG reports { - xml.enabled = true - html.enabled = true + xml.required.set(true) + csv.required.set(false) + html.required.set(true) } } @@ -179,44 +195,25 @@ checkstyle { "checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"), "checkstyle.header.file" : project.file("config/license/HEADER_JAVA") ] + checkstyleMain.exclude '**/module-info.java' } if (project.hasProperty("releaseMode")) { logger.lifecycle("ReleaseMode: {}", project.releaseMode) - /* - if ("branch" == project.releaseMode) { - - if (version.endsWith("-SNAPSHOT")) { - publishing { - repositories { - maven { - url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" - } - } - } - - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - } - } - */ if ("full" == project.releaseMode) { signing { if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) { useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD")) + sign(publishing.publications) } } - /* - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - */ + } + mavenPublishing { + // or when publishing to https://central.sonatype.com/ + publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL) + + // signAllPublications() } } diff --git a/docs/Additional-Reading.md b/docs/Additional-Reading.md index 85e7d47077..4badd81308 100644 --- a/docs/Additional-Reading.md +++ b/docs/Additional-Reading.md @@ -3,7 +3,7 @@ A more complete and up-to-date list of resources can be found at the [reactivex. # Introducing Reactive Programming * [Introduction to Rx](http://www.introtorx.com/): a free, on-line book by Lee Campbell **(1.x)** * [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) by Andre Staltz -* [Mastering Observables](http://docs.couchbase.com/developer/java-2.0/observables.html) from the Couchbase documentation **(1.x)** +* [Mastering Observables](https://docs.huihoo.com/couchbase/developer-guide/java-2.0/observables.html) from the Couchbase documentation **(1.x)** * [Reactive Programming in Java 8 With RxJava](http://pluralsight.com/training/Courses/TableOfContents/reactive-programming-java-8-rxjava), a course designed by Russell Elledge **(1.x)** * [33rd Degree Reactive Java](http://www.slideshare.net/tkowalcz/33rd-degree-reactive-java) by Tomasz Kowalczewski **(1.x)** * [What Every Hipster Should Know About Functional Reactive Programming](http://www.infoq.com/presentations/game-functional-reactive-programming) - Bodil Stokke demos the creation of interactive game mechanics in RxJS diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index c7d437bbb4..3c44eb1b6f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java index c4a0a385a5..6e6ae7e3c6 100644 --- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java @@ -139,9 +139,9 @@ public Observable apply(Integer v) { } }); - singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() { + singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservableHide; } }); @@ -153,16 +153,16 @@ public Iterable apply(Integer v) { } }); - maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() { + maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservable; } }); - maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() { + maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservableHide; } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java index 87ee5a07e4..4310ea2e95 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.concatMapMaybe(new Function>() { + flowableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java index 8ab19a00c6..699f76c074 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapMaybe(new Function>() { + flowableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java index d0f3730b42..f81ed10ec3 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapMaybe(new Function>() { + flowableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java index 4f50938647..5a92bf20ff 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapSingle(new Function>() { + flowableDedicated = source.flatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java index 83ad00e0f9..46ce694f6d 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapMaybe(new Function>() { + flowableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java index e36b49c4d3..e96bbc3919 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapMaybe(new Function>() { + flowableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java index 0da6941895..ef06ebfa66 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapSingle(new Function>() { + flowableDedicated = source.switchMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java index 48b20dc005..2229eed77a 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java index 4528c90b50..cfde5183e5 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - concatMapToObservableEmpty = source.concatMap(new Function>() { + concatMapToObservableEmpty = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.concatMapMaybe(new Function>() { + observableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java index 204020abfe..75e7506724 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.concatMapMaybe(new Function>() { + observableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java index e2e34b24f5..4227791222 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.concatMapSingle(new Function>() { + observableDedicated = source.concatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java index b6daa57eb6..6a916a68f1 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java index 5d0327fa46..377a8bba93 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.flatMapMaybe(new Function>() { + observableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java index e2a7c43bea..248ca98112 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.flatMapMaybe(new Function>() { + observableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java index add0cd310c..880da95f5a 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.flatMapSingle(new Function>() { + observableDedicated = source.flatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java index 69b8e71f18..41964c3dbd 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java index 3930534eb8..6a4ea5c73b 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.switchMapMaybe(new Function>() { + observableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java index 30158d012d..f0c3285890 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.switchMapMaybe(new Function>() { + observableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java index 75aeb504f9..087f32c8e3 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.switchMapSingle(new Function>() { + observableDedicated = source.switchMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java index 469d0dd48b..a7de73213a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java index daa9edd533..99e11259a6 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java @@ -24,23 +24,7 @@ * * @param the source element type */ -public final class ObservableCache extends AbstractObservableWithUpstream -implements Observer { - - /** - * The subscription to the source should happen at most once. - */ - final AtomicBoolean once; - - /** - * The number of items per cached nodes. - */ - final int capacityHint; - - /** - * The current known array of observer state to notify. - */ - final AtomicReference[]> observers; +public final class ObservableCache extends AbstractObservableWithUpstream { /** * A shared instance of an empty array of observers to avoid creating @@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head; - - /** - * The current tail of the linked structure holding the items. - */ - Node tail; - - /** - * How many items have been put into the tail node so far. + * The subscription to the source should happen at most once. */ - int tailOffset; + final AtomicBoolean once; /** - * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + * Responsible caching events from the source and multicasting them to each downstream. */ - Throwable error; + final Multicaster multicaster; /** - * True if the source has terminated. + * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each + * points exclusively to the next node (if present). When a new downstream arrives, the subscription is + * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As + * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each + * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out + * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible + * for collection. */ - volatile boolean done; + final Node head; /** * Constructs an empty, non-connected cache. * @param source the source to subscribe to for the first incoming observer * @param capacityHint the number of items expected (reduce allocation frequency) */ - @SuppressWarnings("unchecked") public ObservableCache(Observable source, int capacityHint) { super(source); - this.capacityHint = capacityHint; this.once = new AtomicBoolean(); Node n = new Node<>(capacityHint); this.head = n; - this.tail = n; - this.observers = new AtomicReference<>(EMPTY); + this.multicaster = new Multicaster<>(capacityHint, n); } @Override protected void subscribeActual(Observer t) { - CacheDisposable consumer = new CacheDisposable<>(t, this); + CacheDisposable consumer = new CacheDisposable<>(t, multicaster, head); t.onSubscribe(consumer); - add(consumer); + multicaster.add(consumer); if (!once.get() && once.compareAndSet(false, true)) { - source.subscribe(this); + source.subscribe(multicaster); } else { - replay(consumer); + multicaster.replay(consumer); } } @@ -127,7 +99,7 @@ protected void subscribeActual(Observer t) { * @return true if the cache has observers */ /* public */ boolean hasObservers() { - return observers.get().length != 0; + return multicaster.get().length != 0; } /** @@ -135,194 +107,241 @@ protected void subscribeActual(Observer t) { * @return the number of currently cached event count */ /* public */ long cachedEventCount() { - return size; + return multicaster.size; } - /** - * Atomically adds the consumer to the {@link #observers} copy-on-write array - * if the source has not yet terminated. - * @param consumer the consumer to add - */ - void add(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - if (current == TERMINATED) { - return; - } - int n = current.length; + static final class Multicaster extends AtomicReference[]> implements Observer { - @SuppressWarnings("unchecked") - CacheDisposable[] next = new CacheDisposable[n + 1]; - System.arraycopy(current, 0, next, 0, n); - next[n] = consumer; + /** */ + private static final long serialVersionUID = 8514643269016498691L; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * The number of items per cached nodes. + */ + final int capacityHint; - /** - * Atomically removes the consumer from the {@link #observers} copy-on-write array. - * @param consumer the consumer to remove - */ - @SuppressWarnings("unchecked") - void remove(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - int n = current.length; - if (n == 0) { - return; - } + /** + * The total number of elements in the list available for reads. + */ + volatile long size; - int j = -1; - for (int i = 0; i < n; i++) { - if (current[i] == consumer) { - j = i; - break; - } - } + /** + * The current tail of the linked structure holding the items. + */ + Node tail; - if (j < 0) { - return; - } - CacheDisposable[] next; + /** + * How many items have been put into the tail node so far. + */ + int tailOffset; - if (n == 1) { - next = EMPTY; - } else { - next = new CacheDisposable[n - 1]; - System.arraycopy(current, 0, next, 0, j); - System.arraycopy(current, j + 1, next, j, n - j - 1); - } + /** + * If the observers are {@link #TERMINATED}, this holds the terminal error if not null. + */ + Throwable error; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * True if the source has terminated. + */ + volatile boolean done; - /** - * Replays the contents of this cache to the given consumer based on its - * current state and number of items requested by it. - * @param consumer the consumer to continue replaying items to - */ - void replay(CacheDisposable consumer) { - // make sure there is only one replay going on at a time - if (consumer.getAndIncrement() != 0) { - return; + @SuppressWarnings("unchecked") + Multicaster(int capacityHint, final Node head) { + super(EMPTY); + this.tail = head; + this.capacityHint = capacityHint; } - // see if there were more replay request in the meantime - int missed = 1; - // read out state into locals upfront to avoid being re-read due to volatile reads - long index = consumer.index; - int offset = consumer.offset; - Node node = consumer.node; - Observer downstream = consumer.downstream; - int capacity = capacityHint; - - for (;;) { - // if the consumer got disposed, clear the node and quit - if (consumer.disposed) { - consumer.node = null; - return; + /** + * Atomically adds the consumer to the observers copy-on-write array + * if the source has not yet terminated. + * @param consumer the consumer to add + */ + void add(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = get(); + if (current == TERMINATED) { + return; + } + int n = current.length; + + @SuppressWarnings("unchecked") + CacheDisposable[] next = new CacheDisposable[n + 1]; + System.arraycopy(current, 0, next, 0, n); + next[n] = consumer; + + if (compareAndSet(current, next)) { + return; + } } + } - // first see if the source has terminated, read order matters! - boolean sourceDone = done; - // and if the number of items is the same as this consumer has received - boolean empty = size == index; - - // if the source is done and we have all items so far, terminate the consumer - if (sourceDone && empty) { - // release the node object to avoid leaks through retained consumers - consumer.node = null; - // if error is not null then the source failed - Throwable ex = error; - if (ex != null) { - downstream.onError(ex); + /** + * Atomically removes the consumer from the observers copy-on-write array. + * @param consumer the consumer to remove + */ + @SuppressWarnings("unchecked") + void remove(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = get(); + int n = current.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + CacheDisposable[] next; + + if (n == 1) { + next = EMPTY; } else { - downstream.onComplete(); + next = new CacheDisposable[n - 1]; + System.arraycopy(current, 0, next, 0, j); + System.arraycopy(current, j + 1, next, j, n - j - 1); } + + if (compareAndSet(current, next)) { + return; + } + } + } + + /** + * Replays the contents of this cache to the given consumer based on its + * current state and number of items requested by it. + * @param consumer the consumer to continue replaying items to + */ + void replay(CacheDisposable consumer) { + // make sure there is only one replay going on at a time + if (consumer.getAndIncrement() != 0) { return; } - // there are still items not sent to the consumer - if (!empty) { - // if the offset in the current node has reached the node capacity - if (offset == capacity) { - // switch to the subsequent node - node = node.next; - // reset the in-node offset - offset = 0; + // see if there were more replay request in the meantime + int missed = 1; + // read out state into locals upfront to avoid being re-read due to volatile reads + long index = consumer.index; + int offset = consumer.offset; + Node node = consumer.node; + Observer downstream = consumer.downstream; + int capacity = capacityHint; + + for (;;) { + // if the consumer got disposed, clear the node and quit + if (consumer.disposed) { + consumer.node = null; + return; } - // emit the cached item - downstream.onNext(node.values[offset]); - - // move the node offset forward - offset++; - // move the total consumed item count forward - index++; + // first see if the source has terminated, read order matters! + boolean sourceDone = done; + // and if the number of items is the same as this consumer has received + boolean empty = size == index; + + // if the source is done and we have all items so far, terminate the consumer + if (sourceDone && empty) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; + // if error is not null then the source failed + Throwable ex = error; + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } - // retry for the next item/terminal event if any - continue; - } + // there are still items not sent to the consumer + if (!empty) { + // if the offset in the current node has reached the node capacity + if (offset == capacity) { + // switch to the subsequent node + node = node.next; + // reset the in-node offset + offset = 0; + } + + // emit the cached item + downstream.onNext(node.values[offset]); + + // move the node offset forward + offset++; + // move the total consumed item count forward + index++; + + // retry for the next item/terminal event if any + continue; + } - // commit the changed references back - consumer.index = index; - consumer.offset = offset; - consumer.node = node; - // release the changes and see if there were more replay request in the meantime - missed = consumer.addAndGet(-missed); - if (missed == 0) { - break; + // commit the changed references back + consumer.index = index; + consumer.offset = offset; + consumer.node = node; + // release the changes and see if there were more replay request in the meantime + missed = consumer.addAndGet(-missed); + if (missed == 0) { + break; + } } } - } - @Override - public void onSubscribe(Disposable d) { - // we can't do much with the upstream disposable - } - - @Override - public void onNext(T t) { - int tailOffset = this.tailOffset; - // if the current tail node is full, create a fresh node - if (tailOffset == capacityHint) { - Node n = new Node<>(tailOffset); - n.values[0] = t; - this.tailOffset = 1; - tail.next = n; - tail = n; - } else { - tail.values[tailOffset] = t; - this.tailOffset = tailOffset + 1; + @Override + public void onSubscribe(Disposable d) { + // we can't do much with the upstream disposable } - size++; - for (CacheDisposable consumer : observers.get()) { - replay(consumer); + + @Override + public void onNext(T t) { + int tailOffset = this.tailOffset; + // if the current tail node is full, create a fresh node + if (tailOffset == capacityHint) { + Node n = new Node<>(tailOffset); + n.values[0] = t; + this.tailOffset = 1; + tail.next = n; + tail = n; + } else { + tail.values[tailOffset] = t; + this.tailOffset = tailOffset + 1; + } + size++; + for (CacheDisposable consumer : get()) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onError(Throwable t) { - error = t; - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + error = t; + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : getAndSet(TERMINATED)) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onComplete() { - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : getAndSet(TERMINATED)) { + replay(consumer); + } } } @@ -338,7 +357,7 @@ static final class CacheDisposable extends AtomicInteger final Observer downstream; - final ObservableCache parent; + final Multicaster parent; Node node; @@ -353,11 +372,12 @@ static final class CacheDisposable extends AtomicInteger * the parent cache object. * @param downstream the actual consumer * @param parent the parent that holds onto the cached items + * @param head the first node in the linked list */ - CacheDisposable(Observer downstream, ObservableCache parent) { + CacheDisposable(Observer downstream, Multicaster parent, Node head) { this.downstream = downstream; this.parent = parent; - this.node = parent.head; + this.node = head; } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java index 1801cce1f2..7c01c23f90 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java index 4d5eb3335f..e103594ba5 100644 --- a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java @@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer final List buffer; - volatile boolean done; - volatile int size; UnboundedReplayBuffer(int capacityHint) { @@ -671,7 +669,6 @@ public void addFinal(Object notificationLite) { buffer.add(notificationLite); trimHead(); size++; - done = true; } @Override @@ -772,20 +769,17 @@ public void replay(ReplayDisposable rs) { Object o = b.get(index); - if (done) { - if (index + 1 == s) { - s = size; - if (index + 1 == s) { - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); @@ -856,8 +850,6 @@ static final class SizeBoundReplayBuffer Node tail; - volatile boolean done; - SizeBoundReplayBuffer(int maxSize) { this.maxSize = maxSize; Node h = new Node<>(null); @@ -895,7 +887,6 @@ public void addFinal(Object notificationLite) { t.lazySet(n); // releases both the tail and size trimHead(); - done = true; } /** @@ -1000,18 +991,17 @@ public void replay(ReplayDisposable rs) { Object o = n.value; - if (done) { - if (n.get() == null) { - - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); @@ -1069,8 +1059,6 @@ static final class SizeAndTimeBoundReplayBuffer TimedNode tail; - volatile boolean done; - SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) { this.maxSize = maxSize; this.maxAge = maxAge; @@ -1163,8 +1151,6 @@ public void addFinal(Object notificationLite) { size++; t.lazySet(n); // releases both the tail and size trimFinal(); - - done = true; } /** @@ -1290,18 +1276,17 @@ public void replay(ReplayDisposable rs) { Object o = n.value; - if (done) { - if (n.get() == null) { - - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java index ba86c69719..1a1ceca926 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java @@ -402,7 +402,8 @@ public void disposed() { @Test public void manySources() { - Flowable[] a = new Flowable[32]; + @SuppressWarnings("unchecked") + Flowable[] a = new Flowable[32]; Arrays.fill(a, Flowable.never()); a[31] = Flowable.just(1); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java index 3160f61173..4d67594a17 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -28,6 +29,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.*; @@ -1030,4 +1032,38 @@ public Publisher apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher"); } + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Flowable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java index 47bbd491bd..8d3e10dbd2 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java @@ -1870,7 +1870,7 @@ public Integer apply(Integer a, Integer b) throws Exception { public void firstErrorPreventsSecondSubscription() { final AtomicInteger counter = new AtomicInteger(); - List> flowableList = new ArrayList<>(); + List> flowableList = new ArrayList<>(); flowableList.add(Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java index 0a349cd417..5ec9129d9f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java @@ -235,7 +235,8 @@ public void ambArraySingleElement() { @Test public void manySources() { - Observable[] a = new Observable[32]; + @SuppressWarnings("unchecked") + Observable[] a = new Observable[32]; Arrays.fill(a, Observable.never()); a[31] = Observable.just(1); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java index 8264a66494..7085720f01 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java @@ -1278,9 +1278,9 @@ public Integer apply(Integer integer, Long aLong) { } }) .buffer(Observable.interval(0, 200, TimeUnit.MILLISECONDS), - new Function>() { + new Function>() { @Override - public Observable apply(Long a) { + public Observable apply(Long a) { return Observable.just(a).delay(100, TimeUnit.MILLISECONDS); } }) @@ -1301,9 +1301,9 @@ public Integer apply(Integer integer, Long aLong) { } }) .buffer(Observable.interval(0, 100, TimeUnit.MILLISECONDS), - new Function>() { + new Function>() { @Override - public Observable apply(Long a) { + public Observable apply(Long a) { return Observable.just(a).delay(200, TimeUnit.MILLISECONDS); } }) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index 7f47ec95d8..74d17c062b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -16,10 +16,15 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.rxjava3.observables.ConnectableObservable; import org.junit.Test; import io.reactivex.rxjava3.core.*; @@ -355,4 +360,52 @@ public void addRemoveRace() { ); } } + + @Test + public void valuesAreReclaimable() throws Exception { + ConnectableObservable source = + Observable.range(0, 200) + .map($ -> new byte[1024 * 1024]) + .publish(); + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.cache().lastElement().subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + source.connect(); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java index ea8d51d996..778d2d4e64 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -29,6 +30,7 @@ import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.observers.*; import io.reactivex.rxjava3.schedulers.*; @@ -978,4 +980,37 @@ public Observable apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource"); } -} + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Observable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + }} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java index ae7de66d6f..faebee3fa5 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java @@ -1403,7 +1403,7 @@ public Integer apply(Integer t1, Integer t2) throws Exception { public void firstErrorPreventsSecondSubscription() { final AtomicInteger counter = new AtomicInteger(); - List> observableList = new ArrayList<>(); + List> observableList = new ArrayList<>(); observableList.add(Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java index 083ee46238..8086aa7c1c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java @@ -249,7 +249,8 @@ public void run() { @Test public void manySources() { - Single[] sources = new Single[32]; + @SuppressWarnings("unchecked") + Single[] sources = new Single[32]; Arrays.fill(sources, Single.never()); sources[31] = Single.just(31); diff --git a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java index aa7026a9ff..35a877911b 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java @@ -1832,4 +1832,69 @@ public void timeAndSizeRemoveCorrectNumberOfOld() { rp.test().assertValuesOnly(4, 5); } -} + + @Test + public void terminationSubscriptionRaceUnbounded() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.create(); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceSizeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithSize(20); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceTimeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation()); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + }} diff --git a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java index e752278b2f..8417b53081 100644 --- a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java @@ -1378,4 +1378,70 @@ public void timeAndSizeRemoveCorrectNumberOfOld() { rs.test().assertValuesOnly(4, 5); } + + @Test + public void terminationSubscriptionRaceUnbounded() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.create(); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceSizeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithSize(20); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceTimeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation()); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } }