Skip to content

Commit 5b955db

Browse files
Merge pull request #4 from NiteshKant/master
Remove deprecated SSE class uses
2 parents 4ffc4b2 + e4ff609 commit 5b955db

19 files changed

+47
-89
lines changed

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/EdgeServer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private static void startHystrixMetricsStream() {
9999
});
100100
}).subscribe());
101101
});
102-
}, PipelineConfigurators.<ByteBuf> sseServerConfigurator()).start();
102+
}, PipelineConfigurators.<ByteBuf> serveSseConfigurator()).start();
103103
}
104104

105105
final static Observable<String> streamPoller = Observable.create((Subscriber<? super String> s) -> {

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/BookmarksCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected Observable<Bookmark> run() {
3737
.submit(HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos)))
3838
.flatMap(r -> {
3939
Observable<Bookmark> bytesToJson = r.getContent().map(sse -> {
40-
return Bookmark.fromJson(sse.getEventData());
40+
return Bookmark.fromJson(sse.contentAsString());
4141
});
4242
return bytesToJson;
4343
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/GeoCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ protected Observable<GeoIP> run() {
2828
.submit(HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips)))
2929
.flatMap(r -> {
3030
Observable<GeoIP> bytesToJson = r.getContent().map(sse -> {
31-
return GeoIP.fromJson(sse.getEventData());
31+
return GeoIP.fromJson(sse.contentAsString());
3232
});
3333
return bytesToJson;
3434
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/PersonalizedCatalogCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected Observable<Catalog> run() {
3535
.submit(HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users)))
3636
.flatMap(r -> {
3737
Observable<Catalog> bytesToJson = r.getContent().map(sse -> {
38-
return Catalog.fromJson(sse.getEventData());
38+
return Catalog.fromJson(sse.contentAsString());
3939
});
4040
return bytesToJson;
4141
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/RatingsCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ protected Observable<Rating> run() {
3434
.submit(HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos)))
3535
.flatMap(r -> {
3636
Observable<Rating> bytesToJson = r.getContent().map(sse -> {
37-
return Rating.fromJson(sse.getEventData());
37+
return Rating.fromJson(sse.contentAsString());
3838
});
3939
return bytesToJson;
4040
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/SocialCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected Observable<Social> run() {
3535
.submit(HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users)))
3636
.flatMap(r -> {
3737
Observable<Social> bytesToJson = r.getContent().map(sse -> {
38-
return Social.fromJson(sse.getEventData());
38+
return Social.fromJson(sse.contentAsString());
3939
});
4040
return bytesToJson;
4141
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/UserCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ protected Observable<User> run() {
2828
.submit(HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds)))
2929
.flatMap(r -> {
3030
Observable<User> user = r.getContent().map(sse -> {
31-
return User.fromJson(sse.getEventData());
31+
return User.fromJson(sse.contentAsString());
3232
});
3333
return user;
3434
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/clients/VideoMetadataCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected Observable<VideoMetadata> run() {
3535
.submit(HttpClientRequest.createGet("/metadata?" + UrlGenerator.generate("videoId", videos)))
3636
.flatMap(r -> {
3737
Observable<VideoMetadata> bytesToJson = r.getContent().map(sse -> {
38-
return VideoMetadata.fromJson(sse.getEventData());
38+
return VideoMetadata.fromJson(sse.contentAsString());
3939
});
4040
return bytesToJson;
4141
});

reactive-lab-edge/src/main/java/io/reactivex/lab/edge/common/RxNettySSE.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@
1010
import io.reactivex.netty.protocol.http.server.HttpServer;
1111
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
1212
import io.reactivex.netty.protocol.http.server.RequestHandler;
13-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
13+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
1414

1515
public class RxNettySSE {
1616

1717
private static final EventLoopGroup EVENT_LOOP = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new RxNettyThreadFactory());
1818

1919
public static HttpServer<ByteBuf, ServerSentEvent> createHttpServer(int port,
2020
RequestHandler<ByteBuf, ServerSentEvent> requestHandler) {
21-
return new HttpServerBuilder<ByteBuf, ServerSentEvent>(port, requestHandler)
22-
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseServerConfigurator())
21+
return new HttpServerBuilder<>(port, requestHandler)
22+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> serveSseConfigurator())
2323
.eventLoop(EVENT_LOOP)
2424
.build();
2525
}
2626

2727
public static HttpClient<ByteBuf, ServerSentEvent> createHttpClient(String host, int port) {
2828
return new HttpClientBuilder<ByteBuf, ServerSentEvent>(host, port)
29-
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseClientConfigurator())
29+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> clientSseConfigurator())
3030
.eventloop(EVENT_LOOP)
3131
.build();
3232

reactive-lab-services/src/main/java/io/reactivex/lab/services/MiddleTierService.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.handler.codec.http.HttpResponseStatus;
5-
import io.reactivex.lab.services.common.RxNettySSE;
5+
import io.reactivex.netty.RxNetty;
6+
import io.reactivex.netty.pipeline.PipelineConfigurators;
67
import io.reactivex.netty.protocol.http.server.HttpServer;
78
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
89
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
9-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
10+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
1011

1112
import java.util.List;
1213

@@ -16,17 +17,17 @@ public abstract class MiddleTierService {
1617

1718
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
1819
System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);
19-
return RxNettySSE.createHttpServer(port, (request, response) -> {
20+
return RxNetty.createHttpServer(port, (request, response) -> {
2021
// System.out.println("Server => Request: " + request.getPath());
2122
try {
2223
return handleRequest(request, response);
2324
} catch (Throwable e) {
2425
e.printStackTrace();
2526
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
2627
response.setStatus(HttpResponseStatus.BAD_REQUEST);
27-
return response.writeAndFlush(new ServerSentEvent("1", "data:", "Error 500: Bad Request\n" + e.getMessage() + "\n"));
28+
return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
2829
}
29-
});
30+
}, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
3031
}
3132

3233
protected abstract Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response);

reactive-lab-services/src/main/java/io/reactivex/lab/services/common/RxNettySSE.java

-42
This file was deleted.

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/BookmarksService.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import io.reactivex.lab.services.common.SimpleJson;
66
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
77
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
8-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
8+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
99

1010
import java.util.HashMap;
1111
import java.util.List;
@@ -30,8 +30,7 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
3030
video.put("videoId", videoId);
3131
video.put("position", (int) (Math.random() * 5000));
3232
return video;
33-
}).flatMap(video -> {
34-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(video)));
35-
}).delay(latency, TimeUnit.MILLISECONDS); // simulate latency
33+
}).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
34+
.delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
3635
}
3736
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/GeoService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.HashMap;
1010
import java.util.List;
@@ -27,7 +27,8 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2727
ip_data.put("latitude", "51.5");
2828
data.put(ip, ip_data);
2929
}
30-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(data)));
30+
return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n")
31+
.doOnCompleted(response::close);
3132
}).delay(10, TimeUnit.MILLISECONDS);
3233
}
3334
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/MockService.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@
44
import io.reactivex.lab.services.MiddleTierService;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
8+
import rx.Observable;
89

