Skip to content

Commit 58e1f27

Browse files
committed
Add Disruptor support for Frontend
1 parent d61ca68 commit 58e1f27

File tree

3 files changed

+83
-9
lines changed

3 files changed

+83
-9
lines changed

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendProxyClient.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222

2323
import com.devicehive.api.RequestResponseMatcher;
24+
import com.devicehive.model.ServerEvent;
2425
import com.devicehive.proxy.api.ProxyClient;
2526
import com.devicehive.proxy.api.ProxyMessageBuilder;
2627
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
@@ -31,6 +32,7 @@
3132
import com.devicehive.shim.api.Response;
3233
import com.devicehive.shim.api.client.RpcClient;
3334
import com.google.gson.Gson;
35+
import com.lmax.disruptor.RingBuffer;
3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

@@ -49,21 +51,23 @@ public class FrontendProxyClient implements RpcClient {
4951
private final ProxyClient client;
5052
private final RequestResponseMatcher requestResponseMatcher;
5153
private final Gson gson;
54+
private final RingBuffer<ServerEvent> ringBuffer;
5255

53-
public FrontendProxyClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson) {
56+
public FrontendProxyClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
5457
this.requestTopic = requestTopic;
5558
this.replyToTopic = replyToTopic;
5659
this.client = client;
5760
this.requestResponseMatcher = requestResponseMatcher;
5861
this.gson = gson;
62+
this.ringBuffer = ringBuffer;
5963
}
6064

6165
@Override
6266
public void call(Request request, Consumer<Response> callback) {
6367
requestResponseMatcher.addRequestCallback(request.getCorrelationId(), callback);
6468
logger.debug("Request callback added for request: {}, correlationId: {}", request.getBody(), request.getCorrelationId());
6569

66-
push(request);
70+
ringBuffer.publishEvent((serverEvent, sequence, response) -> serverEvent.set(response), request);
6771
}
6872

6973
@Override

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,41 @@
2121
*/
2222

2323
import com.devicehive.api.RequestResponseMatcher;
24+
import com.devicehive.model.ServerEvent;
2425
import com.devicehive.proxy.api.NotificationHandler;
2526
import com.devicehive.proxy.api.ProxyClient;
27+
import com.devicehive.proxy.api.ProxyMessageBuilder;
28+
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
29+
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
30+
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
31+
import com.devicehive.shim.api.Request;
2632
import com.devicehive.shim.api.Response;
2733
import com.google.gson.Gson;
34+
import com.lmax.disruptor.WorkHandler;
2835
import org.slf4j.Logger;
2936
import org.slf4j.LoggerFactory;
3037
import org.springframework.beans.factory.annotation.Autowired;
3138

