Skip to content

Commit a10e021

Browse files
committed
Add pool for proxy subscription
1 parent 58e1f27 commit a10e021

File tree

3 files changed

+38
-19
lines changed

3 files changed

+38
-19
lines changed

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendProxyClient.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222

2323
import com.devicehive.api.RequestResponseMatcher;
2424
import com.devicehive.model.ServerEvent;
25+
import com.devicehive.proxy.api.NotificationHandler;
2526
import com.devicehive.proxy.api.ProxyClient;
2627
import com.devicehive.proxy.api.ProxyMessageBuilder;
2728
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
2829
import com.devicehive.proxy.api.payload.SubscribePayload;
2930
import com.devicehive.proxy.api.payload.TopicsPayload;
31+
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
32+
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
3033
import com.devicehive.shim.api.Request;
3134
import com.devicehive.shim.api.RequestType;
3235
import com.devicehive.shim.api.Response;
@@ -37,29 +40,32 @@
3740
import org.slf4j.LoggerFactory;
3841

3942
import java.util.Arrays;
40-
import java.util.concurrent.CompletableFuture;
41-
import java.util.concurrent.ExecutionException;
42-
import java.util.concurrent.TimeUnit;
43-
import java.util.concurrent.TimeoutException;
43+
import java.util.UUID;
44+
import java.util.concurrent.*;
4445
import java.util.function.Consumer;
4546

4647
public class FrontendProxyClient implements RpcClient {
4748
private static final Logger logger = LoggerFactory.getLogger(FrontendProxyClient.class);
4849

4950
private final String requestTopic;
5051
private final String replyToTopic;
51-
private final ProxyClient client;
52+
private final WebSocketKafkaProxyClient client;
53+
private final WebSocketKafkaProxyConfig proxyConfig;
54+
private final NotificationHandler notificationHandler;
5255
private final RequestResponseMatcher requestResponseMatcher;
5356
private final Gson gson;
5457
private final RingBuffer<ServerEvent> ringBuffer;
5558

56-
public FrontendProxyClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
59+
public FrontendProxyClient(String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, NotificationHandler notificationHandler, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
5760
this.requestTopic = requestTopic;
5861
this.replyToTopic = replyToTopic;
59-
this.client = client;
62+
this.proxyConfig = proxyConfig;
63+
this.notificationHandler = notificationHandler;
6064
this.requestResponseMatcher = requestResponseMatcher;
6165
this.gson = gson;
6266
this.ringBuffer = ringBuffer;
67+
this.client = new WebSocketKafkaProxyClient((message, client) -> {});
68+
client.setWebSocketKafkaProxyConfig(proxyConfig);
6369
}
6470

6571
@Override
@@ -85,7 +91,17 @@ public void push(Request request) {
8591
public void start() {
8692
client.start();
8793
client.push(ProxyMessageBuilder.create(new TopicsPayload(Arrays.asList(requestTopic, replyToTopic)))).join();
88-
client.push(ProxyMessageBuilder.subscribe(new SubscribePayload(replyToTopic))).join();
94+
95+
UUID uuid = UUID.randomUUID();
96+
Executor executionPool = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
97+
for (int i = 0; i < proxyConfig.getWorkerThreads(); i++) {
98+
executionPool.execute(() -> {
99+
WebSocketKafkaProxyClient client = new WebSocketKafkaProxyClient(notificationHandler);
100+
client.setWebSocketKafkaProxyConfig(proxyConfig);
101+
client.start();
102+
client.push(ProxyMessageBuilder.subscribe(new SubscribePayload(replyToTopic, uuid.toString()))).join();
103+
});
104+
}
89105

90106
pingServer();
91107
}

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/BackendProxyClientConfig.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.context.annotation.Configuration;
4343
import org.springframework.context.annotation.Profile;
4444

45+
import java.util.concurrent.Executor;
4546
import java.util.concurrent.ExecutorService;
4647
import java.util.concurrent.Executors;
4748
import java.util.stream.IntStream;
@@ -77,13 +78,18 @@ public NotificationHandler notificationHandler(Gson gson, WorkerPool<ServerEvent
7778
}
7879

7980
@Bean
80-
public ProxyClient proxyClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig) {
81-
WebSocketKafkaProxyClient client = new WebSocketKafkaProxyClient(notificationHandler);
82-
client.setWebSocketKafkaProxyConfig(proxyConfig);
83-
client.start();
84-
client.push(ProxyMessageBuilder.create(new TopicsPayload(REQUEST_TOPIC))).join();
85-
client.push(ProxyMessageBuilder.subscribe(new SubscribePayload(REQUEST_TOPIC, proxyConfig.getConsumerGroup()))).join();
86-
return client;
81+
public Executor executionPool(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig) {
82+
Executor executionPool = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
83+
for (int i = 0; i < proxyConfig.getWorkerThreads(); i++) {
84+
executionPool.execute(() -> {
85+
WebSocketKafkaProxyClient client = new WebSocketKafkaProxyClient(notificationHandler);
86+
client.setWebSocketKafkaProxyConfig(proxyConfig);
87+
client.start();
88+
client.push(ProxyMessageBuilder.create(new TopicsPayload(REQUEST_TOPIC))).join();
89+
client.push(ProxyMessageBuilder.subscribe(new SubscribePayload(REQUEST_TOPIC, proxyConfig.getConsumerGroup()))).join();
90+
});
91+
}
92+
return executionPool;
8793
}
8894

8995
@Bean

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.devicehive.proxy.FrontendProxyClient;
2626
import com.devicehive.proxy.ProxyResponseHandler;
2727
import com.devicehive.proxy.api.NotificationHandler;
28-
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
2928
import com.devicehive.shim.api.client.RpcClient;
3029
import com.google.gson.Gson;
3130
import com.lmax.disruptor.*;
@@ -103,11 +102,9 @@ public WorkerPool<ServerEvent> workerPool(Gson gson, RequestResponseMatcher requ
103102

104103
@Bean
105104
public RpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson, WorkerPool<ServerEvent> workerPool) {
106-
WebSocketKafkaProxyClient proxyClient = new WebSocketKafkaProxyClient(notificationHandler);
107-
proxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
108105
final ExecutorService execService = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
109106
RingBuffer<ServerEvent> ringBuffer = workerPool.start(execService);
110-
RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyClient, requestResponseMatcher, gson, ringBuffer);
107+
RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, notificationHandler, requestResponseMatcher, gson, ringBuffer);
111108
client.start();
112109
return client;
113110
}

0 commit comments

Comments
 (0)