910
import java.util.List;
1011

11-
import rx.Observable;
12-
1312
public class MockService extends MiddleTierService {
1413

1514
@Override
@@ -40,7 +39,9 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
4039
}
4140

4241
response.setStatus(HttpResponseStatus.OK);
43-
return MockResponse.generateJson(id, delay, itemSize, numItems).flatMap(json -> response.writeStringAndFlush(json));
42+
return MockResponse.generateJson(id, delay, itemSize, numItems)
43+
.flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
44+
.doOnCompleted(response::close);
4445
}
4546

4647
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/PersonalizedCatalogService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.Arrays;
1010
import java.util.HashMap;
@@ -25,8 +25,8 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2525
userData.put("other_data", "goes_here");
2626
userData.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
2727
return userData;
28-
}).flatMap(list -> {
29-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(list)));
30-
}).delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS); // simulate latency
28+
}).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
29+
.delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS)
30+
.doOnCompleted(response::close); // simulate latency
3131
}
3232
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/RatingsService.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.HashMap;
1010
import java.util.List;
@@ -25,8 +25,7 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2525
video.put("actual_user_rating", 4);
2626
video.put("average_user_rating", 3.1);
2727
return video;
28-
}).flatMap(video -> {
29-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(video)));
30-
}).delay(20, TimeUnit.MILLISECONDS); // simulate latenc
28+
}).flatMap(video -> response.writeStringAndFlush("data : " + SimpleJson.mapToJson(video) + "\n"))
29+
.delay(20, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latenc
3130
}
3231
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/SocialService.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.Arrays;
1010
import java.util.HashMap;
@@ -22,9 +22,8 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2222
user.put("userId", userId);
2323
user.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
2424
return user;
25-
}).flatMap(list -> {
26-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(list)));
27-
}).delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS); // simulate latency
25+
}).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
26+
.delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
2827
}
2928

3029
private static int randomUser() {

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/UserService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.HashMap;
1010
import java.util.List;
@@ -27,8 +27,8 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2727
user.put("name", "Name Here");
2828
user.put("other_data", "goes_here");
2929
return user;
30-
}).flatMap(user -> {
31-
return response.writeAndFlush(new ServerSentEvent("1", "data", SimpleJson.mapToJson(user)));
32-
}).delay(((long) (Math.random() * 20) + 1500), TimeUnit.MILLISECONDS); // simulate latency
30+
}).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n")
31+
.doOnCompleted(response::close))
32+
.delay(((long) (Math.random() * 20) + 1500), TimeUnit.MILLISECONDS); // simulate latency
3333
}
3434
}

reactive-lab-services/src/main/java/io/reactivex/lab/services/impls/VideoMetadataService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.reactivex.lab.services.common.SimpleJson;
55
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
66
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
7-
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
7+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
88

99
import java.util.HashMap;
1010
import java.util.List;
@@ -24,8 +24,8 @@ protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServe
2424
video.put("title", "Video Title");
2525
video.put("other_data", "goes_here");
2626
return video;
27-
}).flatMap(video -> {
28-
return response.writeAndFlush(new ServerSentEvent("", "data", SimpleJson.mapToJson(video)));
29-
}).delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
27+
}).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")
28+
.doOnCompleted(response::close))
29+
.delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
3030
}
3131
}

0 commit comments

Comments
 (0)