Skip to content

Commit ef550c4

Browse files
committed
Flush JSON stream after each element
Issue: SPR-15104
1 parent 45770d7 commit ef550c4

File tree

5 files changed

+48
-10
lines changed

5 files changed

+48
-10
lines changed

Diff for: spring-core/src/main/java/org/springframework/core/codec/AbstractEncoder.java

+7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@
3131
*/
3232
public abstract class AbstractEncoder<T> implements Encoder<T> {
3333

34+
/**
35+
* Hint key to use with a {@link FlushingStrategy} value.
36+
*/
37+
public static final String FLUSHING_STRATEGY_HINT = AbstractEncoder.class.getName() + ".flushingStrategy";
38+
39+
public enum FlushingStrategy { AUTO, AFTER_EACH_ELEMENT }
40+
3441
private final List<MimeType> encodableMimeTypes;
3542

3643

Diff for: spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Map;
2222

2323
import org.reactivestreams.Publisher;
24+
import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT;
25+
import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT;
2426
import reactor.core.publisher.Flux;
2527
import reactor.core.publisher.Mono;
2628

@@ -102,7 +104,8 @@ else if (MediaType.APPLICATION_OCTET_STREAM.equals(mediaType)) {
102104

103105
DataBufferFactory bufferFactory = outputMessage.bufferFactory();
104106
Flux<DataBuffer> body = this.encoder.encode(inputStream, bufferFactory, elementType, mediaType, hints);
105-
return outputMessage.writeWith(body);
107+
return (hints.get(FLUSHING_STRATEGY_HINT) == AFTER_EACH_ELEMENT ?
108+
outputMessage.writeAndFlushWith(body.map(Flux::just)) : outputMessage.writeWith(body));
106109
}
107110

108111
/**

Diff for: spring-web/src/main/java/org/springframework/http/codec/Jackson2ServerHttpMessageWriter.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,24 @@
2121
import java.util.Map;
2222

2323
import com.fasterxml.jackson.annotation.JsonView;
24+
import org.reactivestreams.Publisher;
25+
import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT;
26+
import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT;
27+
import reactor.core.publisher.Mono;
2428

2529
import org.springframework.core.MethodParameter;
2630
import org.springframework.core.ResolvableType;
31+
import org.springframework.core.codec.AbstractEncoder;
2732
import org.springframework.core.codec.Encoder;
2833
import org.springframework.http.MediaType;
34+
import org.springframework.http.ReactiveHttpOutputMessage;
2935
import org.springframework.http.codec.json.AbstractJackson2Codec;
3036
import org.springframework.http.server.reactive.ServerHttpRequest;
37+
import org.springframework.http.server.reactive.ServerHttpResponse;
3138

3239
/**
33-
* {@link ServerHttpMessageWriter} that resolves those annotation or request based Jackson 2 hints:
34-
* <ul>
35-
* <li>{@code @JsonView} annotated handler method</li>
36-
* </ul>
40+
* Jackson {@link ServerHttpMessageWriter} that resolves {@code @JsonView} annotated handler
41+
* method and deals with {@link AbstractEncoder#FLUSHING_STRATEGY_HINT}.
3742
*
3843
* @author Sebastien Deleuze
3944
* @since 5.0
@@ -72,4 +77,27 @@ protected Map<String, Object> resolveWriteHints(ResolvableType streamType,
7277
return hints;
7378
}
7479

80+
@Override
81+
public Mono<Void> write(Publisher<?> inputStream, ResolvableType elementType, MediaType mediaType,
82+
ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
83+
84+
if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) {
85+
Map<String, Object> hintsWithFlush = new HashMap<>(hints);
86+
hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT);
87+
return super.write(inputStream, elementType, mediaType, outputMessage, hintsWithFlush);
88+
}
89+
return super.write(inputStream, elementType, mediaType, outputMessage, hints);
90+
}
91+
92+
@Override
93+
public Mono<Void> write(Publisher<?> inputStream, ResolvableType streamType, ResolvableType elementType,
94+
MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response, Map<String, Object> hints) {
95+
96+
if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) {
97+
Map<String, Object> hintsWithFlush = new HashMap<>(hints);
98+
hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT);
99+
return super.write(inputStream, streamType, elementType, mediaType, request, response, hintsWithFlush);
100+
}
101+
return super.write(inputStream, streamType, elementType, mediaType, request, response, hints);
102+
}
75103
}

Diff for: spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.http.codec.FormHttpMessageReader;
3838
import org.springframework.http.codec.HttpMessageReader;
3939
import org.springframework.http.codec.HttpMessageWriter;
40+
import org.springframework.http.codec.Jackson2ServerHttpMessageWriter;
4041
import org.springframework.http.codec.ResourceHttpMessageWriter;
4142
import org.springframework.http.codec.ServerSentEventHttpMessageWriter;
4243
import org.springframework.http.codec.json.Jackson2JsonDecoder;
@@ -97,7 +98,7 @@ public void defaultConfiguration() {
9798
if (jackson2Present) {
9899
messageReader(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
99100
Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder();
100-
messageWriter(new EncoderHttpMessageWriter<>(jsonEncoder));
101+
messageWriter(new Jackson2ServerHttpMessageWriter(jsonEncoder));
101102
messageWriter(
102103
new ServerSentEventHttpMessageWriter(Collections.singletonList(jsonEncoder)));
103104
}

Diff for: spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.time.Duration;
2020

2121
import org.junit.Before;
22-
import org.junit.Ignore;
2322
import org.junit.Test;
2423
import reactor.core.publisher.Flux;
2524
import reactor.test.StepVerifier;
@@ -42,7 +41,6 @@
4241
/**
4342
* @author Sebastien Deleuze
4443
*/
45-
@Ignore
4644
public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegrationTests {
4745

4846
private AnnotationConfigApplicationContext wac;
@@ -78,7 +76,8 @@ public void jsonStreaming() throws Exception {
7876
StepVerifier.create(result)
7977
.expectNext(new Person("foo 0"))
8078
.expectNext(new Person("foo 1"))
81-
.verifyComplete();
79+
.thenCancel()
80+
.verify();
8281
}
8382

8483
@RestController
@@ -87,7 +86,7 @@ static class JsonStreamingController {
8786

8887
@RequestMapping(value = "/stream", produces = APPLICATION_STREAM_JSON_VALUE)
8988
Flux<Person> person() {
90-
return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)).take(2);
89+
return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l));
9190
}
9291

9392
}

0 commit comments

Comments
 (0)