File tree 4 files changed +55
-2
lines changed
main/java/io/reactivex/internal/operators
test/java/io/reactivex/internal/operators
4 files changed +55
-2
lines changed Original file line number Diff line number Diff line change @@ -141,7 +141,11 @@ void timeout(RefConnection rc) {
141
141
if (source instanceof Disposable ) {
142
142
((Disposable )source ).dispose ();
143
143
} else if (source instanceof ResettableConnectable ) {
144
- ((ResettableConnectable )source ).resetIf (connectionObject );
144
+ if (connectionObject == null ) {
145
+ rc .disconnectedEarly = true ;
146
+ } else {
147
+ ((ResettableConnectable )source ).resetIf (connectionObject );
148
+ }
145
149
}
146
150
}
147
151
}
@@ -160,6 +164,8 @@ static final class RefConnection extends AtomicReference<Disposable>
160
164
161
165
boolean connected ;
162
166
167
+ boolean disconnectedEarly ;
168
+
163
169
RefConnection (FlowableRefCount <?> parent ) {
164
170
this .parent = parent ;
165
171
}
@@ -172,6 +178,11 @@ public void run() {
172
178
@ Override
173
179
public void accept (Disposable t ) throws Exception {
174
180
DisposableHelper .replace (this , t );
181
+ synchronized (parent ) {
182
+ if (disconnectedEarly ) {
183
+ ((ResettableConnectable )parent .source ).resetIf (t );
184
+ }
185
+ }
175
186
}
176
187
}
177
188
Original file line number Diff line number Diff line change @@ -135,10 +135,15 @@ void timeout(RefConnection rc) {
135
135
connection = null ;
136
136
Disposable connectionObject = rc .get ();
137
137
DisposableHelper .dispose (rc );
138
+
138
139
if (source instanceof Disposable ) {
139
140
((Disposable )source ).dispose ();
140
141
} else if (source instanceof ResettableConnectable ) {
141
- ((ResettableConnectable )source ).resetIf (connectionObject );
142
+ if (connectionObject == null ) {
143
+ rc .disconnectedEarly = true ;
144
+ } else {
145
+ ((ResettableConnectable )source ).resetIf (connectionObject );
146
+ }
142
147
}
143
148
}
144
149
}
@@ -157,6 +162,8 @@ static final class RefConnection extends AtomicReference<Disposable>
157
162
158
163
boolean connected ;
159
164
165
+ boolean disconnectedEarly ;
166
+
160
167
RefConnection (ObservableRefCount <?> parent ) {
161
168
this .parent = parent ;
162
169
}
@@ -169,6 +176,11 @@ public void run() {
169
176
@ Override
170
177
public void accept (Disposable t ) throws Exception {
171
178
DisposableHelper .replace (this , t );
179
+ synchronized (parent ) {
180
+ if (disconnectedEarly ) {
181
+ ((ResettableConnectable )parent .source ).resetIf (t );
182
+ }
183
+ }
172
184
}
173
185
}
174
186
Original file line number Diff line number Diff line change @@ -1394,4 +1394,19 @@ public void timeoutDisposesSource() {
1394
1394
1395
1395
assertTrue (((Disposable )o .source ).isDisposed ());
1396
1396
}
1397
+
1398
+ @ Test
1399
+ public void disconnectBeforeConnect () {
1400
+ BehaviorProcessor <Integer > processor = BehaviorProcessor .create ();
1401
+
1402
+ Flowable <Integer > flowable = processor
1403
+ .replay (1 )
1404
+ .refCount ();
1405
+
1406
+ flowable .takeUntil (Flowable .just (1 )).test ();
1407
+
1408
+ processor .onNext (2 );
1409
+
1410
+ flowable .take (1 ).test ().assertResult (2 );
1411
+ }
1397
1412
}
Original file line number Diff line number Diff line change @@ -1345,4 +1345,19 @@ public void timeoutDisposesSource() {
1345
1345
1346
1346
assertTrue (((Disposable )o .source ).isDisposed ());
1347
1347
}
1348
+
1349
+ @ Test
1350
+ public void disconnectBeforeConnect () {
1351
+ BehaviorSubject <Integer > subject = BehaviorSubject .create ();
1352
+
1353
+ Observable <Integer > observable = subject
1354
+ .replay (1 )
1355
+ .refCount ();
1356
+
1357
+ observable .takeUntil (Observable .just (1 )).test ();
1358
+
1359
+ subject .onNext (2 );
1360
+
1361
+ observable .take (1 ).test ().assertResult (2 );
1362
+ }
1348
1363
}
You can’t perform that action at this time.
0 commit comments