Skip to content

Commit 8452ad1

Browse files
committed
Server Backend now supports WebSocket-Kafka proxy client
1 parent 2a7ad46 commit 8452ad1

File tree

12 files changed

+264
-19
lines changed

12 files changed

+264
-19
lines changed

devicehive-backend/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
<groupId>org.springframework.boot</groupId>
2323
<artifactId>spring-boot-starter-web</artifactId>
2424
</dependency>
25+
<dependency>
26+
<groupId>org.springframework.boot</groupId>
27+
<artifactId>spring-boot-starter-undertow</artifactId>
28+
</dependency>
2529
<dependency>
2630
<groupId>com.devicehive</groupId>
2731
<artifactId>devicehive-test-utils</artifactId>

devicehive-backend/src/main/java/com/devicehive/application/BackendConfig.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
import com.devicehive.eventbus.FilterRegistry;
2424
import com.devicehive.eventbus.SubscriberRegistry;
2525
import com.devicehive.json.GsonFactory;
26+
import com.devicehive.shim.api.server.MessageDispatcher;
2627
import com.google.gson.Gson;
27-
import org.springframework.beans.factory.annotation.Autowired;
2828
import org.springframework.context.annotation.Bean;
2929
import org.springframework.context.annotation.Configuration;
3030
import com.devicehive.eventbus.EventBus;
31-
import com.devicehive.shim.api.server.RpcServer;
3231
import org.springframework.context.annotation.DependsOn;
3332

