16
16
17
17
package org .springframework .cloud .function .rsocket ;
18
18
19
+ import java .io .ByteArrayOutputStream ;
20
+ import java .io .IOException ;
21
+ import java .io .InputStream ;
22
+ import java .io .OutputStream ;
19
23
import java .lang .reflect .Type ;
20
24
import java .util .Map ;
21
25
26
+ import org .reactivestreams .Publisher ;
22
27
import org .springframework .cloud .function .context .catalog .FunctionTypeUtils ;
23
28
import org .springframework .cloud .function .json .JsonMapper ;
24
29
import org .springframework .core .ResolvableType ;
30
+ import org .springframework .core .codec .AbstractDecoder ;
25
31
import org .springframework .core .codec .DecodingException ;
26
32
import org .springframework .core .io .buffer .DataBuffer ;
27
- import org .springframework .http . codec . json . Jackson2JsonDecoder ;
33
+ import org .springframework .core . io . buffer . DataBufferUtils ;
28
34
import org .springframework .lang .Nullable ;
29
35
import org .springframework .messaging .support .MessageBuilder ;
30
36
import org .springframework .util .MimeType ;
31
37
import org .springframework .util .MimeTypeUtils ;
32
38
39
+ import reactor .core .publisher .Flux ;
40
+
33
41
/**
34
42
*
35
43
* @author Oleg Zhurakousky
36
44
* @since 3.1
37
45
*
38
46
*/
39
- class MessageAwareJsonDecoder extends Jackson2JsonDecoder {
47
+ class MessageAwareJsonDecoder extends AbstractDecoder < Object > {
40
48
41
49
private final JsonMapper jsonMapper ;
42
50
@@ -49,22 +57,25 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
49
57
return mimeType .isCompatibleWith (MimeTypeUtils .APPLICATION_JSON );
50
58
}
51
59
52
-
53
60
@ SuppressWarnings ("unchecked" )
54
61
@ Override
55
62
public Object decode (DataBuffer dataBuffer , ResolvableType targetType ,
56
- @ Nullable MimeType mimeType , @ Nullable Map <String , Object > hints ) throws DecodingException {
63
+ @ Nullable MimeType mimeType , @ Nullable Map <String , Object > hints )
64
+ throws DecodingException {
57
65
58
- ResolvableType type = ResolvableType .forClassWithGenerics (Map .class , String .class , Object .class );
59
- Map <String , Object > messageMap = (Map <String , Object >) super .decode (dataBuffer , type , mimeType , hints );
66
+ ResolvableType type = ResolvableType .forClassWithGenerics (Map .class , String .class ,
67
+ Object .class );
68
+ Map <String , Object > messageMap = (Map <String , Object >) doDecode (dataBuffer , type ,
69
+ mimeType , hints );
60
70
if (messageMap .containsKey (FunctionRSocketUtils .PAYLOAD )) {
61
71
Type requestedType = FunctionTypeUtils .getGenericType (targetType .getType ());
62
- Object payload = this .jsonMapper .fromJson (messageMap .get (FunctionRSocketUtils .PAYLOAD ), requestedType );
72
+ Object payload = this .jsonMapper .fromJson (
73
+ messageMap .get (FunctionRSocketUtils .PAYLOAD ), requestedType );
63
74
64
75
if (FunctionTypeUtils .isMessage (targetType .getType ())) {
65
- return MessageBuilder .withPayload (payload )
66
- . copyHeaders ( (Map <String , ?>) messageMap .get (FunctionRSocketUtils .HEADERS ))
67
- .build ();
76
+ return MessageBuilder .withPayload (payload ). copyHeaders (
77
+ (Map <String , ?>) messageMap .get (FunctionRSocketUtils .HEADERS ))
78
+ .build ();
68
79
}
69
80
else {
70
81
return payload ;
@@ -73,6 +84,45 @@ public Object decode(DataBuffer dataBuffer, ResolvableType targetType,
73
84
else {
74
85
return messageMap ;
75
86
}
87
+ }
88
+
89
+ private Object doDecode (DataBuffer dataBuffer , ResolvableType targetType ,
90
+ @ Nullable MimeType mimeType , @ Nullable Map <String , Object > hints )
91
+ throws DecodingException {
92
+
93
+ try {
94
+ byte [] data = toByteArray (dataBuffer .asInputStream ());
95
+ return this .jsonMapper .fromJson (data , targetType .getType ());
96
+ }
97
+ catch (IOException ex ) {
98
+ throw new IllegalStateException (ex );
99
+ }
100
+ finally {
101
+ DataBufferUtils .release (dataBuffer );
102
+ }
103
+ }
104
+
105
+ private byte [] toByteArray (final InputStream input ) throws IOException {
106
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream ()) {
107
+ copyLarge (input , output , new byte [2048 ]);
108
+ return output .toByteArray ();
109
+ }
110
+ }
76
111
112
+ private long copyLarge (final InputStream input , final OutputStream output ,
113
+ final byte [] buffer ) throws IOException {
114
+ long count = 0 ;
115
+ int n ;
116
+ while (-1 != (n = input .read (buffer ))) {
117
+ output .write (buffer , 0 , n );
118
+ count += n ;
119
+ }
120
+ return count ;
121
+ }
122
+
123
+ @ Override
124
+ public Flux <Object > decode (Publisher <DataBuffer > inputStream ,
125
+ ResolvableType elementType , MimeType mimeType , Map <String , Object > hints ) {
126
+ return Flux .from (inputStream ).map (buffer -> decode (buffer , elementType , mimeType , hints ));
77
127
}
78
128
}
0 commit comments