Skip to content

Commit 93ce61f

Browse files
committed
RSocket - Add Cbor test, fixed consumer test and minor cleanup
polish
1 parent dd00e15 commit 93ce61f

File tree

5 files changed

+108
-49
lines changed

5 files changed

+108
-49
lines changed

spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -264,23 +264,24 @@ public Object resolveArgumentValue(MethodParameter parameter,
264264
return MessageBuilder.withPayload(structure).build();
265265
}
266266
}
267+
else {
268+
return MessageBuilder.withPayload(structure).build();
269+
}
267270
}
268271
return MessageBuilder.withPayload(bytePayload).copyHeadersIfAbsent(message.getHeaders()).build();
269272
});
270273
return MessageBuilder.createMessage(argument, message.getHeaders());
271274
}
272-
else {
275+
else { // delegate to the existing argument resolvers
273276
for (HandlerMethodArgumentResolver handlerMethodArgumentResolver : this.resolvers) {
274277
if (handlerMethodArgumentResolver.supportsParameter(parameter)) {
275278
Publisher<?> arg = handlerMethodArgumentResolver.resolveArgument(parameter, message);
276279
return MessageBuilder.withPayload(arg).copyHeadersIfAbsent(message.getHeaders()).build();
277280
}
278-
279281
}
280282
return message;
281283
}
282284
}
283-
284285
}
285286

