Skip to content

Commit 56b3c49

Browse files
committed
GH-653 Make consistent priority order of function definition in RSocket
With this commit spring.cloud.function.routing-expression property takes precedence over all, then route(<function.definition>) and then spring.cloud.function.definition property
1 parent fbb7985 commit 56b3c49

File tree

3 files changed

+51
-39
lines changed

3 files changed

+51
-39
lines changed

Diff for: spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java

+27-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
5959
import org.springframework.messaging.support.MessageBuilder;
6060
import org.springframework.util.MimeTypeUtils;
6161
import org.springframework.util.ReflectionUtils;
62+
import org.springframework.util.RouteMatcher.Route;
6263
import org.springframework.util.StringUtils;
6364
import org.springframework.web.util.pattern.PathPatternRouteMatcher;
6465

@@ -116,10 +117,7 @@ public MetadataExtractor getMetadataExtractor() {
116117
@Override
117118
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
118119
if (!FrameType.SETUP.equals(message.getHeaders().get("rsocketFrameType"))) {
119-
String destination = this.getDestination(message).value();
120-
if (!StringUtils.hasText(destination)) {
121-
destination = this.discoverAndInjectDestinationHeader(message);
122-
}
120+
String destination = this.discoverAndInjectDestinationHeader(message);
123121

124122
Set<String> mappings = this.getDestinationLookup().keySet();
125123
if (!mappings.contains(destination)) {
@@ -154,16 +152,33 @@ protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandler
154152

155153
@SuppressWarnings("unchecked")
156154
private String discoverAndInjectDestinationHeader(Message<?> message) {
157-
String destination = this.functionProperties.getDefinition();
158-
if (!StringUtils.hasText(destination) && StringUtils.hasText(this.functionProperties.getRoutingExpression())) {
155+
156+
String destination;
157+
if (StringUtils.hasText(this.functionProperties.getRoutingExpression())) {
159158
destination = RoutingFunction.FUNCTION_NAME;
159+
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
160+
.getField(this.headersField, message.getHeaders());
161+
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
162+
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
163+
}
164+
else {
165+
Route route = (Route) message.getHeaders().get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER);
166+
destination = route.value();
167+
if (!StringUtils.hasText(destination)) {
168+
destination = this.functionProperties.getDefinition();
169+
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
170+
.getField(this.headersField, message.getHeaders());
171+
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
172+
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
173+
}
160174
}
161-
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
162-
.getField(this.headersField, message.getHeaders());
163-
164-
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
165175

166-
headersMap.put(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, matcher.parseRoute(destination));
176+
if (!StringUtils.hasText(destination) && logger.isDebugEnabled()) {
177+
logger.debug("Failed to discover function definition. Neither "
178+
+ "`spring.cloud.function.definition`, nor `.route(<function.definition>)`, nor "
179+
+ "`spring.cloud.function.routing-expression` were provided. Wil use empty string "
180+
+ "for lookup, which will work only if there is one function in Function Catalog");
181+
}
167182
return destination;
168183
}
169184

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,22 +16,17 @@
1616

1717
package org.springframework.cloud.function.rsocket;
1818

19-
import org.springframework.beans.BeansException;
2019
import org.springframework.beans.factory.ObjectProvider;
2120
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2221
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2322
import org.springframework.boot.autoconfigure.rsocket.RSocketMessageHandlerCustomizer;
2423
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2524
import org.springframework.cloud.function.context.FunctionCatalog;
2625
import org.springframework.cloud.function.context.FunctionProperties;
27-
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
28-
import org.springframework.context.ApplicationContext;
29-
import org.springframework.context.ApplicationContextAware;
3026
import org.springframework.context.annotation.Bean;
3127
import org.springframework.context.annotation.Configuration;
3228
import org.springframework.context.annotation.Primary;
3329
import org.springframework.messaging.rsocket.RSocketStrategies;
34-
import org.springframework.util.StringUtils;
3530

3631
/**
3732
* Main configuration class for components required to support RSocket integration with
@@ -45,14 +40,8 @@
4540
@Configuration(proxyBeanMethods = false)
4641
@EnableConfigurationProperties({ FunctionProperties.class, RSocketFunctionProperties.class })
4742
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
48-
class RSocketAutoConfiguration implements ApplicationContextAware {
43+
class RSocketAutoConfiguration {
4944

50-
private ApplicationContext applicationContext;
51-
52-
@Override
53-
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
54-
this.applicationContext = applicationContext;
55-
}
5645

5746
@Bean
5847
@ConditionalOnMissingBean
@@ -64,18 +53,6 @@ public FunctionRSocketMessageHandler functionRSocketMessageHandler(RSocketStrate
6453
FunctionRSocketMessageHandler rsocketMessageHandler = new FunctionRSocketMessageHandler(functionCatalog, functionProperties);
6554
rsocketMessageHandler.setRSocketStrategies(rSocketStrategies);
6655
customizers.orderedStream().forEach((customizer) -> customizer.customize(rsocketMessageHandler));
67-
registerFunctionsWithRSocketHandler(rsocketMessageHandler, functionCatalog, functionProperties);
6856
return rsocketMessageHandler;
6957
}
70-
71-
private void registerFunctionsWithRSocketHandler(FunctionRSocketMessageHandler rsocketMessageHandler,
72-
FunctionCatalog functionCatalog, FunctionProperties functionProperties) {
73-
String definition = functionProperties.getDefinition();
74-
if (StringUtils.hasText(definition)) {
75-
FunctionInvocationWrapper function = FunctionRSocketUtils
76-
.registerFunctionForDestination(definition, functionCatalog, this.applicationContext);
77-
rsocketMessageHandler.registerFunctionHandler(new RSocketListenerFunction(function), definition);
78-
}
79-
}
80-
8158
}

Diff for: spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationRoutingTests.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public void testImperativeFunctionAsRequestReplyWithDefinition() {
5757
RSocketRequester.Builder rsocketRequesterBuilder =
5858
applicationContext.getBean(RSocketRequester.Builder.class);
5959

60+
rsocketRequesterBuilder.tcp("localhost", port)
61+
.route("uppercase")
62+
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
63+
.data("hello")
64+
.retrieveMono(String.class)
65+
.as(StepVerifier::create)
66+
.expectNext("hello")
67+
.expectComplete()
68+
.verify();
69+
6070
rsocketRequesterBuilder.tcp("localhost", port)
6171
.route("")
6272
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
@@ -69,13 +79,23 @@ public void testImperativeFunctionAsRequestReplyWithDefinition() {
6979

7080
rsocketRequesterBuilder.tcp("localhost", port)
7181
.route(RoutingFunction.FUNCTION_NAME)
72-
.metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON)
82+
.metadata("{\"func_name\":\"echo\"}", MimeTypeUtils.APPLICATION_JSON)
7383
.data("hello")
7484
.retrieveMono(String.class)
7585
.as(StepVerifier::create)
76-
.expectNext("HELLO")
86+
.expectNext("hello")
7787
.expectComplete()
7888
.verify();
89+
90+
// rsocketRequesterBuilder.tcp("localhost", port)
91+
// .route(RoutingFunction.FUNCTION_NAME)
92+
// .metadata("{\"func_name\":\"uppercase\"}", MimeTypeUtils.APPLICATION_JSON)
93+
// .data("hello")
94+
// .retrieveMono(String.class)
95+
// .as(StepVerifier::create)
96+
// .expectNext("HELLO")
97+
// .expectComplete()
98+
// .verify();
7999
}
80100
}
81101

0 commit comments

Comments
 (0)