17
17
18
18
import lombok .RequiredArgsConstructor ;
19
19
import reactor .core .CoreSubscriber ;
20
+ import reactor .core .publisher .Flux ;
21
+ import reactor .core .publisher .FluxSink ;
20
22
import reactor .core .publisher .Mono ;
21
23
import reactor .core .publisher .Operators ;
22
24
import reactor .util .concurrent .Queues ;
25
27
import java .nio .ByteBuffer ;
26
28
import java .util .Queue ;
27
29
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
30
+ import java .util .concurrent .atomic .AtomicLong ;
28
31
import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
29
32
import java .util .function .BiConsumer ;
30
33
31
34
import org .reactivestreams .Publisher ;
32
35
import org .reactivestreams .Subscription ;
36
+
33
37
import org .springframework .core .io .buffer .DataBuffer ;
34
38
import org .springframework .core .io .buffer .DataBufferUtils ;
35
- import org .springframework .core .io .buffer .DefaultDataBufferFactory ;
36
39
37
40
import com .mongodb .reactivestreams .client .Success ;
38
41
import com .mongodb .reactivestreams .client .gridfs .AsyncInputStream ;
@@ -66,15 +69,16 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
66
69
67
70
private final Publisher <? extends DataBuffer > buffers ;
68
71
private final Context subscriberContext ;
69
- private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory ();
70
72
71
73
private volatile Subscription subscription ;
72
74
private volatile boolean cancelled ;
73
- private volatile boolean complete ;
75
+ private volatile boolean allDataBuffersReceived ;
74
76
private volatile Throwable error ;
75
77
private final Queue <BiConsumer <DataBuffer , Integer >> readRequests = Queues .<BiConsumer <DataBuffer , Integer >> small ()
76
78
.get ();
77
79
80
+ private final Queue <DataBuffer > bufferQueue = Queues .<DataBuffer > small ().get ();
81
+
78
82
// see DEMAND
79
83
volatile long demand ;
80
84
@@ -88,41 +92,75 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
88
92
@ Override
89
93
public Publisher <Integer > read (ByteBuffer dst ) {
90
94
91
- return Mono .create (sink -> {
95
+ return Flux .create (sink -> {
92
96
97
+ AtomicLong written = new AtomicLong ();
93
98
readRequests .offer ((db , bytecount ) -> {
94
99
95
100
try {
96
101
97
102
if (error != null ) {
98
-
99
- sink .error (error );
103
+ onError (sink , error );
100
104
return ;
101
105
}
102
106
103
107
if (bytecount == -1 ) {
104
108
105
- sink . success ( -1 );
109
+ onComplete ( sink , written . get () > 0 ? written . intValue () : -1 );
106
110
return ;
107
111
}
108
112
109
113
ByteBuffer byteBuffer = db .asByteBuffer ();
110
- int toWrite = byteBuffer .remaining ();
114
+ int remaining = byteBuffer .remaining ();
115
+ int writeCapacity = Math .min (dst .remaining (), remaining );
116
+ int limit = Math .min (byteBuffer .position () + writeCapacity , byteBuffer .capacity ());
117
+ int toWrite = limit - byteBuffer .position ();
118
+
119
+ if (toWrite == 0 ) {
111
120
121
+ onComplete (sink , written .intValue ());
122
+ return ;
123
+ }
124
+
125
+ int oldPosition = byteBuffer .position ();
126
+
127
+ byteBuffer .limit (toWrite );
112
128
dst .put (byteBuffer );
113
- sink .success (toWrite );
129
+ byteBuffer .limit (byteBuffer .capacity ());
130
+ byteBuffer .position (oldPosition );
131
+ db .readPosition (db .readPosition () + toWrite );
132
+ written .addAndGet (toWrite );
114
133
115
134
} catch (Exception e ) {
116
- sink . error ( e );
135
+ onError ( sink , e );
117
136
} finally {
118
- DataBufferUtils .release (db );
137
+
138
+ if (db != null && db .readableByteCount () == 0 ) {
139
+ DataBufferUtils .release (db );
140
+ }
119
141
}
120
142
});
121
143
122
- request (1 );
144
+ sink .onCancel (this ::terminatePendingReads );
145
+ sink .onDispose (this ::terminatePendingReads );
146
+ sink .onRequest (this ::request );
123
147
});
124
148
}
125
149
150
+ void onError (FluxSink <Integer > sink , Throwable e ) {
151
+
152
+ readRequests .poll ();
153
+ sink .error (e );
154
+ }
155
+
156
+ void onComplete (FluxSink <Integer > sink , int writtenBytes ) {
157
+
158
+ readRequests .poll ();
159
+ DEMAND .decrementAndGet (this );
160
+ sink .next (writtenBytes );
161
+ sink .complete ();
162
+ }
163
+
126
164
/*
127
165
* (non-Javadoc)
128
166
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
@@ -144,17 +182,19 @@ public Publisher<Success> close() {
144
182
cancelled = true ;
145
183
146
184
if (error != null ) {
185
+ terminatePendingReads ();
147
186
sink .error (error );
148
187
return ;
149
188
}
150
189
190
+ terminatePendingReads ();
151
191
sink .success (Success .SUCCESS );
152
192
});
153
193
}
154
194
155
- protected void request (int n ) {
195
+ protected void request (long n ) {
156
196
157
- if (complete ) {
197
+ if (allDataBuffersReceived && bufferQueue . isEmpty () ) {
158
198
159
199
terminatePendingReads ();
160
200
return ;
@@ -176,18 +216,51 @@ protected void request(int n) {
176
216
requestFromSubscription (subscription );
177
217
}
178
218
}
219
+
179
220
}
180
221
181
222
void requestFromSubscription (Subscription subscription ) {
182
223
183
- long demand = DEMAND .get (AsyncInputStreamAdapter .this );
184
-
185
224
if (cancelled ) {
186
225
subscription .cancel ();
187
226
}
188
227
189
- if (demand > 0 && DEMAND .compareAndSet (AsyncInputStreamAdapter .this , demand , demand - 1 )) {
190
- subscription .request (1 );
228
+ drainLoop ();
229
+ }
230
+
231
+ void drainLoop () {
232
+
233
+ while (DEMAND .get (AsyncInputStreamAdapter .this ) > 0 ) {
234
+
235
+ DataBuffer wip = bufferQueue .peek ();
236
+
237
+ if (wip == null ) {
238
+ break ;
239
+ }
240
+
241
+ if (wip .readableByteCount () == 0 ) {
242
+ bufferQueue .poll ();
243
+ continue ;
244
+ }
245
+
246
+ BiConsumer <DataBuffer , Integer > consumer = AsyncInputStreamAdapter .this .readRequests .peek ();
247
+ if (consumer == null ) {
248
+ break ;
249
+ }
250
+
251
+ consumer .accept (wip , wip .readableByteCount ());
252
+ }
253
+
254
+ if (bufferQueue .isEmpty ()) {
255
+
256
+ if (allDataBuffersReceived ) {
257
+ terminatePendingReads ();
258
+ return ;
259
+ }
260
+
261
+ if (demand > 0 ) {
262
+ subscription .request (1 );
263
+ }
191
264
}
192
265
}
193
266
@@ -199,7 +272,7 @@ void terminatePendingReads() {
199
272
BiConsumer <DataBuffer , Integer > readers ;
200
273
201
274
while ((readers = readRequests .poll ()) != null ) {
202
- readers .accept (factory . wrap ( new byte [ 0 ]) , -1 );
275
+ readers .accept (null , -1 );
203
276
}
204
277
}
205
278
@@ -214,53 +287,53 @@ public Context currentContext() {
214
287
public void onSubscribe (Subscription s ) {
215
288
216
289
AsyncInputStreamAdapter .this .subscription = s ;
217
-
218
- Operators .addCap (DEMAND , AsyncInputStreamAdapter .this , -1 );
219
290
s .request (1 );
220
291
}
221
292
222
293
@ Override
223
294
public void onNext (DataBuffer dataBuffer ) {
224
295
225
- if (cancelled || complete ) {
296
+ if (cancelled || allDataBuffersReceived ) {
226
297
DataBufferUtils .release (dataBuffer );
227
298
Operators .onNextDropped (dataBuffer , AsyncInputStreamAdapter .this .subscriberContext );
228
299
return ;
229
300
}
230
301
231
- BiConsumer <DataBuffer , Integer > poll = AsyncInputStreamAdapter .this .readRequests .poll ();
302
+ BiConsumer <DataBuffer , Integer > readRequest = AsyncInputStreamAdapter .this .readRequests .peek ();
232
303
233
- if (poll == null ) {
304
+ if (readRequest == null ) {
234
305
235
306
DataBufferUtils .release (dataBuffer );
236
307
Operators .onNextDropped (dataBuffer , AsyncInputStreamAdapter .this .subscriberContext );
237
308
subscription .cancel ();
238
309
return ;
239
310
}
240
311
241
- poll . accept (dataBuffer , dataBuffer . readableByteCount () );
312
+ bufferQueue . offer (dataBuffer );
242
313
243
- requestFromSubscription ( subscription );
314
+ drainLoop ( );
244
315
}
245
316
246
317
@ Override
247
318
public void onError (Throwable t ) {
248
319
249
- if (AsyncInputStreamAdapter .this .cancelled || AsyncInputStreamAdapter .this .complete ) {
320
+ if (AsyncInputStreamAdapter .this .cancelled || AsyncInputStreamAdapter .this .allDataBuffersReceived ) {
250
321
Operators .onErrorDropped (t , AsyncInputStreamAdapter .this .subscriberContext );
251
322
return ;
252
323
}
253
324
254
325
AsyncInputStreamAdapter .this .error = t ;
255
- AsyncInputStreamAdapter .this .complete = true ;
326
+ AsyncInputStreamAdapter .this .allDataBuffersReceived = true ;
256
327
terminatePendingReads ();
257
328
}
258
329
259
330
@ Override
260
331
public void onComplete () {
261
332
262
- AsyncInputStreamAdapter .this .complete = true ;
263
- terminatePendingReads ();
333
+ AsyncInputStreamAdapter .this .allDataBuffersReceived = true ;
334
+ if (bufferQueue .isEmpty ()) {
335
+ terminatePendingReads ();
336
+ }
264
337
}
265
338
}
266
339
}
0 commit comments