286287
protected static final class FunctionRSocketPayloadReturnValueHandler extends RSocketPayloadReturnValueHandler {

spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class MessageAwareJsonDecoder extends AbstractDecoder<Object> {
5555

5656
@Override
5757
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
58-
return mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
58+
return mimeType != null && mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
5959
}
6060

6161
@SuppressWarnings("unchecked")

spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonEncoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class MessageAwareJsonEncoder extends AbstractEncoder<Object> {
6666

6767
@Override
6868
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
69-
boolean canEncode = mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
69+
boolean canEncode = mimeType != null && mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
7070
if (canEncode && this.isClient) {
7171
canEncode = (FunctionTypeUtils.isMessage(elementType.getType())
7272
|| Map.class.isAssignableFrom(FunctionTypeUtils.getRawType(elementType.getType())));

spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java

+39-42
Original file line numberDiff line numberDiff line change
@@ -46,56 +46,51 @@ class RSocketListenerFunction implements Function<Object, Publisher<?>> {
4646
private final FunctionInvocationWrapper targetFunction;
4747

4848
RSocketListenerFunction(FunctionInvocationWrapper targetFunction) {
49+
Assert.isTrue(targetFunction != null, "Failed to discover target function. \n"
50+
+ "To fix it you should either provide 'spring.cloud.function.definition' property "
51+
+ "or if you are using RSocketRequester provide valid function definition via 'route' "
52+
+ "operator (e.g., requester.route(\"echo\"))");
4953
this.targetFunction = targetFunction;
5054
}
5155

52-
/*
53-
* We need to maintain the input typeless to ensure that no encoder/decoders will attempt any conversion.
54-
* That said it will always be Message<Publisher<Object>>
55-
*/
56+
5657
@SuppressWarnings("unchecked")
5758
@Override
5859
public Publisher<?> apply(Object input) {
59-
Assert.isTrue(this.targetFunction != null, "Failed to discover target function. \n"
60-
+ "To fix it you should either provide 'spring.cloud.function.definition' property "
61-
+ "or if you are using RSocketRequester provide valid function definition via 'route' "
62-
+ "operator (e.g., requester.route(\"echo\"))");
63-
// if (input instanceof Message) {
64-
Message<Publisher<Object>> inputMessage = (Message<Publisher<Object>>) input;
65-
FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(inputMessage);
66-
switch (frameType) {
67-
case REQUEST_FNF:
68-
return handle(inputMessage);
69-
case REQUEST_RESPONSE:
70-
case REQUEST_STREAM:
71-
case REQUEST_CHANNEL:
72-
return handleAndReply(inputMessage);
73-
default:
74-
throw new UnsupportedOperationException();
75-
}
76-
// }
77-
// throw new UnsupportedOperationException("Expecting input to be of type Message<Publisher<Object>>");
60+
/*
61+
* We need to maintain the input typeless to ensure that no encoder/decoders will attempt any conversion.
62+
* That said it will always be Message<Publisher<Object>>
63+
*/
64+
Message<Publisher<Object>> inputMessage = (Message<Publisher<Object>>) input;
65+
66+
FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(inputMessage);
67+
switch (frameType) {
68+
case REQUEST_FNF:
69+
return handle(inputMessage);
70+
case REQUEST_RESPONSE:
71+
case REQUEST_STREAM:
72+
case REQUEST_CHANNEL:
73+
return handleAndReply(inputMessage);
74+
default:
75+
throw new UnsupportedOperationException();
76+
}
7877
}
7978

8079
@SuppressWarnings({ "unchecked", "rawtypes" })
8180
private Mono<Void> handle(Message<Publisher<Object>> messageToProcess) {
8281
if (this.targetFunction.isRoutingFunction()) {
8382
Flux<?> dataFlux = Flux.from(messageToProcess.getPayload())
84-
.map((payload) -> {
85-
return MessageBuilder.createMessage(payload, messageToProcess.getHeaders());
86-
});
83+
.map(payload -> MessageBuilder.createMessage(payload, messageToProcess.getHeaders()));
8784
return dataFlux.doOnNext(this.targetFunction).then();
8885
}
8986
else if (this.targetFunction.isConsumer()) {
90-
Flux<?> dataFlux =
91-
Flux.from(messageToProcess.getPayload())
92-
.map((payload) -> MessageBuilder.createMessage(payload, messageToProcess.getHeaders()));
93-
if (FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())) {
94-
dataFlux = dataFlux.transform((Function) this.targetFunction);
95-
}
96-
else {
97-
dataFlux = dataFlux.doOnNext(this.targetFunction);
98-
}
87+
Flux<?> dataFlux = Flux.from(messageToProcess.getPayload())
88+
.map(payload -> this.buildReceivedMessage(payload, messageToProcess.getHeaders()));
89+
90+
dataFlux = FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())
91+
? dataFlux.transform((Function) this.targetFunction)
92+
: dataFlux.doOnNext(this.targetFunction);
93+
9994
return dataFlux.then();
10095
}
10196
else {
@@ -105,13 +100,9 @@ else if (this.targetFunction.isConsumer()) {
105100

106101
@SuppressWarnings({ "unchecked", "rawtypes" })
107102
private Flux<?> handleAndReply(Message<Publisher<Object>> messageToProcess) {
108-
Flux<?> dataFlux =
109-
Flux.from(messageToProcess.getPayload())
110-
.map((payload) -> {
111-
return payload instanceof Message
112-
? MessageBuilder.fromMessage((Message<?>) payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build()
113-
: MessageBuilder.withPayload(payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
114-
});
103+
Flux<?> dataFlux = Flux.from(messageToProcess.getPayload())
104+
.map(payload -> this.buildReceivedMessage(payload, messageToProcess.getHeaders()));
105+
115106
if (this.targetFunction.getInputType() != null && FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())) {
116107
dataFlux = dataFlux.transform((Function) this.targetFunction);
117108
}
@@ -132,6 +123,12 @@ private Flux<?> handleAndReply(Message<Publisher<Object>> messageToProcess) {
132123
return dataFlux;
133124
}
134125

126+
private Message<?> buildReceivedMessage(Object mayBeMessage, MessageHeaders messageHeaders) {
127+
return mayBeMessage instanceof Message
128+
? MessageBuilder.fromMessage((Message<?>) mayBeMessage).copyHeadersIfAbsent(messageHeaders).build()
129+
: MessageBuilder.withPayload(mayBeMessage).copyHeadersIfAbsent(messageHeaders).build();
130+
}
131+
135132
/*
136133
* This will ensure that unless CT is application/json for which we provide Message aware encoder/decoder
137134
* the payload is extracted since no other available encoders/decoders understand Message.

spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java

+63-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.function.rsocket;
1818

19+
import java.util.Map;
1920
import java.util.function.Consumer;
2021
import java.util.function.Function;
2122
import java.util.function.Supplier;
@@ -38,9 +39,12 @@
3839
import org.springframework.core.env.ConfigurableEnvironment;
3940
import org.springframework.messaging.rsocket.RSocketRequester;
4041
import org.springframework.test.util.ReflectionTestUtils;
42+
import org.springframework.util.MimeType;
4143
import org.springframework.util.MimeTypeUtils;
4244
import org.springframework.util.SocketUtils;
4345

46+
import static org.assertj.core.api.Assertions.assertThat;
47+
4448
/**
4549
*
4650
* @author Oleg Zhurakousky
@@ -122,6 +126,35 @@ public void testImperativeFunctionAsRequestReplyWithDefinition() {
122126
}
123127
}
124128

129+
@SuppressWarnings("unchecked")
130+
@Test
131+
public void testWithCborContentType() {
132+
int port = SocketUtils.findAvailableTcpPort();
133+
try (
134+
ConfigurableApplicationContext applicationContext =
135+
new SpringApplicationBuilder(SampleFunctionConfiguration.class)
136+
.web(WebApplicationType.NONE)
137+
.run("--logging.level.org.springframework.cloud.function=DEBUG",
138+
"--spring.cloud.function.definition=uppercase",
139+
"--spring.rsocket.server.port=" + port);
140+
) {
141+
RSocketRequester.Builder rsocketRequesterBuilder =
142+
applicationContext.getBean(RSocketRequester.Builder.class);
143+
144+
Person p = new Person();
145+
p.setAge(23);
146+
p.setName("Bob");
147+
Map<String, Object> m = rsocketRequesterBuilder
148+
.dataMimeType(MimeType.valueOf("application/cbor"))
149+
.tcp("localhost", port)
150+
.route("echoMap")
151+
.data(p)
152+
.retrieveMono(Map.class).block();
153+
assertThat(m.get("name")).isEqualTo("Bob");
154+
assertThat(m.get("age")).isEqualTo(23);
155+
}
156+
}
157+
125158
@Test
126159
@Disabled
127160
public void testImperativeFunctionAsRequestReplyWithDefinitionExplicitExpectedOutputCt() {
@@ -472,6 +505,10 @@ public void testFireAndForgetConsumer() {
472505
.run("--logging.level.org.springframework.cloud.function=DEBUG",
473506
"--spring.rsocket.server.port=" + port);
474507
) {
508+
509+
SampleFunctionConfiguration config = applicationContext.getBean(SampleFunctionConfiguration.class);
510+
511+
475512
RSocketRequester.Builder rsocketRequesterBuilder =
476513
applicationContext.getBean(RSocketRequester.Builder.class);
477514

@@ -482,6 +519,8 @@ public void testFireAndForgetConsumer() {
482519
.as(StepVerifier::create)
483520
.expectComplete()
484521
.verify();
522+
String result = config.consumerData.asMono().block();
523+
assertThat(result).isEqualTo("hello");
485524
}
486525
}
487526

@@ -550,7 +589,7 @@ public void testRoutingWithRoutingFunction() {
550589
@Configuration
551590
public static class SampleFunctionConfiguration {
552591

553-
final Sinks.One<byte[]> consumerData = Sinks.one();
592+
final Sinks.One<String> consumerData = Sinks.one();
554593

555594
@Bean
556595
public Function<String, String> uppercase() {
@@ -567,6 +606,11 @@ public Function<String, String> echo() {
567606
return v -> v;
568607
}
569608

609+
@Bean
610+
public Function<Map<String, Object>, Map<String, Object>> echoMap() {
611+
return v -> v;
612+
}
613+
570614
@Bean
571615
public Function<Flux<String>, Flux<String>> uppercaseReactive() {
572616
return flux -> flux.map(v -> {
@@ -576,7 +620,7 @@ public Function<Flux<String>, Flux<String>> uppercaseReactive() {
576620
}
577621

578622
@Bean
579-
public Consumer<byte[]> log() {
623+
public Consumer<String> log() {
580624
return this.consumerData::tryEmitValue;
581625
}
582626

@@ -612,4 +656,21 @@ public Function<String, String> echo() {
612656
}
613657
}
614658

659+
public static class Person {
660+
private String name;
661+
private int age;
662+
public String getName() {
663+
return name;
664+
}
665+
public void setName(String name) {
666+
this.name = name;
667+
}
668+
public int getAge() {
669+
return age;
670+
}
671+
public void setAge(int age) {
672+
this.age = age;
673+
}
674+
}
675+
615676
}

0 commit comments

Comments
 (0)