27
27
import java .nio .ByteBuffer ;
28
28
import java .util .Queue ;
29
29
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
30
- import java .util .concurrent .atomic .AtomicLong ;
31
30
import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
32
- import java .util .function .BiConsumer ;
33
31
34
32
import org .reactivestreams .Publisher ;
35
33
import org .reactivestreams .Subscription ;
@@ -74,8 +72,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
74
72
private volatile boolean cancelled ;
75
73
private volatile boolean allDataBuffersReceived ;
76
74
private volatile Throwable error ;
77
- private final Queue <BiConsumer <DataBuffer , Integer >> readRequests = Queues .<BiConsumer <DataBuffer , Integer >> small ()
78
- .get ();
75
+ private final Queue <ReadRequest > readRequests = Queues .<ReadRequest > small ().get ();
79
76
80
77
private final Queue <DataBuffer > bufferQueue = Queues .<DataBuffer > small ().get ();
81
78
@@ -94,52 +91,7 @@ public Publisher<Integer> read(ByteBuffer dst) {
94
91
95
92
return Flux .create (sink -> {
96
93
97
- AtomicLong written = new AtomicLong ();
98
- readRequests .offer ((db , bytecount ) -> {
99
-
100
- try {
101
-
102
- if (error != null ) {
103
- onError (sink , error );
104
- return ;
105
- }
106
-
107
- if (bytecount == -1 ) {
108
-
109
- onComplete (sink , written .get () > 0 ? written .intValue () : -1 );
110
- return ;
111
- }
112
-
113
- ByteBuffer byteBuffer = db .asByteBuffer ();
114
- int remaining = byteBuffer .remaining ();
115
- int writeCapacity = Math .min (dst .remaining (), remaining );
116
- int limit = Math .min (byteBuffer .position () + writeCapacity , byteBuffer .capacity ());
117
- int toWrite = limit - byteBuffer .position ();
118
-
119
- if (toWrite == 0 ) {
120
-
121
- onComplete (sink , written .intValue ());
122
- return ;
123
- }
124
-
125
- int oldPosition = byteBuffer .position ();
126
-
127
- byteBuffer .limit (toWrite );
128
- dst .put (byteBuffer );
129
- byteBuffer .limit (byteBuffer .capacity ());
130
- byteBuffer .position (oldPosition );
131
- db .readPosition (db .readPosition () + toWrite );
132
- written .addAndGet (toWrite );
133
-
134
- } catch (Exception e ) {
135
- onError (sink , e );
136
- } finally {
137
-
138
- if (db != null && db .readableByteCount () == 0 ) {
139
- DataBufferUtils .release (db );
140
- }
141
- }
142
- });
94
+ readRequests .offer (new ReadRequest (sink , dst ));
143
95
144
96
sink .onCancel (this ::terminatePendingReads );
145
97
sink .onDispose (this ::terminatePendingReads );
@@ -243,12 +195,12 @@ void drainLoop() {
243
195
continue ;
244
196
}
245
197
246
- BiConsumer < DataBuffer , Integer > consumer = AsyncInputStreamAdapter .this .readRequests .peek ();
198
+ ReadRequest consumer = AsyncInputStreamAdapter .this .readRequests .peek ();
247
199
if (consumer == null ) {
248
200
break ;
249
201
}
250
202
251
- consumer .accept (wip , wip .readableByteCount ());
203
+ consumer .transferBytes (wip , wip .readableByteCount ());
252
204
}
253
205
254
206
if (bufferQueue .isEmpty ()) {
@@ -269,10 +221,10 @@ void drainLoop() {
269
221
*/
270
222
void terminatePendingReads () {
271
223
272
- BiConsumer < DataBuffer , Integer > readers ;
224
+ ReadRequest readers ;
273
225
274
226
while ((readers = readRequests .poll ()) != null ) {
275
- readers .accept ( null , - 1 );
227
+ readers .onComplete ( );
276
228
}
277
229
}
278
230
@@ -299,7 +251,7 @@ public void onNext(DataBuffer dataBuffer) {
299
251
return ;
300
252
}
301
253
302
- BiConsumer < DataBuffer , Integer > readRequest = AsyncInputStreamAdapter .this .readRequests .peek ();
254
+ ReadRequest readRequest = AsyncInputStreamAdapter .this .readRequests .peek ();
303
255
304
256
if (readRequest == null ) {
305
257
@@ -336,4 +288,76 @@ public void onComplete() {
336
288
}
337
289
}
338
290
}
291
+
292
+ /**
293
+ * Request to read bytes and transfer these to the associated {@link ByteBuffer}.
294
+ */
295
+ class ReadRequest {
296
+
297
+ private final FluxSink <Integer > sink ;
298
+ private final ByteBuffer dst ;
299
+
300
+ private int writtenBytes ;
301
+
302
+ ReadRequest (FluxSink <Integer > sink , ByteBuffer dst ) {
303
+ this .sink = sink ;
304
+ this .dst = dst ;
305
+ this .writtenBytes = -1 ;
306
+ }
307
+
308
+ public void onComplete () {
309
+
310
+ if (error != null ) {
311
+ AsyncInputStreamAdapter .this .onError (sink , error );
312
+ return ;
313
+ }
314
+
315
+ AsyncInputStreamAdapter .this .onComplete (sink , writtenBytes );
316
+ }
317
+
318
+ public void transferBytes (DataBuffer db , int bytes ) {
319
+
320
+ try {
321
+
322
+ if (error != null ) {
323
+ AsyncInputStreamAdapter .this .onError (sink , error );
324
+ return ;
325
+ }
326
+
327
+ ByteBuffer byteBuffer = db .asByteBuffer ();
328
+ int remaining = byteBuffer .remaining ();
329
+ int writeCapacity = Math .min (dst .remaining (), remaining );
330
+ int limit = Math .min (byteBuffer .position () + writeCapacity , byteBuffer .capacity ());
331
+ int toWrite = limit - byteBuffer .position ();
332
+
333
+ if (toWrite == 0 ) {
334
+
335
+ AsyncInputStreamAdapter .this .onComplete (sink , writtenBytes );
336
+ return ;
337
+ }
338
+
339
+ int oldPosition = byteBuffer .position ();
340
+
341
+ byteBuffer .limit (toWrite );
342
+ dst .put (byteBuffer );
343
+ byteBuffer .limit (byteBuffer .capacity ());
344
+ byteBuffer .position (oldPosition );
345
+ db .readPosition (db .readPosition () + toWrite );
346
+
347
+ if (writtenBytes == -1 ) {
348
+ writtenBytes = bytes ;
349
+ } else {
350
+ writtenBytes += bytes ;
351
+ }
352
+
353
+ } catch (Exception e ) {
354
+ AsyncInputStreamAdapter .this .onError (sink , e );
355
+ } finally {
356
+
357
+ if (db .readableByteCount () == 0 ) {
358
+ DataBufferUtils .release (db );
359
+ }
360
+ }
361
+ }
362
+ }
339
363
}
0 commit comments