-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathUserCommand.java
87 lines (72 loc) · 3.17 KB
/
UserCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package io.reactivex.lab.gateway.clients;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import io.netty.buffer.ByteBuf;
import io.reactivex.lab.gateway.clients.UserCommand.User;
import io.reactivex.lab.gateway.common.SimpleJson;
import io.reactivex.lab.gateway.loadbalancer.DiscoveryAndLoadBalancer;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import netflix.ocelli.LoadBalancer;
import netflix.ocelli.rxnetty.HttpClientHolder;
import rx.Observable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Hystrix observable command that requests {@link User} records for a list of user ids
 * from a client chosen by the load balancer for the VIP {@code reactive-lab-user-service}.
 *
 * <p>The fallback ({@link #resumeWithFallback()}) emits one placeholder {@link User}
 * per requested id.
 */
public class UserCommand extends HystrixObservableCommand<User> {

    /** Shared load balancer for the user service; created once when the class is initialized. */
    private static final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer =
            DiscoveryAndLoadBalancer.getFactory().forVip("reactive-lab-user-service");

    /** Ids to look up; also drives how many fallback users are emitted. */
    private final List<String> userIds;

    /**
     * Creates a command in the {@code "User"} Hystrix command group.
     *
     * @param userIds ids of the users to fetch; the list is stored by reference, not copied
     */
    public UserCommand(List<String> userIds) {
        super(HystrixCommandGroupKey.Factory.asKey("User"));
        this.userIds = userIds;
    }

    /**
     * Issues {@code GET /user?...} against a load-balanced client and maps every
     * server-sent event in the response content to a {@link User}.
     *
     * @return an observable of the users parsed from the response stream
     */
    @Override
    protected Observable<User> construct() {
        // NOTE(review): UrlGenerator is defined elsewhere in this package; presumably it
        // renders the ids as "userId" query parameters - confirm against its implementation.
        HttpClientRequest<ByteBuf> request =
                HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds));

        return loadBalancer.choose()
                .map(holder -> holder.getClient())
                .<User>flatMap(client -> client.submit(request)
                        .flatMap(response -> response.getContent().map(
                                (ServerSentEvent sse) -> User.fromJson(sse.contentAsString()))))
                // retry(1) is applied to the whole chain, so a retry starts again
                // from loadBalancer.choose().
                .retry(1);
    }

    /**
     * Builds one placeholder {@link User} per requested id, each carrying the id and
     * fixed filler values for the other fields.
     *
     * @return an observable emitting a fallback user for each entry of {@code userIds}
     */
    @Override
    protected Observable<User> resumeWithFallback() {
        return Observable.from(userIds).map(id -> {
            Map<String, Object> fallback = new HashMap<>();
            fallback.put("userId", id);
            fallback.put("name", "Fallback Name Here");
            fallback.put("other_data", "goes_here");
            return new User(fallback);
        });
    }

    /**
     * A user backed by the key/value map it was created from.
     */
    public static class User implements ID {

        /** Backing attributes; holds at least {@code userId} when created via {@link #fromJson}. */
        private final Map<String, Object> data;

        /**
         * Wraps the given map without validating or copying it.
         *
         * @param jsonToMap attributes of the user
         */
        public User(Map<String, Object> jsonToMap) {
            this.data = jsonToMap;
        }

        /**
         * Parses a JSON document into a {@link User}, checking that it carries an
         * integer {@code userId}.
         *
         * @param json JSON text describing the user
         * @return the parsed user
         * @throws IllegalArgumentException if {@code userId} is absent or is not an integer
         */
        public static User fromJson(String json) {
            Map<String, Object> data = SimpleJson.jsonToMap(json);
            if (!data.containsKey("userId")) {
                throw new IllegalArgumentException("A User object requires a 'userId'");
            }
            try {
                // Parsed only for validation; the result is intentionally discarded.
                parseUserId(data);
            } catch (NumberFormatException e) {
                // Keep the cause so the offending value shows up in the stack trace.
                throw new IllegalArgumentException("The `userId` must be an Integer", e);
            }
            return new User(data);
        }

        /**
         * Returns the {@code userId} attribute as an int.
         *
         * @return the numeric user id
         * @throws NumberFormatException if the stored {@code userId} is not an integer
         */
        public int getId() {
            return parseUserId(data);
        }

        /**
         * Returns the {@code name} attribute.
         *
         * @return the stored name, or {@code null} when the map has no value for {@code name}
         */
        public String getName() {
            return (String) data.get("name");
        }

        /** Reads {@code userId} from the map and parses its string form as an int. */
        private static int parseUserId(Map<String, Object> attributes) {
            return Integer.parseInt(String.valueOf(attributes.get("userId")));
        }
    }
}