3433
@Configuration
@@ -53,7 +52,7 @@ public SubscriberRegistry subscriberRegistry() {
5352

5453
@Bean
5554
@DependsOn("subscriberRegistry")
56-
public EventBus eventBus(RpcServer rpcServer, SubscriberRegistry subscriberRegistry) {
57-
return new EventBus(rpcServer.getDispatcher(), subscriberRegistry);
55+
public EventBus eventBus(MessageDispatcher dispatcher, SubscriberRegistry subscriberRegistry) {
56+
return new EventBus(dispatcher, subscriberRegistry);
5857
}
5958
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.devicehive.application;
2+
3+
/*
4+
* #%L
5+
* DeviceHive Backend Logic
6+
* %%
7+
* Copyright (C) 2016 - 2017 DataArt
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.devicehive.proxy.ProxyMessageDispatcher;
24+
import com.devicehive.proxy.ProxyRequestHandler;
25+
import com.devicehive.proxy.api.NotificationHandler;
26+
import com.devicehive.proxy.api.ProxyClient;
27+
import com.devicehive.proxy.api.ProxyMessageBuilder;
28+
import com.devicehive.proxy.api.payload.TopicCreatePayload;
29+
import com.devicehive.proxy.api.payload.TopicSubscribePayload;
30+
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
31+
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
32+
import com.devicehive.shim.api.server.MessageDispatcher;
33+
import com.google.gson.Gson;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.ComponentScan;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.context.annotation.Profile;
38+
39+
import static com.devicehive.configuration.Constants.REQUEST_TOPIC;
40+
41+
@Configuration
42+
@Profile("ws-kafka-proxy")
43+
@ComponentScan({"com.devicehive.proxy.config", "com.devicehive.proxy.client"})
44+
public class BackendProxyClientConfig {
45+
46+
@Bean
47+
public NotificationHandler notificationHandler(Gson gson, RequestHandlersMapper requestHandlersMapper) {
48+
return new ProxyRequestHandler(gson, requestHandlersMapper);
49+
}
50+
51+
@Bean
52+
public ProxyClient proxyClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig) {
53+
WebSocketKafkaProxyClient client = new WebSocketKafkaProxyClient(notificationHandler);
54+
client.setWebSocketKafkaProxyConfig(proxyConfig);
55+
client.start();
56+
client.push(ProxyMessageBuilder.create(new TopicCreatePayload(REQUEST_TOPIC)));
57+
client.push(ProxyMessageBuilder.subscribe(new TopicSubscribePayload(REQUEST_TOPIC))); // toDo: consumerGroup???
58+
return client;
59+
}
60+
61+
@Bean
62+
public MessageDispatcher messageDispatcher(Gson gson, WebSocketKafkaProxyConfig proxyConfig) {
63+
return new ProxyMessageDispatcher(gson, proxyConfig);
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.devicehive.proxy;
2+
3+
/*
4+
* #%L
5+
* DeviceHive Backend Logic
6+
* %%
7+
* Copyright (C) 2016 - 2017 DataArt
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.devicehive.proxy.api.ProxyClient;
24+
import com.devicehive.proxy.api.ProxyMessage;
25+
import com.devicehive.proxy.api.ProxyMessageBuilder;
26+
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
27+
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
28+
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
29+
import com.devicehive.shim.api.Response;
30+
import com.devicehive.shim.api.server.MessageDispatcher;
31+
import com.google.gson.Gson;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
34+
public class ProxyMessageDispatcher implements MessageDispatcher {
35+
36+
private final Gson gson;
37+
private final ProxyClient proxyClient;
38+
39+
@Autowired
40+
public ProxyMessageDispatcher(Gson gson, WebSocketKafkaProxyConfig proxyConfig) {
41+
this.gson = gson;
42+
WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {});
43+
webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
44+
this.proxyClient = webSocketKafkaProxyClient;
45+
this.proxyClient.start();
46+
}
47+
48+
@Override
49+
public void send(String to, Response response) {
50+
ProxyMessage responseMessage = ProxyMessageBuilder.notification(new NotificationCreatePayload(to, gson.toJson(response)));
51+
proxyClient.push(responseMessage);
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.devicehive.proxy;
2+
3+
/*
4+
* #%L
5+
* DeviceHive Backend Logic
6+
* %%
7+
* Copyright (C) 2016 - 2017 DataArt
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.devicehive.application.RequestHandlersMapper;
24+
import com.devicehive.proxy.api.NotificationHandler;
25+
import com.devicehive.proxy.api.ProxyClient;
26+
import com.devicehive.proxy.api.ProxyMessage;
27+
import com.devicehive.proxy.api.ProxyMessageBuilder;
28+
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
29+
import com.devicehive.shim.api.Action;
30+
import com.devicehive.shim.api.Request;
31+
import com.devicehive.shim.api.Response;
32+
import com.devicehive.shim.api.server.RequestHandler;
33+
import com.google.gson.Gson;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.stereotype.Component;
38+
39+
import java.util.Optional;
40+
41+
@Component
42+
public class ProxyRequestHandler implements NotificationHandler {
43+
44+
private static final Logger logger = LoggerFactory.getLogger(ProxyRequestHandler.class);
45+
46+
private final Gson gson;
47+
private final RequestHandlersMapper requestHandlersMapper;
48+
49+
@Autowired
50+
public ProxyRequestHandler(Gson gson, RequestHandlersMapper requestHandlersMapper) {
51+
this.gson = gson;
52+
this.requestHandlersMapper = requestHandlersMapper;
53+
}
54+
55+
@Override
56+
public void handle(String message, ProxyClient client) {
57+
logger.debug("Received message from proxy client: " + message);
58+
final Request request = gson.fromJson(message, Request.class);
59+
final String replyTo = request.getReplyTo();
60+
61+
Response response;
62+
63+
switch (request.getType()) {
64+
case clientRequest:
65+
logger.debug("Client request received {}", request);
66+
response = handleClientRequest(request);
67+
break;
68+
case ping:
69+
logger.info("Ping request received from {}", replyTo);
70+
response = Response.newBuilder().buildSuccess();
71+
break;
72+
default:
73+
logger.warn("Unknown type of request received {} from client with topic {}, correlationId = {}",
74+
request.getType(), replyTo, request.getCorrelationId());
75+
response = Response.newBuilder()
76+
.buildFailed(404);
77+
}
78+
79+
// set correlationId explicitly to prevent missing it in request
80+
response.setCorrelationId(request.getCorrelationId());
81+
ProxyMessage responseMessage = ProxyMessageBuilder.notification(new NotificationCreatePayload(replyTo, gson.toJson(response)));
82+
client.push(responseMessage);
83+
}
84+
85+
private Response handleClientRequest(Request request) {
86+
Response response;
87+
final Action action = request.getBody().getAction();
88+
89+
RequestHandler requestHandler = requestHandlersMapper.requestHandlerMap().get(action);
90+
if (requestHandler == null) {
91+
throw new RuntimeException("Action '" + action + "' is not supported.");
92+
}
93+
try {
94+
response = Optional.ofNullable(requestHandler.handle(request))
95+
.orElseThrow(() -> new NullPointerException("Response must not be null"));
96+
} catch (Exception e) {
97+
logger.error("Unexpected exception occurred during request handling (action='{}', handler='{}')",
98+
request.getBody().getAction().name(), requestHandler.getClass().getCanonicalName(), e);
99+
100+
response = Response.newBuilder()
101+
.withLast(request.isSingleReplyExpected())
102+
.buildFailed(500);
103+
}
104+
return response;
105+
}
106+
}

devicehive-proxy-api/src/main/java/com/devicehive/proxy/api/NotificationHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@
2222

2323
public interface NotificationHandler {
2424

25-
void handle(String message);
25+
void handle(String message, ProxyClient client);
2626
}

devicehive-proxy-api/src/main/java/com/devicehive/proxy/api/payload/TopicCreatePayload.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* #L%
2121
*/
2222

23+
import java.util.Collections;
2324
import java.util.List;
2425

2526
public class TopicCreatePayload implements Payload {
@@ -30,6 +31,10 @@ public TopicCreatePayload(List<String> topics) {
3031
this.topics = topics;
3132
}
3233

34+
public TopicCreatePayload(String topic) {
35+
this.topics = Collections.singletonList(topic);
36+
}
37+
3338
public List<String> getTopics() {
3439
return topics;
3540
}

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/WebSocketApplication.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class WebSocketApplication {
3232

3333
public static void main(String[] args) {
3434

35-
ProxyClient client = new WebSocketKafkaProxyClient(message -> System.out.println("Received message: " + message));
35+
ProxyClient client = new WebSocketKafkaProxyClient((message, proxyClient) -> System.out.println("Received message: " + message));
3636
client.start();
3737

3838
CompletableFuture<ProxyMessage> healthFuture = client.push(ProxyMessageBuilder.health());

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/client/GsonProxyMessageDecoder.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838

3939
class GsonProxyMessageDecoder implements Decoder.Text<List<ProxyMessage>> {
4040

41-
private static JsonParser parser = new JsonParser();
42-
private static Gson gson = new Gson();
41+
private static final JsonParser parser = new JsonParser();
42+
private static final Gson gson = new Gson();
4343

4444
@Override
4545
public List<ProxyMessage> decode(String s) throws DecodeException {
@@ -90,14 +90,14 @@ private ProxyMessage buildMessage(JsonObject object) {
9090
if (object.get("p") != null) {
9191
switch (type) {
9292
case "topic/create":
93-
decoded.withPayload(new TopicCreatePayload(gson.fromJson(object.get("p"), listType)));
93+
decoded.withPayload(new TopicCreatePayload((List<String>) gson.fromJson(object.get("p"), listType)));
9494
break;
9595
case "topic/list":
9696
decoded.withPayload(new TopicListPayload(gson.fromJson(object.get("p"),
9797
new TypeToken<List<TopicListPayload.TopicInfo>>() {}.getType())));
9898
break;
9999
case "topic/subscribe":
100-
decoded.withPayload(new TopicCreatePayload(gson.fromJson(object.get("p"), listType)));
100+
decoded.withPayload(new TopicCreatePayload((List<String>) gson.fromJson(object.get("p"), listType)));
101101
break;
102102
case "notif":
103103
decoded.withPayload(new NotificationPayload(gson.fromJson(object.get("p"), String.class)));

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/client/GsonProxyMessageEncoder.java

+16
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,34 @@
2121
*/
2222

2323
import com.devicehive.proxy.api.ProxyMessage;
24+
import com.devicehive.proxy.api.payload.Payload;
25+
import com.devicehive.proxy.api.payload.TopicCreatePayload;
2426
import com.google.gson.Gson;
27+
import com.google.gson.JsonElement;
28+
import com.google.gson.JsonObject;
29+
import com.google.gson.reflect.TypeToken;
2530

2631
import javax.websocket.EncodeException;
2732
import javax.websocket.Encoder;
2833
import javax.websocket.EndpointConfig;
34+
import java.util.List;
2935

3036
class GsonProxyMessageEncoder implements Encoder.Text<ProxyMessage> {
3137

3238
private static Gson gson = new Gson();
3339

3440
@Override
3541
public String encode(ProxyMessage message) throws EncodeException {
42+
final Payload payload = message.getPayload();
43+
if (payload instanceof TopicCreatePayload) {
44+
JsonObject json = new JsonObject();
45+
JsonElement topicsJson = gson.toJsonTree(((TopicCreatePayload) payload).getTopics(), new TypeToken<List<String>>() {}.getType());
46+
json.addProperty("id", message.getId());
47+
json.addProperty("t", message.getType());
48+
json.addProperty("a", message.getAction());
49+
json.add("p", topicsJson);
50+
return gson.toJson(json);
51+
}
3652
return gson.toJson(message);
3753
}
3854

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/client/WebSocketKafkaProxyClient.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.devicehive.proxy.config.WebSocketKafkaProxyConfig;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
31-
import org.springframework.beans.factory.annotation.Autowired;
3231

3332
import javax.websocket.*;
3433
import java.io.IOException;
@@ -47,12 +46,6 @@ public class WebSocketKafkaProxyClient extends ProxyClient {
4746
private static final Logger logger = LoggerFactory.getLogger(WebSocketKafkaProxyClient.class);
4847

4948
private WebSocketKafkaProxyConfig webSocketKafkaProxyConfig;
50-
51-
@Autowired
52-
public void setWebSocketKafkaProxyConfig(WebSocketKafkaProxyConfig webSocketKafkaProxyConfig) {
53-
this.webSocketKafkaProxyConfig = webSocketKafkaProxyConfig;
54-
}
55-
5649
private Map<String, CompletableFuture<ProxyMessage>> futureMap;
5750
private Map<String, Boolean> ackReceived;
5851
private Session session;
@@ -131,9 +124,13 @@ public void onMessage(List<ProxyMessage> messages) {
131124

132125
if ("notif".equals(message.getType()) && message.getAction() == null) {
133126
NotificationPayload payload = (NotificationPayload) message.getPayload();
134-
notificationHandler.handle(payload.getValue());
127+
notificationHandler.handle(payload.getValue(), this);
135128
}
136129
logger.debug("Message {} was received", message);
137130
});
138131
}
132+
133+
public void setWebSocketKafkaProxyConfig(WebSocketKafkaProxyConfig webSocketKafkaProxyConfig) {
134+
this.webSocketKafkaProxyConfig = webSocketKafkaProxyConfig;
135+
}
139136
}

0 commit comments

Comments
 (0)