-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathBookmarkCommand.java
65 lines (51 loc) · 2.1 KB
/
BookmarkCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package io.reactivex.lab.gateway.clients;
import io.netty.buffer.ByteBuf;
import io.reactivex.lab.gateway.clients.BookmarksCommand.Bookmark;
import io.reactivex.lab.gateway.clients.PersonalizedCatalogCommand.Video;
import io.reactivex.lab.gateway.loadbalancer.DiscoveryAndLoadBalancer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import netflix.ocelli.LoadBalancer;
import netflix.ocelli.rxnetty.HttpClientHolder;
import rx.functions.Func1;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixObservableCollapser;
import com.netflix.hystrix.HystrixObservableCommand;
public class BookmarkCommand extends HystrixObservableCollapser<Integer, Bookmark, Bookmark, Video> {
private final Video video;
private static final LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer =
DiscoveryAndLoadBalancer.getFactory().forVip("reactive-lab-bookmark-service");
public BookmarkCommand(Video video) {
this.video = video;
}
@Override
public Video getRequestArgument() {
return video;
}
@Override
protected HystrixObservableCommand<Bookmark> createCommand(Collection<CollapsedRequest<Bookmark, Video>> requests) {
List<Video> videos = new ArrayList<>();
for (CollapsedRequest<Bookmark, Video> r : requests) {
videos.add(r.getArgument());
}
return new BookmarksCommand(videos, loadBalancer);
}
protected void onMissingResponse(CollapsedRequest<Bookmark, Video> r) {
// set a default using setResponse or an exception like this
r.setException(new Exception("No bookmark"));
}
@Override
protected Func1<Bookmark, Integer> getBatchReturnTypeKeySelector() {
return Bookmark::getVideoId;
}
@Override
protected Func1<Video, Integer> getRequestArgumentKeySelector() {
return Video::getId;
}
@Override
protected Func1<Bookmark, Bookmark> getBatchReturnTypeToResponseTypeMapper() {
return (b) -> b;
}
}