Skip to content

Commit 05081d6

Browse files
author
Nitesh Kant
committed
Integrating with eureka and ocelli
1 parent 32af53e commit 05081d6

11 files changed

+235
-170
lines changed

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/APIServiceLayer.java

-23
This file was deleted.

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/StartGatewayServer.java

+38-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package io.reactivex.lab.gateway;
22

3+
import com.netflix.eureka2.client.Eureka;
4+
import com.netflix.eureka2.client.EurekaClient;
5+
import com.netflix.eureka2.client.resolver.ServerResolver;
6+
import com.netflix.eureka2.client.resolver.ServerResolvers;
7+
import com.netflix.eureka2.interests.Interests;
8+
import com.netflix.eureka2.transport.EurekaTransports;
39
import com.netflix.hystrix.HystrixRequestLog;
410
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
511
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
612
import io.netty.buffer.ByteBuf;
713
import io.netty.handler.codec.http.HttpResponseStatus;
14+
import io.netty.handler.logging.LogLevel;
15+
import io.reactivex.lab.gateway.clients.LoadBalancerFactory;
816
import io.reactivex.lab.gateway.routes.RouteForDeviceHome;
917
import io.reactivex.lab.gateway.routes.mock.TestRouteBasic;
1018
import io.reactivex.lab.gateway.routes.mock.TestRouteWithHystrix;
@@ -13,19 +21,45 @@
1321
import io.reactivex.netty.pipeline.PipelineConfigurators;
1422
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
1523
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
24+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
25+
import netflix.ocelli.Host;
26+
import netflix.ocelli.eureka.EurekaMembershipSource;
27+
import netflix.ocelli.rxnetty.HttpClientPool;
1628
import rx.Observable;
1729
import rx.Subscriber;
1830
import rx.subscriptions.Subscriptions;
1931

2032
import java.util.concurrent.TimeUnit;
2133

