Skip to content

Commit 35a609d

Browse files
committed
Refactor FluxWriter to use more straight through processing
1 parent 8b89d1d commit 35a609d

File tree

1 file changed

+30
-9
lines changed
  • spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view

1 file changed

+30
-9
lines changed

spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/FluxWriter.java

+30-9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.io.IOException;
2020
import java.io.Writer;
2121
import java.nio.charset.Charset;
22+
import java.util.ArrayList;
23+
import java.util.List;
2224
import java.util.function.Supplier;
2325

2426
import org.reactivestreams.Publisher;
@@ -39,7 +41,9 @@ class FluxWriter extends Writer {
3941

4042
private final Charset charset;
4143

42-
private Flux<String> buffers;
44+
private List<String> current = new ArrayList<>();
45+
46+
private List<Object> accumulated = new ArrayList<>();
4347

4448
public FluxWriter(Supplier<DataBuffer> factory) {
4549
this(factory, Charset.defaultCharset());
@@ -48,17 +52,32 @@ public FluxWriter(Supplier<DataBuffer> factory) {
4852
public FluxWriter(Supplier<DataBuffer> factory, Charset charset) {
4953
this.factory = factory;
5054
this.charset = charset;
51-
this.buffers = Flux.empty();
5255
}
5356

5457
public Publisher<? extends Publisher<? extends DataBuffer>> getBuffers() {
55-
return this.buffers
56-
.map(string -> Mono.just(buffer().write(string, this.charset)));
58+
Flux<String> buffers = Flux.empty();
59+
if (!this.current.isEmpty()) {
60+
this.accumulated.add(new ArrayList<>(this.current));
61+
this.current.clear();
62+
}
63+
for (Object thing : this.accumulated) {
64+
if (thing instanceof Publisher) {
65+
@SuppressWarnings("unchecked")
66+
Publisher<String> publisher = (Publisher<String>) thing;
67+
buffers = buffers.concatWith(publisher);
68+
}
69+
else {
70+
@SuppressWarnings("unchecked")
71+
List<String> list = (List<String>) thing;
72+
buffers = buffers.concatWithValues(list.toArray(new String[0]));
73+
}
74+
}
75+
return buffers.map(string -> Mono.just(buffer().write(string, this.charset)));
5776
}
5877

5978
@Override
6079
public void write(char[] cbuf, int off, int len) throws IOException {
61-
this.buffers = this.buffers.concatWith(Mono.just(new String(cbuf, off, len)));
80+
this.current.add(new String(cbuf, off, len));
6281
}
6382

6483
@Override
@@ -79,13 +98,15 @@ private DataBuffer buffer() {
7998

8099
public void write(Object thing) {
81100
if (thing instanceof Publisher) {
82-
@SuppressWarnings("unchecked")
83-
Publisher<String> publisher = (Publisher<String>) thing;
84-
this.buffers = this.buffers.concatWith(Flux.from(publisher));
101+
if (!this.current.isEmpty()) {
102+
this.accumulated.add(new ArrayList<>(this.current));
103+
this.current.clear();
104+
}
105+
this.accumulated.add(thing);
85106
}
86107
else {
87108
if (thing instanceof String) {
88-
this.buffers = this.buffers.concatWith(Mono.just((String) thing));
109+
this.current.add((String) thing);
89110
}
90111
}
91112
}

0 commit comments

Comments
 (0)