Skip to content

Commit 40d0507

Browse files
committed
GH-654 Fix how RoutingFunction is treated by RSocketListenerFunction
1 parent 3b8b110 commit 40d0507

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ public Publisher<?> apply(Message<Flux<byte[]>> input) {
6969

7070
@SuppressWarnings({ "unchecked", "rawtypes" })
7171
private Mono<Void> handle(Message<Flux<byte[]>> messageToProcess) {
72-
if (this.targetFunction.isConsumer()) {
72+
if (this.targetFunction.isRoutingFunction()) {
73+
Flux<?> dataFlux = messageToProcess.getPayload()
74+
.map((payload) -> MessageBuilder.createMessage(payload, messageToProcess.getHeaders()));
75+
return dataFlux.doOnNext(this.targetFunction).then();
76+
}
77+
else if (this.targetFunction.isConsumer()) {
7378
Flux<?> dataFlux =
7479
messageToProcess.getPayload()
7580
.map((payload) -> MessageBuilder.createMessage(payload, messageToProcess.getHeaders()));

0 commit comments

Comments
 (0)