Skip to content

Commit f25a5ac

Browse files
committed
Reduce amount of proxy ws client
1 parent d74c180 commit f25a5ac

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public ProxyResponseHandler(Gson gson, String requestTopic, String replyToTopic,
5555
WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {});
5656
webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
5757
this.proxyClient = webSocketKafkaProxyClient;
58-
this.proxyClient.start();
58+
}
59+
60+
public void start() {
61+
proxyClient.start();
5962
}
6063

6164
@Override

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher
9191
public WorkerPool<ServerEvent> workerPool(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) {
9292
final ProxyResponseHandler[] workHandlers = new ProxyResponseHandler[proxyConfig.getWorkerThreads()];
9393
IntStream.range(0, proxyConfig.getWorkerThreads()).forEach(
94-
nbr -> workHandlers[nbr] = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher)
94+
nbr -> {
95+
ProxyResponseHandler handler = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher);
96+
handler.start();
97+
workHandlers[nbr] = handler;
98+
}
9599
);
96100
final RingBuffer<ServerEvent> ringBuffer = RingBuffer.createMultiProducer(ServerEvent::new, proxyConfig.getBufferSize(), getWaitStrategy(proxyConfig.getWaitStrategy()));
97101
final SequenceBarrier barrier = ringBuffer.newBarrier();

0 commit comments

Comments
 (0)