34+
import static io.reactivex.netty.pipeline.PipelineConfigurators.clientSseConfigurator;
35+
2236
public class StartGatewayServer {
2337

38+
private static RouteForDeviceHome routeForDeviceHome;
39+
2440
public static void main(String... args) {
2541
// hystrix stream => http://localhost:9999
2642
startHystrixMetricsStream();
2743

28-
RouteForDeviceHome.getInstance();
44+
ServerResolver.Server discoveryServer = new ServerResolver.Server("127.0.0.1", 7001);
45+
ServerResolver.Server registrationServer = new ServerResolver.Server("127.0.0.1", 7001);
46+
EurekaClient client = Eureka.newClientBuilder(ServerResolvers.from(discoveryServer),
47+
ServerResolvers.from(registrationServer))
48+
.withCodec(EurekaTransports.Codec.Json)
49+
.build();
50+
EurekaMembershipSource membershipSource = new EurekaMembershipSource(client);
51+
LoadBalancerFactory loadBalancerFactory = new LoadBalancerFactory(membershipSource,
52+
new HttpClientPool<>((Host host) -> RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(host.getHostName(), host.getPort())
53+
.pipelineConfigurator(clientSseConfigurator())
54+
.enableWireLogging(LogLevel.ERROR)
55+
.build()));
56+
57+
/**
58+
* This is making sure that eureka's client registry is warmed up.
59+
*/
60+
client.forInterest(Interests.forFullRegistry()).take(1).toBlocking().single();
61+
62+
routeForDeviceHome = new RouteForDeviceHome(loadBalancerFactory);
2963

3064
System.out.println("Server => Starting at http://localhost:8080/");
3165
System.out.println(" Sample URLs: ");
@@ -41,7 +75,7 @@ public static void main(String... args) {
4175
return Observable.defer(() -> {
4276
HystrixRequestContext.initializeContext();
4377
try {
44-
return handleRoutes(request, response).doOnCompleted(response::close);
78+
return handleRoutes(request, response);
4579
} catch (Throwable e) {
4680
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
4781
response.setStatus(HttpResponseStatus.BAD_REQUEST);
@@ -58,6 +92,7 @@ public static void main(String... args) {
5892
} else {
5993
System.err.println("HystrixRequestContext not initialized for thread: " + Thread.currentThread());
6094
}
95+
response.close();
6196
});
6297
}).startAndWait();
6398
}
@@ -67,7 +102,7 @@ public static void main(String... args) {
67102
*/
68103
private static Observable<Void> handleRoutes(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
69104
if (request.getPath().equals("/device/home")) {
70-
return RouteForDeviceHome.getInstance().handle(request, response);
105+
return routeForDeviceHome.handle(request, response);
71106
} else if (request.getPath().equals("/testBasic")) {
72107
return TestRouteBasic.handle(request, response);
73108
} else if (request.getPath().equals("/testWithSimpleFaultTolerance")) {

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/BookmarksCommand.java

+4-23
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import io.reactivex.lab.gateway.clients.BookmarksCommand.Bookmark;
77
import io.reactivex.lab.gateway.clients.PersonalizedCatalogCommand.Video;
88
import io.reactivex.lab.gateway.common.SimpleJson;
9-
import io.reactivex.netty.RxNetty;
10-
import io.reactivex.netty.pipeline.PipelineConfigurators;
119
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
1210
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
1311
import netflix.ocelli.LoadBalancer;
@@ -38,30 +36,13 @@ public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBu
3836

3937
@Override
4038
public Observable<Bookmark> run() {
41-
/*
4239
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet( "/bookmarks?"
4340
+ UrlGenerator.generate("videoId", videos));
4441
return loadBalancer.choose()
45-
.map(holder -> {
46-
return holder.getClient();
47-
})
48-
.flatMap(client -> {
49-
return client.submit(request)
50-
.flatMap(r -> {
51-
return r.getContent().map(sse -> {
52-
return Bookmark.fromJson(sse.contentAsString());
53-
});
54-
});
55-
});
56-
*/
57-
return RxNetty.createHttpClient("localhost", 9190, PipelineConfigurators.<ByteBuf>clientSseConfigurator())
58-
.submit(HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos)))
59-
.flatMap(r -> r.getContent().map(sse -> {
60-
String json = sse.contentAsString();
61-
System.out.println("Returning bookmark: " + json);
62-
return Bookmark.fromJson(json);
63-
}));
64-
42+
.map(holder -> holder.getClient())
43+
.flatMap(client -> client.submit(request)
44+
.flatMap(r -> r.getContent().map(sse -> Bookmark.fromJson(
45+
sse.contentAsString()))));
6546
}
6647

6748
protected Observable<Bookmark> getFallback() {

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/GeoCommand.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
import io.netty.buffer.ByteBuf;
66
import io.reactivex.lab.gateway.clients.GeoCommand.GeoIP;
77
import io.reactivex.lab.gateway.common.SimpleJson;
8-
import io.reactivex.netty.RxNetty;
9-
import io.reactivex.netty.pipeline.PipelineConfigurators;
108
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
9+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
10+
import netflix.ocelli.LoadBalancer;
11+
import netflix.ocelli.rxnetty.HttpClientHolder;
1112
import rx.Observable;
1213

1314
import java.util.List;
@@ -16,17 +17,21 @@
1617
public class GeoCommand extends HystrixObservableCommand<GeoIP> {
1718

1819
private final List<String> ips;
20+
private final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer;
1921

20-
public GeoCommand(List<String> ips) {
22+
public GeoCommand(List<String> ips, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
2123
super(HystrixCommandGroupKey.Factory.asKey("GeoIP"));
2224
this.ips = ips;
25+
this.loadBalancer = loadBalancer;
2326
}
2427

2528
@Override
2629
protected Observable<GeoIP> run() {
27-
return RxNetty.createHttpClient("localhost", 9191, PipelineConfigurators.<ByteBuf>clientSseConfigurator())
28-
.submit(HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips)))
29-
.flatMap(r -> r.getContent().map(sse -> GeoIP.fromJson(sse.contentAsString())));
30+
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips));
31+
return loadBalancer.choose().map(holder -> holder.getClient())
32+
.flatMap(client -> client.submit(request)
33+
.flatMap(r -> r.getContent()
34+
.map(sse -> GeoIP.fromJson(sse.contentAsString()))));
3035
}
3136

3237
public static class GeoIP {

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/MiddleTierLoadBalancer.java reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/LoadBalancerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
*
2222
* @author Nitesh Kant
2323
*/
24-
public class MiddleTierLoadBalancer {
24+
public class LoadBalancerFactory {
2525

2626
private final EurekaMembershipSource membershipSource;
2727
private final HttpClientPool<ByteBuf, ServerSentEvent> clientPool;
2828

29-
public MiddleTierLoadBalancer(EurekaMembershipSource membershipSource,
30-
HttpClientPool<ByteBuf, ServerSentEvent> clientPool) {
29+
public LoadBalancerFactory(EurekaMembershipSource membershipSource,
30+
HttpClientPool<ByteBuf, ServerSentEvent> clientPool) {
3131
this.membershipSource = membershipSource;
3232
this.clientPool = clientPool;
3333
}

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/PersonalizedCatalogCommand.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
import io.reactivex.lab.gateway.clients.PersonalizedCatalogCommand.Catalog;
77
import io.reactivex.lab.gateway.clients.UserCommand.User;
88
import io.reactivex.lab.gateway.common.SimpleJson;
9-
import io.reactivex.netty.RxNetty;
10-
import io.reactivex.netty.pipeline.PipelineConfigurators;
119
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
10+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
11+
import netflix.ocelli.LoadBalancer;
12+
import netflix.ocelli.rxnetty.HttpClientHolder;
1213
import rx.Observable;
1314

1415
import java.util.Arrays;
@@ -18,26 +19,30 @@
1819
public class PersonalizedCatalogCommand extends HystrixObservableCommand<Catalog> {
1920

2021
private final List<User> users;
22+
private final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer;
2123

22-
public PersonalizedCatalogCommand(User user) {
23-
this(Arrays.asList(user));
24+
public PersonalizedCatalogCommand(User user, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
25+
this(Arrays.asList(user), loadBalancer);
2426
// replace with HystrixCollapser
2527
}
2628

27-
public PersonalizedCatalogCommand(List<User> users) {
29+
public PersonalizedCatalogCommand(List<User> users, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
2830
super(HystrixCommandGroupKey.Factory.asKey("PersonalizedCatalog"));
2931
this.users = users;
32+
this.loadBalancer = loadBalancer;
3033
}
3134

3235
@Override
3336
protected Observable<Catalog> run() {
34-
return RxNetty.createHttpClient("localhost", 9192, PipelineConfigurators.<ByteBuf>clientSseConfigurator())
35-
.submit(HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users)))
36-
.flatMap(r -> r.getContent().map(sse -> {
37-
String catalog = sse.contentAsString();
38-
System.out.println("catalog = " + catalog);
39-
return Catalog.fromJson(catalog);
40-
}));
37+
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId",
38+
users));
39+
return loadBalancer.choose().map(holder -> holder.getClient())
40+
.flatMap(client -> client.submit(request)
41+
.flatMap(r -> r.getContent().map(sse -> {
42+
String catalog = sse.contentAsString();
43+
System.out.println("catalog = " + catalog);
44+
return Catalog.fromJson(catalog);
45+
})));
4146
}
4247

4348
public static class Catalog {

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/RatingsCommand.java

+31-12
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
import io.reactivex.lab.gateway.clients.PersonalizedCatalogCommand.Video;
77
import io.reactivex.lab.gateway.clients.RatingsCommand.Rating;
88
import io.reactivex.lab.gateway.common.SimpleJson;
9-
import io.reactivex.netty.RxNetty;
10-
import io.reactivex.netty.pipeline.PipelineConfigurators;
119
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
10+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
11+
import netflix.ocelli.LoadBalancer;
12+
import netflix.ocelli.rxnetty.HttpClientHolder;
1213
import rx.Observable;
1314

1415
import java.util.Arrays;
@@ -17,26 +18,37 @@
1718

1819
public class RatingsCommand extends HystrixObservableCommand<Rating> {
1920
private final List<Video> videos;
21+
private final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer;
2022

21-
public RatingsCommand(Video video) {
22-
this(Arrays.asList(video));
23+
public RatingsCommand(Video video, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
24+
this(Arrays.asList(video), loadBalancer);
2325
// replace with HystrixCollapser
2426
}
2527

26-
public RatingsCommand(List<Video> videos) {
28+
public RatingsCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
2729
super(HystrixCommandGroupKey.Factory.asKey("Ratings"));
2830
this.videos = videos;
31+
this.loadBalancer = loadBalancer;
2932
}
3033

3134
@Override
3235
protected Observable<Rating> run() {
33-
return RxNetty.createHttpClient("localhost", 9193, PipelineConfigurators.<ByteBuf>clientSseConfigurator())
34-
.submit(HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos)))
35-
.flatMap(r -> r.getContent().map(sse -> {
36-
String ratings = sse.contentAsString();
37-
System.out.println("ratings = " + ratings);
38-
return Rating.fromJson(ratings);
39-
}));
36+
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId",
37+
videos));
38+
return loadBalancer.choose().map(holder -> holder.getClient())
39+
.flatMap(client -> {
40+
System.out.println("RatingsCommand.run");
41+
return client.submit(request)
42+
.flatMap(r -> {
43+
System.out.println("RatingsCommand.response" + r.getStatus());
44+
return r.getContent().map(sse -> {
45+
System.out.println("RatingsCommand.content");
46+
String ratings = sse.contentAsString();
47+
System.out.println("ratings = " + ratings);
48+
return Rating.fromJson(ratings);
49+
});
50+
});
51+
});
4052
}
4153

4254
public static class Rating {
@@ -64,4 +76,11 @@ public static Rating fromJson(String json) {
6476
}
6577

6678
}
79+
/*
80+
81+
public static void main(String[] args) {
82+
RxNetty.createHttpClient("127.0.0.1", 9193, PipelineConfigurators.<ByteBuf>clientSseConfigurator())
83+
.submit(HttpClientRequest.createGet("?id=1&videoId=2")).flatMap().toBlocking()
84+
}
85+
*/
6786
}

reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/clients/SocialCommand.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
import io.reactivex.lab.gateway.clients.SocialCommand.Social;
77
import io.reactivex.lab.gateway.clients.UserCommand.User;
88
import io.reactivex.lab.gateway.common.SimpleJson;
9-
import io.reactivex.netty.RxNetty;
10-
import io.reactivex.netty.pipeline.PipelineConfigurators;
119
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
10+
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
11+
import netflix.ocelli.LoadBalancer;
12+
import netflix.ocelli.rxnetty.HttpClientHolder;
1213
import rx.Observable;
1314

1415
import java.util.Arrays;
@@ -18,26 +19,30 @@
1819
public class SocialCommand extends HystrixObservableCommand<Social> {
1920

2021
private final List<User> users;
22+
private final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer;
2123

22-
public SocialCommand(User user) {
23-
this(Arrays.asList(user));
24+
public SocialCommand(User user, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
25+
this(Arrays.asList(user), loadBalancer);
2426
// replace with HystrixCollapser
2527
}
2628

27-
public SocialCommand(List<User> users) {
29+
public SocialCommand(List<User> users, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
2830
super(HystrixCommandGroupKey.Factory.asKey("Social"));
2931
this.users = users;
32+
this.loadBalancer = loadBalancer;
3033
}
3134

3235
@Override
3336
protected Observable<Social> run() {
34-
return RxNetty.createHttpClient("localhost", 9194, PipelineConfigurators.<ByteBuf> clientSseConfigurator())
35-
.submit(HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users)))
36-
.flatMap(r -> r.getContent().map(sse -> {
37-
String social = sse.contentAsString();
38-
System.out.println("social = " + social);
39-
return Social.fromJson(social);
40-
}));
37+
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId",
38+
users));
39+
return loadBalancer.choose().map(holder -> holder.getClient())
40+
.flatMap(client -> client.submit(request)
41+
.flatMap(r -> r.getContent().map(sse -> {
42+
String social = sse.contentAsString();
43+
System.out.println("social = " + social);
44+
return Social.fromJson(social);
45+
})));
4146
}
4247

4348
public static class Social {

0 commit comments

Comments
 (0)