1
1
package io .reactivex .lab .edge ;
2
2
3
3
import io .netty .buffer .ByteBuf ;
4
- import io .netty .buffer .ByteBufAllocator ;
5
4
import io .netty .handler .codec .http .HttpResponseStatus ;
6
5
import io .reactivex .lab .edge .routes .RouteForDeviceHome ;
7
6
import io .reactivex .lab .edge .routes .mock .TestRouteBasic ;
11
10
import io .reactivex .netty .pipeline .PipelineConfigurators ;
12
11
import io .reactivex .netty .protocol .http .server .HttpServerRequest ;
13
12
import io .reactivex .netty .protocol .http .server .HttpServerResponse ;
14
- import io .reactivex .netty .protocol .text .sse .ServerSentEvent ;
15
- import io .reactivex .netty .serialization .ContentTransformer ;
16
13
17
14
import java .util .concurrent .TimeUnit ;
18
15
19
16
import rx .Observable ;
20
17
import rx .Subscriber ;
21
- import rx .functions .Func0 ;
22
- import rx .operators .OperatorDefer ;
23
18
import rx .subscriptions .Subscriptions ;
24
19
25
20
import com .netflix .hystrix .HystrixRequestLog ;
@@ -43,7 +38,7 @@ public static void main(String... args) {
43
38
// start web services => http://localhost:8080
44
39
RxNetty .createHttpServer (8080 , (request , response ) -> {
45
40
System .out .println ("Server => Request: " + request .getPath ());
46
- return defer (() -> {
41
+ return Observable . defer (() -> {
47
42
HystrixRequestContext .initializeContext ();
48
43
try {
49
44
return handleRoutes (request , response );
@@ -79,9 +74,6 @@ private static Observable<Void> handleRoutes(HttpServerRequest<ByteBuf> request,
79
74
return TestRouteWithSimpleFaultTolerance .handle (request , response );
80
75
} else if (request .getPath ().equals ("/testWithHystrix" )) {
81
76
return TestRouteWithHystrix .handle (request , response );
82
- } else if (request .getPath ().endsWith (".js" )) {
83
- System .out .println ("Server => Javascript Request: " + request .getPath ());
84
- return JavascriptRuntime .getInstance ().handleRequest (request , response );
85
77
} else {
86
78
return writeError (request , response , "Unknown path: " + request .getPath ());
87
79
}
@@ -93,13 +85,13 @@ private static void startHystrixMetricsStream() {
93
85
response .getHeaders ().add ("content-type" , "text/event-stream" );
94
86
return Observable .create ((Subscriber <? super Void > s ) -> {
95
87
s .add (streamPoller .subscribe (json -> {
96
- response .writeAndFlush ( new ServerSentEvent ( "" , "data" , json ) );
88
+ response .writeStringAndFlush ( "data: " + json );
97
89
}, error -> {
98
90
s .onError (error );
99
91
}));
100
92
101
93
s .add (Observable .interval (1000 , TimeUnit .MILLISECONDS ).flatMap (n -> {
102
- return response .writeAndFlush ( new ServerSentEvent ( "" , " ping" , "" ) )
94
+ return response .writeStringAndFlush ( " ping:" )
103
95
.onErrorReturn (e -> {
104
96
System .out .println ("Connection closed, unsubscribing from Hystrix Stream" );
105
97
s .unsubscribe ();
@@ -132,25 +124,4 @@ public static Observable<Void> writeError(HttpServerRequest<?> request, HttpServ
132
124
return response .writeStringAndFlush ("Error 500: " + message );
133
125
}
134
126
135
- public static SSETransformer SSE_TRANSFORMER = new SSETransformer ();
136
-
137
- private static class SSETransformer implements ContentTransformer <ServerSentEvent > {
138
- @ Override
139
- public ByteBuf transform (ServerSentEvent toTransform , ByteBufAllocator byteBufAllocator ) {
140
- StringBuilder eventBuilder = new StringBuilder ();
141
- eventBuilder .append (toTransform .getEventName ());
142
- eventBuilder .append (": " );
143
- eventBuilder .append (toTransform .getEventData ());
144
- eventBuilder .append ("\n \n " );
145
- String data = eventBuilder .toString ();
146
- return byteBufAllocator .buffer (data .length ()).writeBytes (data .getBytes ());
147
- }
148
- }
149
-
150
- /*
151
- * Workaround for Java 8 issue: https://github.com/Netflix/RxJava/issues/1157
152
- */
153
- public final static <T > Observable <T > defer (Func0 <Observable <T >> observableFactory ) {
154
- return Observable .create (new OperatorDefer <T >(observableFactory ));
155
- }
156
127
}
0 commit comments