diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index 5e850d7975..44e5c1a692 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -9,5 +9,5 @@ jobs: name: "Validation" runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0 diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml index e34ef65e00..f791da3db8 100644 --- a/.github/workflows/gradle_branch.yml +++ b/.github/workflows/gradle_branch.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - name: Set up JDK 8 - uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up JDK 11 + uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0 + uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml index 9549026590..829b45be6a 100644 --- a/.github/workflows/gradle_jdk11.yml +++ b/.github/workflows/gradle_jdk11.yml @@ -12,19 +12,22 @@ on: permissions: contents: read +env: + BUILD_WITH_11: true + jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - name: Set up JDK 11 - uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2 + uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 with: distribution: 'zulu' java-version: '11' - name: Cache Gradle packages - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -35,5 +38,5 @@ jobs: run: ./gradlew -PjavaCompatibility=9 jar - name: Build RxJava run: ./gradlew build --stacktrace - - name: Generate Javadoc - run: ./gradlew javadoc --stacktrace +# - name: Generate Javadoc +# run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml index d1c17aa766..b0198af0a1 100644 --- a/.github/workflows/gradle_pr.yml +++ b/.github/workflows/gradle_pr.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - name: Set up JDK 8 - uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up JDK 11 + uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0 + uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml index 62a36633e1..6b1337d6d1 100644 --- a/.github/workflows/gradle_release.yml +++ b/.github/workflows/gradle_release.yml @@ -22,14 +22,14 @@ jobs: env: CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - name: Set up JDK 8 - uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up JDK 11 + uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -43,9 +43,18 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload to Codecov - uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0 - - name: Upload release - run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace + uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 +# - name: Upload release +# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace +# env: +# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions +# # ------------------------------------------------------------------------------ +# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} +# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} +# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} +# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} + - name: Publish release + run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ @@ -53,13 +62,6 @@ jobs: ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} - - name: Publish release - run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace - env: - # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions - # ------------------------------------------------------------------------------ - ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} - ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Push Javadoc run: ./push_javadoc.sh env: diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml index 57a7bea53e..168e213955 100644 --- a/.github/workflows/gradle_snapshot.yml +++ b/.github/workflows/gradle_snapshot.yml @@ -21,14 +21,14 @@ jobs: # ------------------------------------------------------------------------------ CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - name: Set up JDK 8 - uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up JDK 11 + uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -40,14 +40,14 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload Snapshot - run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace + run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Upload to Codecov - uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0 + uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0 - name: Push Javadoc run: ./push_javadoc.sh # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 0663af59e7..c6b1e82325 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -24,12 +24,12 @@ jobs: steps: - name: "Checkout code" - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 with: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # v2.4.0 + uses: ossf/scorecard-action@05b42c624433fc40578a4040d5cf5e36ddca8cde # v2.4.2 with: results_file: results.sarif results_format: sarif @@ -46,7 +46,7 @@ jobs: # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF # format to the repository Actions tab. - name: "Upload artifact" - uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6 + uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 with: name: SARIF file path: results.sarif @@ -54,6 +54,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@eb055d739abdc2e8de2e5f4ba1a8b246daa779aa # v3.26.0 + uses: github/codeql-action/upload-sarif@fca7ace96b7d713c7035871441bd52efbe39e27e # v3.28.19 with: sarif_file: results.sarif diff --git a/README.md b/README.md index 1456d2b68c..9963ffee46 100644 --- a/README.md +++ b/README.md @@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki - Google Group: [RxJava](http://groups.google.com/d/forum/rxjava) - Twitter: [@RxJava](http://twitter.com/RxJava) - [GitHub Issues](https://github.com/ReactiveX/RxJava/issues) -- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) +- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3) - [Gitter.im](https://gitter.im/ReactiveX/RxJava) ## Versioning @@ -571,11 +571,11 @@ and for Ivy: ### Snapshots -Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/ +Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/ ```groovy repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { url 'https://central.sonatype.com/repository/maven-snapshots' } } dependencies { @@ -583,7 +583,7 @@ dependencies { } ``` -JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot +JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot ## Build diff --git a/build.gradle b/build.gradle index c948453034..37dbd5320e 100644 --- a/build.gradle +++ b/build.gradle @@ -4,12 +4,13 @@ plugins { id("eclipse") id("jacoco") id("maven-publish") - id("ru.vyarus.animalsniffer") version "1.7.1" - id("me.champeau.gradle.jmh") version "0.5.3" + id("ru.vyarus.animalsniffer") version "2.0.1" + id("me.champeau.jmh") version "0.7.3" id("com.github.hierynomus.license") version "0.16.1" id("biz.aQute.bnd.builder") version "6.4.0" - id("com.vanniktech.maven.publish") version "0.19.0" - id("org.beryx.jar") version "1.2.0" + id("com.vanniktech.maven.publish") version "0.32.0" + id("org.beryx.jar") version "2.0.0" + id("signing") } ext { @@ -18,7 +19,7 @@ ext { testNgVersion = "7.5" mockitoVersion = "4.11.0" jmhLibVersion = "1.21" - guavaVersion = "33.2.1-jre" + guavaVersion = "33.4.8-jre" } def releaseTag = System.getenv("BUILD_TAG") @@ -49,7 +50,16 @@ dependencies { testImplementation "com.google.guava:guava:$guavaVersion" } +def buildWith11 = System.getenv("BUILD_WITH_11") java { + toolchain { + vendor = JvmVendorSpec.ADOPTIUM + if ("true".equals(buildWith11)) { + languageVersion = JavaLanguageVersion.of(11) + } else { + languageVersion = JavaLanguageVersion.of(8) + } + } sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 } @@ -86,12 +96,18 @@ animalsniffer { annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer" } +moduleConfig { + moduleInfoPath = 'src/main/module/module-info.java' + multiReleaseVersion = 9 +} + jar { from('.') { include 'LICENSE' include 'COPYRIGHT' into('META-INF/') } + exclude("module-info.class") // Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227 bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$') @@ -106,8 +122,6 @@ jar { "Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava", "Multi-Release": "true" ) - - moduleInfoPath = 'src/main/module/module-info.java' } license { @@ -126,8 +140,8 @@ jmh { jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"] if (project.hasProperty("jmh")) { - include = [".*" + project.jmh + ".*"] - logger.info("JMH: {}", include) + includes = [".*" + project.jmh + ".*"] + logger.info("JMH: {}", includes) } } @@ -166,8 +180,9 @@ jacocoTestReport { dependsOn testNG reports { - xml.enabled = true - html.enabled = true + xml.required.set(true) + csv.required.set(false) + html.required.set(true) } } @@ -179,44 +194,25 @@ checkstyle { "checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"), "checkstyle.header.file" : project.file("config/license/HEADER_JAVA") ] + checkstyleMain.exclude '**/module-info.java' } if (project.hasProperty("releaseMode")) { logger.lifecycle("ReleaseMode: {}", project.releaseMode) - /* - if ("branch" == project.releaseMode) { - - if (version.endsWith("-SNAPSHOT")) { - publishing { - repositories { - maven { - url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" - } - } - } - - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - } - } - */ if ("full" == project.releaseMode) { signing { if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) { useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD")) + sign(publishing.publications) } } - /* - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - */ + } + mavenPublishing { + // or when publishing to https://central.sonatype.com/ + publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL) + + // signAllPublications() } } diff --git a/docs/Backpressure-(2.0).md b/docs/Backpressure-(2.0).md index 61361d21c4..6b2f2860af 100644 --- a/docs/Backpressure-(2.0).md +++ b/docs/Backpressure-(2.0).md @@ -172,7 +172,7 @@ If some of the values can be safely ignored, one can use the sampling (with time } ``` -Note hovewer that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`. +Note however that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`. ## onBackpressureBuffer() @@ -229,7 +229,7 @@ Note that the last two strategies cause discontinuity in the stream as they drop ## onBackpressureDrop() -Whenever the downstream is not ready to receive values, this operator will drop that elemenet from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`. +Whenever the downstream is not ready to receive values, this operator will drop that element from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`. This operator is useful when one can safely ignore values from a source (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on. diff --git a/docs/Filtering-Observables.md b/docs/Filtering-Observables.md index 620800dc8c..512b69ba8a 100644 --- a/docs/Filtering-Observables.md +++ b/docs/Filtering-Observables.md @@ -259,7 +259,7 @@ firstOrError.subscribe( **ReactiveX documentation:** [http://reactivex.io/documentation/operators/ignoreelements.html](http://reactivex.io/documentation/operators/ignoreelements.html) -Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the the source. +Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the source. ### ignoreElement example diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md index fac50df56d..edc681fa7f 100644 --- a/docs/What's-different-in-2.0.md +++ b/docs/What's-different-in-2.0.md @@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9: - - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. + - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. - §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation. - §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe. - §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`. -From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber)`, there is no need to do anything regarding this change and there is no extra penalty for it. +From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber)`, there is no need to do anything regarding this change and there is no extra penalty for it. If one was using `Flowable.subscribe(Subscriber)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7. diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md index e8486564b1..1a51664880 100644 --- a/docs/Writing-operators-for-2.0.md +++ b/docs/Writing-operators-for-2.0.md @@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub The rule relaxations are as follows: -- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. +- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. - §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation. - §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe. - §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`. diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index c7d437bbb4..3c44eb1b6f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java index 580e2739cb..3aa001127a 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java +++ b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java @@ -350,7 +350,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial * }); * * - * Slowing down the rate to no more than than 1 a second. This suffers from + * Slowing down the rate to no more than 1 a second. This suffers from * the same problem as the one above I could find an {@link Flowable} * operator that limits the rate without dropping the values (aka leaky * bucket algorithm). diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java index 36436d1370..11239d04a7 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java @@ -138,9 +138,12 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { if (!queue.offer(t)) { + // Error must be set first before calling cancel to avoid race + // with hasNext(), which checks for cancel first before checking + // for error. + error = new QueueOverflowException(); SubscriptionHelper.cancel(this); - - onError(new QueueOverflowException()); + onComplete(); } else { signalConsumer(); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java index 469d0dd48b..a7de73213a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java index 1801cce1f2..7c01c23f90 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index d735ff43c0..e8d19c633e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -91,6 +91,8 @@ public T poll() { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } else if (currConsumerNode != lvProducerNode()) { @@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } return null; diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java index 0780aea610..814c971f1c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java @@ -80,7 +80,7 @@ * } * * - * Slowing down the rate to no more than than 1 a second. This suffers from the + * Slowing down the rate to no more than 1 a second. This suffers from the * same problem as the one above I could find an {@link Observable} operator * that limits the rate without dropping the values (aka leaky bucket * algorithm). diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java index 3160f61173..4d67594a17 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -28,6 +29,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.*; @@ -1030,4 +1032,38 @@ public Publisher apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher"); } + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Flowable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java index ea8d51d996..778d2d4e64 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -29,6 +30,7 @@ import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.observers.*; import io.reactivex.rxjava3.schedulers.*; @@ -978,4 +980,37 @@ public Observable apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource"); } -} + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Observable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + }}