-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathBookmarksCommand.java
84 lines (69 loc) · 2.8 KB
/
BookmarksCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package io.reactivex.lab.gateway.clients;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import io.netty.buffer.ByteBuf;
import io.reactivex.lab.gateway.clients.BookmarksCommand.Bookmark;
import io.reactivex.lab.gateway.clients.PersonalizedCatalogCommand.Video;
import io.reactivex.lab.gateway.common.SimpleJson;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import netflix.ocelli.LoadBalancer;
import netflix.ocelli.rxnetty.HttpClientHolder;
import rx.Observable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Hystrix observable command that retrieves a {@link Bookmark} for each of a set of videos.
 *
 * <p>The primary path issues an HTTP GET to {@code /bookmarks} on a client obtained from the
 * supplied load balancer and converts every server-sent event of the response into a
 * {@link Bookmark}. If that path fails, the fallback emits one bookmark per video with
 * position {@code 0}.
 */
public class BookmarksCommand extends HystrixObservableCommand<Bookmark> {

    /** Snapshot of the videos whose bookmarks are requested. */
    final List<Video> videos;
    private final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer;
    /** Request-cache key: every video id followed by {@code "-"}, in list order. */
    final String cacheKey;

    /**
     * Creates the command in the {@code "GetBookmarks"} Hystrix group.
     *
     * @param videos       videos to look up bookmarks for; must not be {@code null}
     * @param loadBalancer source of the HTTP client the request is submitted to
     */
    public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
        super(HystrixCommandGroupKey.Factory.asKey("GetBookmarks"));
        // Copy the list so that the request, the fallback and the cache key (computed once,
        // below) keep describing the same videos even if the caller mutates its list later.
        this.videos = new ArrayList<>(videos);
        this.loadBalancer = loadBalancer;
        StringBuilder b = new StringBuilder();
        for (Video v : this.videos) {
            b.append(v.getId()).append("-");
        }
        this.cacheKey = b.toString();
    }

    /**
     * Builds the primary observable: a GET to {@code /bookmarks?...} whose query string is
     * produced by {@code UrlGenerator.generate("videoId", videos)}, submitted to a client
     * chosen by the load balancer. Each server-sent event's content is parsed with
     * {@link Bookmark#fromJson(String)}.
     *
     * @return observable of the bookmarks found in the response, with {@code retry(1)} applied
     */
    @Override
    public Observable<Bookmark> construct() {
        HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos));
        return loadBalancer.choose()
                .map(holder -> holder.getClient())
                .<Bookmark>flatMap(client -> client.submit(request)
                        .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> Bookmark.fromJson(sse.contentAsString()))))
                .retry(1);
    }

    /**
     * Fallback used when the primary observable cannot deliver: one bookmark per requested
     * video, each with {@code position} set to {@code 0}.
     *
     * @return observable emitting the default bookmarks in the order of {@link #videos}
     */
    @Override
    protected Observable<Bookmark> resumeWithFallback() {
        List<Bookmark> bs = new ArrayList<>(videos.size());
        for (Video v : videos) {
            Map<String, Object> data = new HashMap<>();
            data.put("position", 0);
            data.put("videoId", v.getId());
            bs.add(new Bookmark(data));
        }
        return Observable.from(bs);
    }

    /**
     * @return the key built in the constructor from the video ids
     */
    @Override
    protected String getCacheKey() {
        return cacheKey;
    }

    /**
     * A bookmark backed by a key/value map, with typed accessors for the
     * {@code "position"} and {@code "videoId"} entries.
     */
    public static class Bookmark {

        private final Map<String, Object> data;

        Bookmark(Map<String, Object> data) {
            this.data = data;
        }

        /**
         * Creates a bookmark from a JSON document by converting it with
         * {@code SimpleJson.jsonToMap}.
         *
         * @param json JSON text describing the bookmark
         * @return bookmark wrapping the resulting map
         */
        public static Bookmark fromJson(String json) {
            return new Bookmark(SimpleJson.jsonToMap(json));
        }

        /**
         * @return the {@code "position"} entry as an {@code int}
         * @throws NumberFormatException if the entry is absent or not numeric
         */
        public int getPosition() {
            return intValue("position");
        }

        /**
         * @return the {@code "videoId"} entry as an {@code int}
         * @throws NumberFormatException if the entry is absent or not numeric
         */
        public int getVideoId() {
            return intValue("videoId");
        }

        /**
         * Reads an entry as an {@code int} regardless of how it is stored: any
         * {@link Number} is narrowed with {@code intValue()}, anything else is parsed
         * from its string form. A plain {@code (int)} cast would only accept
         * {@code Integer} and throw {@code ClassCastException} for other numeric types.
         */
        private int intValue(String key) {
            Object value = data.get(key);
            if (value instanceof Number) {
                return ((Number) value).intValue();
            }
            return Integer.parseInt(String.valueOf(value));
        }
    }
}