Skip to content

Commit a76f481

Browse files
committed
DEV-184 - Migration to Kafka-ws-proxy
1 parent 2d5f76d commit a76f481

File tree

52 files changed

+548
-178
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+548
-178
lines changed

devicehive-backend/src/main/java/com/devicehive/application/RequestHandlersMapper.java

+38-28
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,26 @@
2222

2323
import com.devicehive.messages.handler.DeviceCreateHandler;
2424
import com.devicehive.messages.handler.PluginSubscribeRequestHandler;
25-
import com.devicehive.messages.handler.command.*;
26-
import com.devicehive.messages.handler.dao.list.*;
25+
import com.devicehive.messages.handler.PluginUnsubscribeRequestHandler;
26+
import com.devicehive.messages.handler.command.CommandGetSubscriptionRequestHandler;
27+
import com.devicehive.messages.handler.command.CommandInsertHandler;
28+
import com.devicehive.messages.handler.command.CommandSearchHandler;
29+
import com.devicehive.messages.handler.command.CommandSubscribeRequestHandler;
30+
import com.devicehive.messages.handler.command.CommandUpdateHandler;
31+
import com.devicehive.messages.handler.command.CommandUpdateSubscribeRequestHandler;
32+
import com.devicehive.messages.handler.command.CommandsUpdateHandler;
33+
import com.devicehive.messages.handler.dao.list.ListDeviceHandler;
34+
import com.devicehive.messages.handler.dao.list.ListNetworkHandler;
35+
import com.devicehive.messages.handler.dao.list.ListSubscribeHandler;
36+
import com.devicehive.messages.handler.dao.list.ListUserHandler;
2737
import com.devicehive.messages.handler.notification.NotificationSubscribeRequestHandler;
2838
import com.devicehive.messages.handler.command.CommandUnsubscribeRequestHandler;
2939
import com.devicehive.messages.handler.notification.NotificationInsertHandler;
3040
import com.devicehive.messages.handler.notification.NotificationSearchHandler;
3141
import com.devicehive.messages.handler.notification.NotificationUnsubscribeRequestHandler;
3242
import com.devicehive.shim.api.Action;
3343
import com.devicehive.shim.api.server.RequestHandler;
44+
import com.google.common.collect.ImmutableMap;
3445
import org.springframework.beans.factory.annotation.Autowired;
3546
import org.springframework.stereotype.Component;
3647

