Skip to content

Commit 839aece

Browse files
mp911dechristophstrobl
authored andcommitted
DATAMONGO-2393 - Support configurable chunk size.
We now allow consuming GridFS files using a configurable chunk size. The default chunk size is now 256kb. Original Pull Request: spring-projects#799
1 parent c6592b0 commit 839aece

File tree

6 files changed

+88
-12
lines changed

6 files changed

+88
-12
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ class BinaryStreamAdapters {
4242
*
4343
* @param inputStream must not be {@literal null}.
4444
* @param dataBufferFactory must not be {@literal null}.
45+
* @param bufferSize read {@code n} bytes per iteration.
4546
* @return {@link Flux} emitting {@link DataBuffer}s.
4647
* @see DataBufferFactory#allocateBuffer()
4748
*/
48-
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
49+
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
50+
int bufferSize) {
4951

50-
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
52+
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
5153
.filter(it -> {
5254

5355
if (it.readableByteCount() == 0) {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java

+29-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.FileNotFoundException;
2121
import java.io.IOException;
2222
import java.io.InputStream;
23+
import java.util.function.IntFunction;
2324

2425
import org.reactivestreams.Publisher;
2526

@@ -41,20 +42,19 @@ public class ReactiveGridFsResource extends AbstractResource {
4142

4243
private final @Nullable GridFSFile file;
4344
private final String filename;
44-
private final Flux<DataBuffer> content;
45+
private final IntFunction<Flux<DataBuffer>> contentFunction;
4546

4647
/**
4748
* Creates a new, absent {@link ReactiveGridFsResource}.
4849
*
4950
* @param filename filename of the absent resource.
5051
* @param content
51-
* @since 2.1
5252
*/
5353
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
5454

5555
this.file = null;
5656
this.filename = filename;
57-
this.content = Flux.from(content);
57+
this.contentFunction = any -> Flux.from(content);
5858
}
5959

6060
/**
@@ -64,10 +64,21 @@ private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
6464
* @param content
6565
*/
6666
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
67+
this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
68+
}
69+
70+
/**
71+
* Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
72+
*
73+
* @param file must not be {@literal null}.
74+
* @param contentFunction
75+
* @since 2.2.1
76+
*/
77+
ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {
6778

6879
this.file = file;
6980
this.filename = file.getFilename();
70-
this.content = Flux.from(content);
81+
this.contentFunction = contentFunction;
7182
}
7283

7384
/**
@@ -165,16 +176,28 @@ public GridFSFile getGridFSFile() {
165176
}
166177

167178
/**
168-
* Retrieve the download stream.
179+
* Retrieve the download stream using the default chunk size of 256kb.
169180
*
170181
* @return
171182
*/
172183
public Flux<DataBuffer> getDownloadStream() {
184+
return getDownloadStream(256 * 1024); // 256kb buffers
185+
}
186+
187+
/**
188+
* Retrieve the download stream.
189+
*
190+
* @param chunkSize chunk size in bytes to use.
191+
* @return
192+
* @since 2.2.1
193+
*/
194+
public Flux<DataBuffer> getDownloadStream(int chunkSize) {
173195

174196
if (!exists()) {
175197
return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
176198
}
177-
return this.content;
199+
200+
return contentFunction.apply(chunkSize);
178201
}
179202

180203
private void verifyExists() throws FileNotFoundException {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,11 @@ public Mono<ReactiveGridFsResource> getResource(GridFSFile file) {
223223

224224
return Mono.fromSupplier(() -> {
225225

226-
GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
226+
return new ReactiveGridFsResource(file, chunkSize -> {
227227

228-
return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory));
228+
GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
229+
return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
230+
});
229231
});
230232
}
231233

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.ByteBuffer;
2626

2727
import org.junit.Test;
28+
2829
import org.springframework.core.io.ClassPathResource;
2930
import org.springframework.core.io.buffer.DataBuffer;
3031
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -49,7 +50,7 @@ public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOExceptio
4950
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
5051
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
5152

52-
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
53+
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);
5354

5455
DataBufferUtils.join(dataBuffers) //
5556
.as(StepVerifier::create) //

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void adapterShouldPropagateErrors() {
4444
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
4545
when(asyncInput.close()).thenReturn(Mono.empty());
4646

47-
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory);
47+
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);
4848

4949
StepVerifier.create(binaryStream, 0) //
5050
.thenRequest(1) //

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java

+48
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import reactor.test.StepVerifier;
2525

2626
import java.io.IOException;
27+
import java.nio.ByteBuffer;
2728
import java.util.UUID;
2829

2930
import org.bson.BsonObjectId;
@@ -40,14 +41,17 @@
4041
import org.springframework.core.io.buffer.DefaultDataBuffer;
4142
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
4243
import org.springframework.dao.IncorrectResultSizeDataAccessException;
44+
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
4345
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
46+
import org.springframework.data.mongodb.core.convert.MongoConverter;
4447
import org.springframework.data.mongodb.core.query.Query;
4548
import org.springframework.test.context.ContextConfiguration;
4649
import org.springframework.test.context.junit4.SpringRunner;
4750
import org.springframework.util.StreamUtils;
4851

4952
import com.mongodb.gridfs.GridFS;
5053
import com.mongodb.gridfs.GridFSInputFile;
54+
import com.mongodb.internal.HexUtils;
5155
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
5256
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
5357

@@ -66,6 +70,8 @@ public class ReactiveGridFsTemplateTests {
6670

6771
@Autowired ReactiveGridFsOperations operations;
6872
@Autowired SimpleMongoDbFactory mongoClient;
73+
@Autowired ReactiveMongoDatabaseFactory dbFactory;
74+
@Autowired MongoConverter mongoConverter;
6975

7076
@Before
7177
public void setUp() {
@@ -92,6 +98,48 @@ public void storesAndFindsSimpleDocument() {
9298
.verifyComplete();
9399
}
94100

101+
@Test // DATAMONGO-1855
102+
public void storesAndLoadsLargeFileCorrectly() {
103+
104+
ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb
105+
106+
int i = 0;
107+
while (buffer.remaining() != 0) {
108+
byte b = (byte) (i++ % 16);
109+
String string = HexUtils.toHex(new byte[] { b });
110+
buffer.put(string.getBytes());
111+
}
112+
buffer.flip();
113+
114+
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
115+
116+
ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block();
117+
118+
buffer.clear();
119+
120+
// default chunk size
121+
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
122+
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
123+
.transform(DataBufferUtils::join) //
124+
.as(StepVerifier::create) //
125+
.consumeNextWith(dataBuffer -> {
126+
127+
assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
128+
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
129+
}).verifyComplete();
130+
131+
// small chunk size
132+
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
133+
.flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) //
134+
.transform(DataBufferUtils::join) //
135+
.as(StepVerifier::create) //
136+
.consumeNextWith(dataBuffer -> {
137+
138+
assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
139+
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
140+
}).verifyComplete();
141+
}
142+
95143
@Test // DATAMONGO-2392
96144
public void storesAndFindsByUUID() throws IOException {
97145

0 commit comments

Comments
 (0)