32-
public class ProxyResponseHandler implements NotificationHandler {
39+
public class ProxyResponseHandler implements NotificationHandler, WorkHandler<ServerEvent> {
3340

3441
private static final Logger logger = LoggerFactory.getLogger(ProxyResponseHandler.class);
3542

3643
private final Gson gson;
44+
private final String requestTopic;
45+
private final String replyToTopic;
46+
private final ProxyClient proxyClient;
3747
private final RequestResponseMatcher requestResponseMatcher;
3848

3949
@Autowired
40-
public ProxyResponseHandler(Gson gson, RequestResponseMatcher requestResponseMatcher) {
50+
public ProxyResponseHandler(Gson gson, String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher) {
4151
this.gson = gson;
52+
this.requestTopic = requestTopic;
53+
this.replyToTopic = replyToTopic;
4254
this.requestResponseMatcher = requestResponseMatcher;
55+
WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {});
56+
webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
57+
this.proxyClient = webSocketKafkaProxyClient;
58+
this.proxyClient.start();
4359
}
4460

4561
@Override
@@ -49,4 +65,16 @@ public void handle(String message, ProxyClient client) {
4965

5066
requestResponseMatcher.offerResponse(response);
5167
}
68+
69+
@Override
70+
public void onEvent(ServerEvent serverEvent) throws Exception {
71+
final Request request = serverEvent.get();
72+
if (request.getBody() == null) {
73+
throw new NullPointerException("Request body must not be null.");
74+
}
75+
request.setReplyTo(replyToTopic);
76+
77+
proxyClient.push(ProxyMessageBuilder.notification(
78+
new NotificationCreatePayload(requestTopic, gson.toJson(request), request.getPartitionKey())));
79+
}
5280
}

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java

+47-5
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
*/
2222

2323
import com.devicehive.api.RequestResponseMatcher;
24+
import com.devicehive.model.ServerEvent;
2425
import com.devicehive.proxy.FrontendProxyClient;
2526
import com.devicehive.proxy.ProxyResponseHandler;
2627
import com.devicehive.proxy.api.NotificationHandler;
2728
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
28-
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
2929
import com.devicehive.shim.api.client.RpcClient;
3030
import com.google.gson.Gson;
31+
import com.lmax.disruptor.*;
3132
import org.springframework.beans.factory.annotation.Value;
3233
import org.springframework.context.annotation.Bean;
3334
import org.springframework.context.annotation.ComponentScan;
@@ -42,6 +43,9 @@
4243
import java.util.Base64;
4344
import java.util.Optional;
4445
import java.util.UUID;
46+
import java.util.concurrent.ExecutorService;
47+
import java.util.concurrent.Executors;
48+
import java.util.stream.IntStream;
4549

4650
import static com.devicehive.configuration.Constants.REQUEST_TOPIC;
4751

@@ -80,16 +84,54 @@ public RequestResponseMatcher requestResponseMatcher() {
8084
}
8185

8286
@Bean
83-
public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher requestResponseMatcher) {
84-
return new ProxyResponseHandler(gson, requestResponseMatcher);
87+
public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) {
88+
return new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher);
8589
}
8690

8791
@Bean
88-
public RpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson) {
92+
public WorkerPool<ServerEvent> workerPool(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) {
93+
final ProxyResponseHandler[] workHandlers = new ProxyResponseHandler[proxyConfig.getWorkerThreads()];
94+
IntStream.range(0, proxyConfig.getWorkerThreads()).forEach(
95+
nbr -> workHandlers[nbr] = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher)
96+
);
97+
final RingBuffer<ServerEvent> ringBuffer = RingBuffer.createMultiProducer(ServerEvent::new, proxyConfig.getBufferSize(), getWaitStrategy(proxyConfig.getWaitStrategy()));
98+
final SequenceBarrier barrier = ringBuffer.newBarrier();
99+
WorkerPool<ServerEvent> workerPool = new WorkerPool<>(ringBuffer, barrier, new FatalExceptionHandler(), workHandlers);
100+
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
101+
return workerPool;
102+
}
103+
104+
@Bean
105+
public RpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson, WorkerPool<ServerEvent> workerPool) {
89106
WebSocketKafkaProxyClient proxyClient = new WebSocketKafkaProxyClient(notificationHandler);
90107
proxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
91-
RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyClient, requestResponseMatcher, gson);
108+
final ExecutorService execService = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
109+
RingBuffer<ServerEvent> ringBuffer = workerPool.start(execService);
110+
RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyClient, requestResponseMatcher, gson, ringBuffer);
92111
client.start();
93112
return client;
94113
}
114+
115+
private WaitStrategy getWaitStrategy(String strategy) {
116+
WaitStrategy waitStrategy;
117+
118+
switch (strategy) {
119+
case "blocking":
120+
waitStrategy = new BlockingWaitStrategy();
121+
break;
122+
case "sleeping":
123+
waitStrategy = new SleepingWaitStrategy();
124+
break;
125+
case "yielding":
126+
waitStrategy = new YieldingWaitStrategy();
127+
break;
128+
case "busyspin":
129+
waitStrategy = new BusySpinWaitStrategy();
130+
break;
131+
default:
132+
waitStrategy = new BlockingWaitStrategy();
133+
break;
134+
}
135+
return waitStrategy;
136+
}
95137
}

0 commit comments

Comments
 (0)