-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathGeoCommand.java
53 lines (42 loc) · 1.93 KB
/
GeoCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package io.reactivex.lab.gateway.clients;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import io.netty.buffer.ByteBuf;
import io.reactivex.lab.gateway.clients.GeoCommand.GeoIP;
import io.reactivex.lab.gateway.common.SimpleJson;
import io.reactivex.lab.gateway.loadbalancer.DiscoveryAndLoadBalancer;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import netflix.ocelli.LoadBalancer;
import netflix.ocelli.rxnetty.HttpClientHolder;
import rx.Observable;
import java.util.List;
import java.util.Map;
public class GeoCommand extends HystrixObservableCommand<GeoIP> {
private final List<String> ips;
private static final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer =
DiscoveryAndLoadBalancer.getFactory().forVip("reactive-lab-geo-service");
public GeoCommand(List<String> ips) {
super(HystrixCommandGroupKey.Factory.asKey("GeoIP"));
this.ips = ips;
}
@Override
protected Observable<GeoIP> construct() {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips));
return loadBalancer.choose()
.map(holder -> holder.getClient())
.<GeoIP>flatMap(client -> client.submit(request)
.flatMap(r -> r.getContent()
.map((ServerSentEvent sse) -> GeoIP.fromJson(sse.contentAsString()))))
.retry(1);
}
public static class GeoIP {
private final Map<String, Object> data;
private GeoIP(Map<String, Object> data) {
this.data = data;
}
public static GeoIP fromJson(String json) {
return new GeoIP(SimpleJson.jsonToMap(json));
}
}
}