@@ -59,6 +70,7 @@ public class RequestHandlersMapper {
5970
private final ListSubscribeHandler listSubscribeHandler;
6071
private final DeviceCreateHandler deviceCreateHandler;
6172
private final PluginSubscribeRequestHandler pluginSubscribeRequestHandler;
73+
private final PluginUnsubscribeRequestHandler pluginUnsubscribeRequestHandler;
6274

6375
private Map<Action, RequestHandler> requestHandlerMap;
6476

@@ -81,7 +93,8 @@ public RequestHandlersMapper(CommandUpdateHandler commandUpdateHandler,
8193
CommandSubscribeRequestHandler commandSubscribeRequestHandler,
8294
CommandUnsubscribeRequestHandler commandUnsubscribeRequestHandler,
8395
CommandUpdateSubscribeRequestHandler commandUpdateSubscribeRequestHandler,
84-
PluginSubscribeRequestHandler pluginSubscribeRequestHandler) {
96+
PluginSubscribeRequestHandler pluginSubscribeRequestHandler,
97+
PluginUnsubscribeRequestHandler pluginUnsubscribeRequestHandler) {
8598
this.commandUpdateHandler = commandUpdateHandler;
8699
this.notificationSearchHandler = notificationSearchHandler;
87100
this.notificationInsertHandler = notificationInsertHandler;
@@ -100,35 +113,32 @@ public RequestHandlersMapper(CommandUpdateHandler commandUpdateHandler,
100113
this.commandUnsubscribeRequestHandler = commandUnsubscribeRequestHandler;
101114
this.commandUpdateSubscribeRequestHandler = commandUpdateSubscribeRequestHandler;
102115
this.pluginSubscribeRequestHandler = pluginSubscribeRequestHandler;
116+
this.pluginUnsubscribeRequestHandler = pluginUnsubscribeRequestHandler;
103117
}
104118

105119
@PostConstruct
106120
public void init() {
107-
requestHandlerMap = new HashMap<Action, RequestHandler>() {{
108-
put(Action.NOTIFICATION_SEARCH_REQUEST, notificationSearchHandler);
109-
put(Action.NOTIFICATION_INSERT_REQUEST, notificationInsertHandler);
110-
put(Action.NOTIFICATION_SUBSCRIBE_REQUEST, notificationSubscribeRequestHandler);
111-
put(Action.NOTIFICATION_UNSUBSCRIBE_REQUEST, notificationUnsubscribeRequestHandler);
112-
put(Action.COMMAND_INSERT_REQUEST, commandInsertHandler);
113-
put(Action.COMMAND_SEARCH_REQUEST, commandSearchHandler);
114-
put(Action.COMMAND_UPDATE_REQUEST, commandUpdateHandler);
115-
put(Action.COMMANDS_UPDATE_REQUEST, commandsUpdateHandler);
116-
put(Action.COMMAND_SUBSCRIBE_REQUEST, commandSubscribeRequestHandler);
117-
put(Action.COMMAND_UNSUBSCRIBE_REQUEST, commandUnsubscribeRequestHandler);
118-
put(Action.COMMAND_UPDATE_SUBSCRIBE_REQUEST, commandUpdateSubscribeRequestHandler);
119-
put(Action.COMMAND_GET_SUBSCRIPTION_REQUEST, commandGetSubscriptionRequestHandler);
120-
put(Action.PLUGIN_SUBSCRIBE_REQUEST, pluginSubscribeRequestHandler);
121-
122-
put(Action.LIST_USER_REQUEST, listUserHandler);
123-
124-
put(Action.LIST_NETWORK_REQUEST, listNetworkHandler);
125-
126-
put(Action.LIST_DEVICE_REQUEST, listDeviceHandler);
127-
128-
put(Action.LIST_SUBSCRIBE_REQUEST, listSubscribeHandler);
129-
130-
put(Action.DEVICE_CREATE_REQUEST, deviceCreateHandler);
131-
}};
121+
requestHandlerMap = ImmutableMap.<Action, RequestHandler>builder()
122+
.put(Action.NOTIFICATION_SEARCH_REQUEST, notificationSearchHandler)
123+
.put(Action.NOTIFICATION_INSERT_REQUEST, notificationInsertHandler)
124+
.put(Action.NOTIFICATION_SUBSCRIBE_REQUEST, notificationSubscribeRequestHandler)
125+
.put(Action.NOTIFICATION_UNSUBSCRIBE_REQUEST, notificationUnsubscribeRequestHandler)
126+
.put(Action.COMMAND_INSERT_REQUEST, commandInsertHandler)
127+
.put(Action.COMMAND_SEARCH_REQUEST, commandSearchHandler)
128+
.put(Action.COMMAND_UPDATE_REQUEST, commandUpdateHandler)
129+
.put(Action.COMMANDS_UPDATE_REQUEST, commandsUpdateHandler)
130+
.put(Action.COMMAND_SUBSCRIBE_REQUEST, commandSubscribeRequestHandler)
131+
.put(Action.COMMAND_UNSUBSCRIBE_REQUEST, commandUnsubscribeRequestHandler)
132+
.put(Action.COMMAND_UPDATE_SUBSCRIBE_REQUEST, commandUpdateSubscribeRequestHandler)
133+
.put(Action.COMMAND_GET_SUBSCRIPTION_REQUEST, commandGetSubscriptionRequestHandler)
134+
.put(Action.PLUGIN_SUBSCRIBE_REQUEST, pluginSubscribeRequestHandler)
135+
.put(Action.PLUGIN_UNSUBSCRIBE_REQUEST, pluginUnsubscribeRequestHandler)
136+
.put(Action.LIST_USER_REQUEST, listUserHandler)
137+
.put(Action.LIST_NETWORK_REQUEST, listNetworkHandler)
138+
.put(Action.LIST_DEVICE_REQUEST, listDeviceHandler)
139+
.put(Action.LIST_SUBSCRIBE_REQUEST, listSubscribeHandler)
140+
.put(Action.DEVICE_CREATE_REQUEST, deviceCreateHandler)
141+
.build();
132142
}
133143

134144
public Map<Action, RequestHandler> requestHandlerMap() {

devicehive-backend/src/main/java/com/devicehive/eventbus/SubscriberRegistry.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import com.hazelcast.core.HazelcastInstance;
2626
import com.hazelcast.core.MultiMap;
2727
import org.springframework.beans.factory.annotation.Autowired;
28-
import org.springframework.util.Assert;
2928

29+
import javax.validation.constraints.NotNull;
3030
import java.util.Collection;
3131
import java.util.Map;
3232
import java.util.Optional;
@@ -109,9 +109,7 @@ synchronized void unregister(Subscription subscription) {
109109
* @param subscription - subscription
110110
* @return - list of subscribers for subscription
111111
*/
112-
synchronized Collection<Subscriber> getSubscribers(Subscription subscription) {
113-
Assert.notNull(subscription);
114-
112+
synchronized Collection<Subscriber> getSubscribers(@NotNull Subscription subscription) {
115113
return Optional.ofNullable(subscriptions.get(subscription))
116114
.map(subIds -> subIds.stream().map(subscribers::get).collect(Collectors.toList()))
117115
.orElse(emptyList());
@@ -125,9 +123,7 @@ Subscriber getSubscriber(Long subscriptionId) {
125123
return this.subscribers.get(subscriptionId);
126124
}
127125

128-
Collection<Subscription> getSubscriptions(Subscriber subscriber) {
129-
Assert.notNull(subscriber);
130-
126+
Collection<Subscription> getSubscriptions(@NotNull Subscriber subscriber) {
131127
return Optional.ofNullable(subscriberSubscriptions.get(subscriber.getId()))
132128
.orElse(emptyList());
133129
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.devicehive.messages.handler;
2+
3+
/*
4+
* #%L
5+
* DeviceHive Backend Logic
6+
* %%
7+
* Copyright (C) 2016 DataArt
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.devicehive.messages.handler.command.CommandUnsubscribeRequestHandler;
24+
import com.devicehive.messages.handler.notification.NotificationUnsubscribeRequestHandler;
25+
import com.devicehive.model.rpc.CommandUnsubscribeRequest;
26+
import com.devicehive.model.rpc.NotificationUnsubscribeRequest;
27+
import com.devicehive.model.rpc.PluginUnsubscribeRequest;
28+
import com.devicehive.model.rpc.PluginUnsubscribeResponse;
29+
import com.devicehive.shim.api.Request;
30+
import com.devicehive.shim.api.Response;
31+
import com.devicehive.shim.api.server.RequestHandler;
32+
import com.google.common.collect.ImmutableSet;
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.stereotype.Component;
35+
import org.springframework.util.Assert;
36+
37+
38+
@Component
39+
public class PluginUnsubscribeRequestHandler implements RequestHandler {
40+
41+
private CommandUnsubscribeRequestHandler commandUnsubscribeRequestHandler;
42+
private NotificationUnsubscribeRequestHandler notificationUnsubscribeRequestHandler;
43+
44+
@Autowired
45+
public void setCommandSubscribeRequestHandler(CommandUnsubscribeRequestHandler commandUnsubscribeRequestHandler) {
46+
this.commandUnsubscribeRequestHandler = commandUnsubscribeRequestHandler;
47+
}
48+
49+
@Autowired
50+
public void setNotificationSubscribeRequestHandler(NotificationUnsubscribeRequestHandler notificationUnsubscribeRequestHandler) {
51+
this.notificationUnsubscribeRequestHandler = notificationUnsubscribeRequestHandler;
52+
}
53+
54+
@Override
55+
public Response handle(Request request) {
56+
PluginUnsubscribeRequest body = (PluginUnsubscribeRequest) request.getBody();
57+
validate(body);
58+
59+
removeCommandSubscription(body);
60+
removeNotificationSubscription(body);
61+
62+
return Response.newBuilder()
63+
.withBody(new PluginUnsubscribeResponse(body.getSubscriptionId()))
64+
.withLast(false)
65+
.withCorrelationId(request.getCorrelationId())
66+
.buildSuccess();
67+
}
68+
69+
private Response removeNotificationSubscription(PluginUnsubscribeRequest body) {
70+
NotificationUnsubscribeRequest notificationUnsubscribeRequest =
71+
new NotificationUnsubscribeRequest(ImmutableSet.of(body.getSubscriptionId()));
72+
73+
Request notificationRequest = Request.newBuilder()
74+
.withBody(notificationUnsubscribeRequest)
75+
.withSingleReply(false)
76+
.build();
77+
notificationRequest.setReplyTo(body.getTopicName());
78+
return notificationUnsubscribeRequestHandler.handle(notificationRequest);
79+
}
80+
81+
private Response removeCommandSubscription(PluginUnsubscribeRequest body) {
82+
83+
CommandUnsubscribeRequest commandUnsubscribeRequest =
84+
new CommandUnsubscribeRequest(ImmutableSet.of(body.getSubscriptionId()));
85+
86+
Request commandRequest = Request.newBuilder()
87+
.withBody(commandUnsubscribeRequest)
88+
.withSingleReply(false)
89+
.build();
90+
commandRequest.setReplyTo(body.getTopicName());
91+
return commandUnsubscribeRequestHandler.handle(commandRequest);
92+
}
93+
94+
private void validate(PluginUnsubscribeRequest request) {
95+
Assert.notNull(request, "Request body is null");
96+
Assert.notNull(request.getSubscriptionId(), "Subscription id not provided");
97+
}
98+
99+
}

devicehive-common-auth/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
<artifactId>jjwt</artifactId>
4646
<version>${jjwt.version}</version>
4747
</dependency>
48+
<dependency>
49+
<groupId>org.apache.httpcomponents</groupId>
50+
<artifactId>httpclient</artifactId>
51+
</dependency>
4852
</dependencies>
4953

5054
</project>

devicehive-common-auth/src/main/java/com/devicehive/auth/HiveAuthentication.java

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Collection;
2828

2929
public class HiveAuthentication extends PreAuthenticatedAuthenticationToken {
30+
private static final long serialVersionUID = 3994727745773385047L;
31+
3032
private HivePrincipal hivePrincipal;
3133

3234
public HiveAuthentication(Object aPrincipal, Collection<? extends GrantedAuthority> anAuthorities) {

devicehive-common-auth/src/main/java/com/devicehive/resource/exceptions/ExpiredTokenException.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.security.core.AuthenticationException;
2424

2525
public class ExpiredTokenException extends AuthenticationException {
26+
private static final long serialVersionUID = -7505827097648274298L;
2627

2728
public ExpiredTokenException(String message) {
2829
super(message);

devicehive-common-auth/src/main/java/com/devicehive/service/exception/BackendException.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
*/
2222

2323
public class BackendException extends Exception {
24+
private static final long serialVersionUID = -8016952729321715123L;
25+
2426

2527
private int errorCode;
2628

devicehive-frontend/src/main/java/com/devicehive/service/helpers/HttpRestHelper.java devicehive-common-auth/src/main/java/com/devicehive/service/helpers/HttpRestHelper.java

+22-21
Original file line numberDiff line numberDiff line change
@@ -23,42 +23,29 @@
2323

2424
import com.devicehive.exceptions.HiveException;
2525
import com.devicehive.model.ErrorResponse;
26-
import com.devicehive.model.updates.UserUpdate;
2726
import com.google.gson.Gson;
28-
import io.jsonwebtoken.lang.Collections;
29-
import org.apache.http.Header;
30-
import org.apache.http.HeaderIterator;
3127
import org.apache.http.HttpEntity;
32-
import org.apache.http.HttpResponse;
33-
import org.apache.http.NameValuePair;
34-
import org.apache.http.ProtocolVersion;
35-
import org.apache.http.RequestLine;
36-
import org.apache.http.client.ClientProtocolException;
37-
import org.apache.http.client.entity.UrlEncodedFormEntity;
3828
import org.apache.http.client.methods.CloseableHttpResponse;
3929
import org.apache.http.client.methods.HttpGet;
4030
import org.apache.http.client.methods.HttpPost;
41-
import org.apache.http.client.methods.HttpUriRequest;
31+
import org.apache.http.client.methods.HttpRequestBase;
4232
import org.apache.http.entity.StringEntity;
4333
import org.apache.http.impl.client.CloseableHttpClient;
44-
import org.apache.http.impl.client.HttpClientBuilder;
4534
import org.apache.http.impl.client.HttpClients;
4635
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
47-
import org.apache.http.message.BasicNameValuePair;
48-
import org.apache.http.params.HttpParams;
4936
import org.apache.http.util.EntityUtils;
5037
import org.springframework.beans.factory.annotation.Autowired;
51-
import org.springframework.security.access.method.P;
5238
import org.springframework.stereotype.Component;
5339
import org.springframework.util.StringUtils;
5440

5541
import javax.annotation.PostConstruct;
42+
import javax.ws.rs.ServiceUnavailableException;
5643
import javax.ws.rs.core.MediaType;
44+
import javax.ws.rs.core.Response;
5745
import java.io.IOException;
5846
import java.nio.charset.Charset;
5947

6048
import static com.devicehive.configuration.Constants.UTF8;
61-
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
6249
import static javax.ws.rs.core.Response.Status.CREATED;
6350
import static javax.ws.rs.core.Response.Status.OK;
6451

@@ -96,19 +83,33 @@ public <T> T post(String url, String jsonObject, Class<T> type, String token) {
9683
httpPost.addHeader(AUTHORIZATION, TOKEN_PREFIX + token);
9784
}
9885

99-
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
86+
return httpRequest(httpPost, type, CREATED);
87+
}
88+
89+
public <T> T get(String url, Class<T> type, String token) {
90+
HttpGet httpGet = new HttpGet(url);
91+
httpGet.addHeader("Content-Type", MediaType.APPLICATION_JSON);
92+
if (!StringUtils.isEmpty(token)) {
93+
httpGet.addHeader(AUTHORIZATION, TOKEN_PREFIX + token);
94+
}
95+
96+
return httpRequest(httpGet, type, OK);
97+
}
98+
99+
private <T> T httpRequest(HttpRequestBase httpRequestBase, Class<T> type, Response.Status status) {
100+
try (CloseableHttpResponse response = httpClient.execute(httpRequestBase)) {
100101
int statusCode = response.getStatusLine().getStatusCode();
101102
HttpEntity entity = response.getEntity();
102103

103-
if (statusCode != CREATED.getStatusCode()) {
104+
if (statusCode != status.getStatusCode()) {
104105
ErrorResponse errorResponse = gson.fromJson(EntityUtils.toString(entity), ErrorResponse.class);
105106
throw new HiveException(errorResponse.getMessage(), errorResponse.getError());
106107
}
107-
108+
108109
return gson.fromJson(EntityUtils.toString(entity), type);
109110
} catch (IOException e) {
110-
throw new HiveException("Auth service is not responding", BAD_REQUEST.getStatusCode());
111+
throw new ServiceUnavailableException("Service is not responding");
111112
}
112113
}
113-
114+
114115
}

0 commit comments

Comments
 (0)