) milliseconds
+ serverHeartbeat = Math.max(serverHeartbeatNew, Integer.parseInt(heartbeats[0]));
+ }
+ }
+ if (clientHeartbeat > 0 || serverHeartbeat > 0) {
+ scheduler = Schedulers.io();
+ if (clientHeartbeat > 0) {
+ //client MUST/WANT send heart-beat
+ Log.d(TAG, "Client will send heart-beat every " + clientHeartbeat + " ms");
+ scheduleClientHeartBeat();
+ }
+ if (serverHeartbeat > 0) {
+ Log.d(TAG, "Client will listen to server heart-beat every " + serverHeartbeat + " ms");
+ //client WANT to listen to server heart-beat
+ scheduleServerHeartBeatCheck();
+
+ // initialize the server heartbeat
+ lastServerHeartBeat = System.currentTimeMillis();
+ }
+ }
+ }
+
+ private void scheduleServerHeartBeatCheck() {
+ if (serverHeartbeat > 0 && scheduler != null) {
+ final long now = System.currentTimeMillis();
+ Log.d(TAG, "Scheduling server heart-beat to be checked in " + serverHeartbeat + " ms and now is '" + now + "'");
+ //add some slack on the check
+ serverCheckHeartBeatTask = scheduler.scheduleDirect(() ->
+ checkServerHeartBeat(), serverHeartbeat, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void checkServerHeartBeat() {
+ if (serverHeartbeat > 0) {
+ final long now = System.currentTimeMillis();
+ //use a forgiving boundary as some heart beats can be delayed or lost.
+ final long boundary = now - (3 * serverHeartbeat);
+ //we need to check because the task could failed to abort
+ if (lastServerHeartBeat < boundary) {
+ Log.d(TAG, "It's a sad day ;( Server didn't send heart-beat on time. Last received at '" + lastServerHeartBeat + "' and now is '" + now + "'");
+ if (failedListener != null) {
+ failedListener.onServerHeartBeatFailed();
+ }
+ } else {
+ Log.d(TAG, "We were checking and server sent heart-beat on time. So well-behaved :)");
+ lastServerHeartBeat = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * Used to abort the server heart-beat check.
+ */
+ private void abortServerHeartBeatCheck() {
+ lastServerHeartBeat = System.currentTimeMillis();
+ Log.d(TAG, "Aborted last check because server sent heart-beat on time ('" + lastServerHeartBeat + "'). So well-behaved :)");
+ if (serverCheckHeartBeatTask != null) {
+ serverCheckHeartBeatTask.dispose();
+ }
+ scheduleServerHeartBeatCheck();
+ }
+
+ /**
+ * Schedule a client heart-beat if clientHeartbeat > 0.
+ */
+ private void scheduleClientHeartBeat() {
+ if (clientHeartbeat > 0 && scheduler != null) {
+ Log.d(TAG, "Scheduling client heart-beat to be sent in " + clientHeartbeat + " ms");
+ clientSendHeartBeatTask = scheduler.scheduleDirect(() ->
+ sendClientHeartBeat(), clientHeartbeat, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Send the raw heart-beat to the server.
+ */
+ private void sendClientHeartBeat() {
+ sendCallback.sendClientHeartBeat("\r\n");
+ Log.d(TAG, "PING >>>");
+ //schedule next client heart beat
+ this.scheduleClientHeartBeat();
+ }
+
+ /**
+ * Used when we have a scheduled heart-beat and we send a new message to the server.
+ * The new message will work as an heart-beat so we can abort current one and schedule another
+ */
+ private void abortClientHeartBeatSend() {
+ if (clientSendHeartBeatTask != null) {
+ clientSendHeartBeatTask.dispose();
+ }
+ scheduleClientHeartBeat();
+ }
+
+ public interface FailedListener {
+ void onServerHeartBeatFailed();
+ }
+
+ public interface SendCallback {
+ void sendClientHeartBeat(String pingMessage);
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/Stomp.java b/lib/src/main/java/ua/naiksoftware/stomp/Stomp.java
index 5a1d1c5..db65635 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/Stomp.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/Stomp.java
@@ -1,42 +1,73 @@
package ua.naiksoftware.stomp;
-import org.java_websocket.WebSocket;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
import java.util.Map;
-import ua.naiksoftware.stomp.client.StompClient;
+import okhttp3.OkHttpClient;
+import ua.naiksoftware.stomp.provider.OkHttpConnectionProvider;
+import ua.naiksoftware.stomp.provider.WebSocketsConnectionProvider;
/**
* Supported overlays:
- * - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.0')
- *
- * You can add own relay, just implement ConnectionProvider for you stomp transport,
- * such as web socket.
- *
+ * - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.2')
+ * - okhttp3.WebSocket ('com.squareup.okhttp3:okhttp:3.8.1')
+ *
+ * You can add own relay, just implement ConnectionProvider for you stomp transport,
+ * such as web socket.
+ *
* Created by naik on 05.05.16.
*/
public class Stomp {
- public static StompClient over(Class clazz, String uri) {
- return over(clazz, uri, null);
+ public static StompClient over(@NonNull ConnectionProvider connectionProvider, String uri) {
+ return over(connectionProvider, uri, null, null);
}
/**
+ * @param connectionProvider connectionProvider method
+ * @param uri URI to connect
+ * @param connectHttpHeaders HTTP headers, will be passed with handshake query, may be null
+ * @return StompClient for receiving and sending messages. Call #StompClient.connect
+ */
+ public static StompClient over(@NonNull ConnectionProvider connectionProvider, String uri, Map connectHttpHeaders) {
+ return over(connectionProvider, uri, connectHttpHeaders, null);
+ }
+
+ /**
+ * {@code webSocketClient} can accept the following type of clients:
+ *
+ * - {@code org.java_websocket.WebSocket}: cannot accept an existing client
+ * - {@code okhttp3.WebSocket}: can accept a non-null instance of {@code okhttp3.OkHttpClient}
+ *
*
- * @param clazz class for using as transport
- * @param uri URI to connect
+ * @param connectionProvider connectionProvider method
+ * @param uri URI to connect
* @param connectHttpHeaders HTTP headers, will be passed with handshake query, may be null
+ * @param okHttpClient Existing client that will be used to open the WebSocket connection, may be null to use default client
* @return StompClient for receiving and sending messages. Call #StompClient.connect
*/
- public static StompClient over(Class clazz, String uri, Map connectHttpHeaders) {
- if (clazz == WebSocket.class) {
+ public static StompClient over(@NonNull ConnectionProvider connectionProvider, String uri, @Nullable Map connectHttpHeaders, @Nullable OkHttpClient okHttpClient) {
+ if (connectionProvider == ConnectionProvider.JWS) {
+ if (okHttpClient != null) {
+ throw new IllegalArgumentException("You cannot pass an OkHttpClient when using JWS. Use null instead.");
+ }
return createStompClient(new WebSocketsConnectionProvider(uri, connectHttpHeaders));
}
- throw new RuntimeException("Not supported overlay transport: " + clazz.getName());
+ if (connectionProvider == ConnectionProvider.OKHTTP) {
+ return createStompClient(new OkHttpConnectionProvider(uri, connectHttpHeaders, (okHttpClient == null) ? new OkHttpClient() : okHttpClient));
+ }
+
+ throw new IllegalArgumentException("ConnectionProvider type not supported: " + connectionProvider.toString());
}
- private static StompClient createStompClient(ConnectionProvider connectionProvider) {
+ private static StompClient createStompClient(ua.naiksoftware.stomp.provider.ConnectionProvider connectionProvider) {
return new StompClient(connectionProvider);
}
+
+ public enum ConnectionProvider {
+ OKHTTP, JWS
+ }
}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/StompClient.java
new file mode 100644
index 0000000..e4f73cc
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/StompClient.java
@@ -0,0 +1,335 @@
+package ua.naiksoftware.stomp;
+
+import android.annotation.SuppressLint;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+import android.util.Log;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.reactivex.BackpressureStrategy;
+import io.reactivex.Completable;
+import io.reactivex.CompletableSource;
+import io.reactivex.Flowable;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.subjects.BehaviorSubject;
+import io.reactivex.subjects.PublishSubject;
+import ua.naiksoftware.stomp.dto.StompCommand;
+import ua.naiksoftware.stomp.dto.StompMessage;
+import ua.naiksoftware.stomp.pathmatcher.PathMatcher;
+import ua.naiksoftware.stomp.pathmatcher.SimplePathMatcher;
+import ua.naiksoftware.stomp.provider.ConnectionProvider;
+import ua.naiksoftware.stomp.dto.LifecycleEvent;
+import ua.naiksoftware.stomp.dto.StompHeader;
+
+/**
+ * Created by naik on 05.05.16.
+ */
+public class StompClient {
+
+ private static final String TAG = StompClient.class.getSimpleName();
+
+ public static final String SUPPORTED_VERSIONS = "1.1,1.2";
+ public static final String DEFAULT_ACK = "auto";
+
+ private final ConnectionProvider connectionProvider;
+ private ConcurrentHashMap topics;
+ private boolean legacyWhitespace;
+
+ private PublishSubject messageStream;
+ private BehaviorSubject connectionStream;
+ private ConcurrentHashMap> streamMap;
+ private PathMatcher pathMatcher;
+ private Disposable lifecycleDisposable;
+ private Disposable messagesDisposable;
+ private PublishSubject lifecyclePublishSubject;
+ private List headers;
+ private HeartBeatTask heartBeatTask;
+
+ public StompClient(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
+ streamMap = new ConcurrentHashMap<>();
+ lifecyclePublishSubject = PublishSubject.create();
+ pathMatcher = new SimplePathMatcher();
+ heartBeatTask = new HeartBeatTask(this::sendHeartBeat, () -> {
+ lifecyclePublishSubject.onNext(new LifecycleEvent(LifecycleEvent.Type.FAILED_SERVER_HEARTBEAT));
+ });
+ }
+
+ /**
+ * Sets the heartbeat interval to request from the server.
+ *
+ * Not very useful yet, because we don't have any heartbeat logic on our side.
+ *
+ * @param ms heartbeat time in milliseconds
+ */
+ public StompClient withServerHeartbeat(int ms) {
+ heartBeatTask.setServerHeartbeat(ms);
+ return this;
+ }
+
+ /**
+ * Sets the heartbeat interval that client propose to send.
+ *
+ * Not very useful yet, because we don't have any heartbeat logic on our side.
+ *
+ * @param ms heartbeat time in milliseconds
+ */
+ public StompClient withClientHeartbeat(int ms) {
+ heartBeatTask.setClientHeartbeat(ms);
+ return this;
+ }
+
+ /**
+ * Connect without reconnect if connected
+ */
+ public void connect() {
+ connect(null);
+ }
+
+ /**
+ * Connect to websocket. If already connected, this will silently fail.
+ *
+ * @param _headers HTTP headers to send in the INITIAL REQUEST, i.e. during the protocol upgrade
+ */
+ public void connect(@Nullable List _headers) {
+
+ Log.d(TAG, "Connect");
+
+ this.headers = _headers;
+
+ if (isConnected()) {
+ Log.d(TAG, "Already connected, ignore");
+ return;
+ }
+ lifecycleDisposable = connectionProvider.lifecycle()
+ .subscribe(lifecycleEvent -> {
+ switch (lifecycleEvent.getType()) {
+ case OPENED:
+ List headers = new ArrayList<>();
+ headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS));
+ headers.add(new StompHeader(StompHeader.HEART_BEAT,
+ heartBeatTask.getClientHeartbeat() + "," + heartBeatTask.getServerHeartbeat()));
+
+ if (_headers != null) headers.addAll(_headers);
+
+ connectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile(legacyWhitespace))
+ .subscribe(() -> {
+ Log.d(TAG, "Publish open");
+ lifecyclePublishSubject.onNext(lifecycleEvent);
+ });
+ break;
+
+ case CLOSED:
+ Log.d(TAG, "Socket closed");
+ disconnect();
+ break;
+
+ case ERROR:
+ Log.d(TAG, "Socket closed with error");
+ lifecyclePublishSubject.onNext(lifecycleEvent);
+ break;
+ }
+ });
+
+ messagesDisposable = connectionProvider.messages()
+ .map(StompMessage::from)
+ .filter(heartBeatTask::consumeHeartBeat)
+ .doOnNext(getMessageStream()::onNext)
+ .filter(msg -> msg.getStompCommand().equals(StompCommand.CONNECTED))
+ .subscribe(stompMessage -> {
+ getConnectionStream().onNext(true);
+ }, onError -> {
+ Log.e(TAG, "Error parsing message", onError);
+ });
+ }
+
+ synchronized private BehaviorSubject getConnectionStream() {
+ if (connectionStream == null || connectionStream.hasComplete()) {
+ connectionStream = BehaviorSubject.createDefault(false);
+ }
+ return connectionStream;
+ }
+
+ synchronized private PublishSubject getMessageStream() {
+ if (messageStream == null || messageStream.hasComplete()) {
+ messageStream = PublishSubject.create();
+ }
+ return messageStream;
+ }
+
+ public Completable send(String destination) {
+ return send(destination, null);
+ }
+
+ public Completable send(String destination, String data) {
+ return send(new StompMessage(
+ StompCommand.SEND,
+ Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)),
+ data));
+ }
+
+ public Completable send(@NonNull StompMessage stompMessage) {
+ Completable completable = connectionProvider.send(stompMessage.compile(legacyWhitespace));
+ CompletableSource connectionComplete = getConnectionStream()
+ .filter(isConnected -> isConnected)
+ .firstElement().ignoreElement();
+ return completable
+ .startWith(connectionComplete);
+ }
+
+ @SuppressLint("CheckResult")
+ private void sendHeartBeat(@NonNull String pingMessage) {
+ Completable completable = connectionProvider.send(pingMessage);
+ CompletableSource connectionComplete = getConnectionStream()
+ .filter(isConnected -> isConnected)
+ .firstElement().ignoreElement();
+ completable.startWith(connectionComplete)
+ .onErrorComplete()
+ .subscribe();
+ }
+
+ public Flowable lifecycle() {
+ return lifecyclePublishSubject.toFlowable(BackpressureStrategy.BUFFER);
+ }
+
+ /**
+ * Disconnect from server, and then reconnect with the last-used headers
+ */
+ @SuppressLint("CheckResult")
+ public void reconnect() {
+ disconnectCompletable()
+ .subscribe(() -> connect(headers),
+ e -> Log.e(TAG, "Disconnect error", e));
+ }
+
+ @SuppressLint("CheckResult")
+ public void disconnect() {
+ disconnectCompletable().subscribe(() -> {
+ }, e -> Log.e(TAG, "Disconnect error", e));
+ }
+
+ public Completable disconnectCompletable() {
+
+ heartBeatTask.shutdown();
+
+ if (lifecycleDisposable != null) {
+ lifecycleDisposable.dispose();
+ }
+ if (messagesDisposable != null) {
+ messagesDisposable.dispose();
+ }
+
+ return connectionProvider.disconnect()
+ .doFinally(() -> {
+ Log.d(TAG, "Stomp disconnected");
+ getConnectionStream().onComplete();
+ getMessageStream().onComplete();
+ lifecyclePublishSubject.onNext(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
+ });
+ }
+
+ public Flowable topic(String destinationPath) {
+ return topic(destinationPath, null);
+ }
+
+ public Flowable topic(@NonNull String destPath, List headerList) {
+ if (destPath == null)
+ return Flowable.error(new IllegalArgumentException("Topic path cannot be null"));
+ else if (!streamMap.containsKey(destPath))
+ streamMap.put(destPath,
+ Completable.defer(() -> subscribePath(destPath, headerList)).andThen(
+ getMessageStream()
+ .filter(msg -> pathMatcher.matches(destPath, msg))
+ .toFlowable(BackpressureStrategy.BUFFER)
+ .doFinally(() -> unsubscribePath(destPath).subscribe())
+ .share())
+ );
+ return streamMap.get(destPath);
+ }
+
+ private Completable subscribePath(String destinationPath, @Nullable List headerList) {
+ String topicId = UUID.randomUUID().toString();
+
+ if (topics == null) topics = new ConcurrentHashMap<>();
+
+ // Only continue if we don't already have a subscription to the topic
+ if (topics.containsKey(destinationPath)) {
+ Log.d(TAG, "Attempted to subscribe to already-subscribed path!");
+ return Completable.complete();
+ }
+
+ topics.put(destinationPath, topicId);
+ List headers = new ArrayList<>();
+ headers.add(new StompHeader(StompHeader.ID, topicId));
+ headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath));
+ headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
+ if (headerList != null) headers.addAll(headerList);
+ return send(new StompMessage(StompCommand.SUBSCRIBE,
+ headers, null))
+ .doOnError(throwable -> unsubscribePath(destinationPath).subscribe());
+ }
+
+
+ private Completable unsubscribePath(String dest) {
+ streamMap.remove(dest);
+
+ String topicId = topics.get(dest);
+
+ if (topicId == null) {
+ return Completable.complete();
+ }
+
+ topics.remove(dest);
+
+ Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);
+
+ return send(new StompMessage(StompCommand.UNSUBSCRIBE,
+ Collections.singletonList(new StompHeader(StompHeader.ID, topicId)), null)).onErrorComplete();
+ }
+
+ /**
+ * Set the wildcard or other matcher for Topic subscription.
+ *
+ * Right now, the only options are simple, rmq supported.
+ * But you can write you own matcher by implementing {@link PathMatcher}
+ *
+ * When set to {@link ua.naiksoftware.stomp.pathmatcher.RabbitPathMatcher}, topic subscription allows for RMQ-style wildcards.
+ *
+ *
+ * @param pathMatcher Set to {@link SimplePathMatcher} by default
+ */
+ public void setPathMatcher(PathMatcher pathMatcher) {
+ this.pathMatcher = pathMatcher;
+ }
+
+ public boolean isConnected() {
+ return getConnectionStream().getValue();
+ }
+
+ /**
+ * Reverts to the old frame formatting, which included two newlines between the message body
+ * and the end-of-frame marker.
+ *
+ * Legacy: Body\n\n^@
+ *
+ * Default: Body^@
+ *
+ * @param legacyWhitespace whether to append an extra two newlines
+ * @see The STOMP spec
+ */
+ public void setLegacyWhitespace(boolean legacyWhitespace) {
+ this.legacyWhitespace = legacyWhitespace;
+ }
+
+ /** returns the to topic (subscription id) corresponding to a given destination
+ * @param dest the destination
+ * @return the topic (subscription id) or null if no topic corresponds to the destination */
+ public String getTopicId(String dest) {
+ return topics.get(dest);
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
deleted file mode 100644
index 9973ccb..0000000
--- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package ua.naiksoftware.stomp.client;
-
-import android.util.Log;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.observables.ConnectableObservable;
-import ua.naiksoftware.stomp.ConnectionProvider;
-import ua.naiksoftware.stomp.LifecycleEvent;
-import ua.naiksoftware.stomp.StompHeader;
-
-/**
- * Created by naik on 05.05.16.
- */
-public class StompClient {
-
- private static final String TAG = StompClient.class.getSimpleName();
-
- public static final String SUPPORTED_VERSIONS = "1.1,1.0";
- public static final String DEFAULT_ACK = "auto";
-
- private Subscription mMessagesSubscription;
- private Map>> mSubscribers = new HashMap<>();
- private List> mWaitConnectionObservables;
- private final ConnectionProvider mConnectionProvider;
- private HashMap mTopics;
- private boolean mConnected;
-
- public StompClient(ConnectionProvider connectionProvider) {
- mConnectionProvider = connectionProvider;
- mWaitConnectionObservables = new ArrayList<>();
- }
-
- /**
- * Connect without reconnect if connected
- */
- public void connect() {
- connect(null);
- }
-
- public void connect(boolean reconnect) {
- connect(null, reconnect);
- }
-
- /**
- * Connect without reconnect if connected
- *
- * @param _headers might be null
- */
- public void connect(List _headers) {
- connect(_headers, false);
- }
-
- /**
- * If already connected and reconnect=false - nope
- *
- * @param _headers might be null
- */
- public void connect(List _headers, boolean reconnect) {
- if (reconnect) disconnect();
- if (mConnected) return;
- mConnectionProvider.getLifecycleReceiver()
- .subscribe(lifecycleEvent -> {
- switch (lifecycleEvent.getType()) {
- case OPENED:
- List headers = new ArrayList<>();
- headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS));
- if (_headers != null) headers.addAll(_headers);
- mConnectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile())
- .subscribe();
- break;
-
- case CLOSED:
- mConnected = false;
- break;
- }
- });
-
- mMessagesSubscription = mConnectionProvider.messages()
- .map(StompMessage::from)
- .subscribe(stompMessage -> {
- if (stompMessage.getStompCommand().equals(StompCommand.CONNECTED)) {
- mConnected = true;
- for (ConnectableObservable observable : mWaitConnectionObservables) {
- observable.connect();
- }
- mWaitConnectionObservables.clear();
- }
- callSubscribers(stompMessage);
- });
- }
-
- public Observable send(String destination) {
- return send(new StompMessage(
- StompCommand.SEND,
- Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)),
- null));
- }
-
- public Observable send(String destination, String data) {
- return send(new StompMessage(
- StompCommand.SEND,
- Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)),
- data));
- }
-
- public Observable send(StompMessage stompMessage) {
- Observable observable = mConnectionProvider.send(stompMessage.compile());
- if (!mConnected) {
- ConnectableObservable deffered = observable.publish();
- mWaitConnectionObservables.add(deffered);
- return deffered;
- } else {
- return observable;
- }
- }
-
- private void callSubscribers(StompMessage stompMessage) {
- String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION);
- for (String dest : mSubscribers.keySet()) {
- if (dest.equals(messageDestination)) {
- for (Subscriber super StompMessage> subscriber : mSubscribers.get(dest)) {
- subscriber.onNext(stompMessage);
- }
- return;
- }
- }
- }
-
- public Observable lifecycle() {
- return mConnectionProvider.getLifecycleReceiver();
- }
-
- public void disconnect() {
- if (mMessagesSubscription != null) mMessagesSubscription.unsubscribe();
- mConnected = false;
- }
-
- public Observable topic(String destinationPath) {
- return topic(destinationPath, null);
- }
-
- public Observable topic(String destinationPath, List headerList) {
- return Observable.create(subscriber -> {
- Set> subscribersSet = mSubscribers.get(destinationPath);
- if (subscribersSet == null) {
- subscribersSet = new HashSet<>();
- mSubscribers.put(destinationPath, subscribersSet);
- subscribePath(destinationPath, headerList).subscribe();
- }
- subscribersSet.add(subscriber);
-
- }).doOnUnsubscribe(() -> {
- for (String dest : mSubscribers.keySet()) {
- Set> set = mSubscribers.get(dest);
- for (Subscriber super StompMessage> subscriber : set) {
- if (subscriber.isUnsubscribed()) {
- set.remove(subscriber);
- if (set.size() < 1) {
- mSubscribers.remove(dest);
- unsubscribePath(dest).subscribe();
- }
- }
- }
- }
- });
- }
-
- private Observable subscribePath(String destinationPath, List headerList) {
- if (destinationPath == null) return Observable.empty();
- String topicId = UUID.randomUUID().toString();
-
- if (mTopics == null) mTopics = new HashMap<>();
- mTopics.put(destinationPath, topicId);
- List headers = new ArrayList<>();
- headers.add(new StompHeader(StompHeader.ID, topicId));
- headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath));
- headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
- if (headerList != null) headers.addAll(headerList);
- return send(new StompMessage(StompCommand.SUBSCRIBE,
- headers, null));
- }
-
-
- private Observable unsubscribePath(String dest) {
- String topicId = mTopics.get(dest);
- Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);
-
- return send(new StompMessage(StompCommand.UNSUBSCRIBE,
- Collections.singletonList(new StompHeader(StompHeader.ID, topicId)), null));
- }
-
- public boolean isConnected() {
- return mConnected;
- }
-}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/LifecycleEvent.java b/lib/src/main/java/ua/naiksoftware/stomp/dto/LifecycleEvent.java
similarity index 92%
rename from lib/src/main/java/ua/naiksoftware/stomp/LifecycleEvent.java
rename to lib/src/main/java/ua/naiksoftware/stomp/dto/LifecycleEvent.java
index d8aec43..019d331 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/LifecycleEvent.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/dto/LifecycleEvent.java
@@ -1,4 +1,4 @@
-package ua.naiksoftware.stomp;
+package ua.naiksoftware.stomp.dto;
import java.util.TreeMap;
@@ -9,7 +9,7 @@
public class LifecycleEvent {
public enum Type {
- OPENED, CLOSED, ERROR
+ OPENED, CLOSED, ERROR, FAILED_SERVER_HEARTBEAT
}
private final Type mType;
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompCommand.java b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompCommand.java
similarity index 92%
rename from lib/src/main/java/ua/naiksoftware/stomp/client/StompCommand.java
rename to lib/src/main/java/ua/naiksoftware/stomp/dto/StompCommand.java
index ca35194..029a197 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompCommand.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompCommand.java
@@ -1,4 +1,4 @@
-package ua.naiksoftware.stomp.client;
+package ua.naiksoftware.stomp.dto;
/**
* Created by naik on 05.05.16.
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/StompHeader.java b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompHeader.java
similarity index 71%
rename from lib/src/main/java/ua/naiksoftware/stomp/StompHeader.java
rename to lib/src/main/java/ua/naiksoftware/stomp/dto/StompHeader.java
index 9b45440..3846909 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/StompHeader.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompHeader.java
@@ -1,13 +1,14 @@
-package ua.naiksoftware.stomp;
+package ua.naiksoftware.stomp.dto;
/**
* Created by naik on 05.05.16.
*/
public class StompHeader {
- public static final String VERSION = "version";
+ public static final String VERSION = "accept-version";
public static final String HEART_BEAT = "heart-beat";
public static final String DESTINATION = "destination";
+ public static final String SUBSCRIPTION = "subscription";
public static final String CONTENT_TYPE = "content-type";
public static final String MESSAGE_ID = "message-id";
public static final String ID = "id";
@@ -28,4 +29,9 @@ public String getKey() {
public String getValue() {
return mValue;
}
+
+ @Override
+ public String toString() {
+ return "StompHeader{" + mKey + '=' + mValue + '}';
+ }
}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompMessage.java b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompMessage.java
similarity index 77%
rename from lib/src/main/java/ua/naiksoftware/stomp/client/StompMessage.java
rename to lib/src/main/java/ua/naiksoftware/stomp/dto/StompMessage.java
index 37810e2..60ccb91 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompMessage.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/dto/StompMessage.java
@@ -1,4 +1,7 @@
-package ua.naiksoftware.stomp.client;
+package ua.naiksoftware.stomp.dto;
+
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
import java.io.StringReader;
import java.util.ArrayList;
@@ -7,8 +10,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import ua.naiksoftware.stomp.StompHeader;
-
/**
* Created by naik on 05.05.16.
*/
@@ -40,6 +41,7 @@ public String getStompCommand() {
return mStompCommand;
}
+ @Nullable
public String findHeader(String key) {
if (mStompHeaders == null) return null;
for (StompHeader header : mStompHeaders) {
@@ -48,7 +50,13 @@ public String findHeader(String key) {
return null;
}
+ @NonNull
public String compile() {
+ return compile(false);
+ }
+
+ @NonNull
+ public String compile(boolean legacyWhitespace) {
StringBuilder builder = new StringBuilder();
builder.append(mStompCommand).append('\n');
for (StompHeader header : mStompHeaders) {
@@ -56,13 +64,14 @@ public String compile() {
}
builder.append('\n');
if (mPayload != null) {
- builder.append(mPayload).append("\n\n");
+ builder.append(mPayload);
+ if (legacyWhitespace) builder.append("\n\n");
}
builder.append(TERMINATE_MESSAGE_SYMBOL);
return builder.toString();
}
- public static StompMessage from(String data) {
+ public static StompMessage from(@Nullable String data) {
if (data == null || data.trim().isEmpty()) {
return new StompMessage(StompCommand.UNKNOWN, null, data);
}
@@ -77,11 +86,20 @@ public static StompMessage from(String data) {
headers.add(new StompHeader(matcher.group(1), matcher.group(2)));
}
- reader.skip("\\s");
+ reader.skip("\n\n");
reader.useDelimiter(TERMINATE_MESSAGE_SYMBOL);
String payload = reader.hasNext() ? reader.next() : null;
return new StompMessage(command, headers, payload);
}
+
+ @Override
+ public String toString() {
+ return "StompMessage{" +
+ "command='" + mStompCommand + '\'' +
+ ", headers=" + mStompHeaders +
+ ", payload='" + mPayload + '\'' +
+ '}';
+ }
}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/PathMatcher.java b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/PathMatcher.java
new file mode 100644
index 0000000..e5343b6
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/PathMatcher.java
@@ -0,0 +1,8 @@
+package ua.naiksoftware.stomp.pathmatcher;
+
+import ua.naiksoftware.stomp.dto.StompMessage;
+
+public interface PathMatcher {
+
+ boolean matches(String path, StompMessage msg);
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/RabbitPathMatcher.java b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/RabbitPathMatcher.java
new file mode 100644
index 0000000..1ee2b25
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/RabbitPathMatcher.java
@@ -0,0 +1,51 @@
+package ua.naiksoftware.stomp.pathmatcher;
+
+import ua.naiksoftware.stomp.dto.StompHeader;
+import ua.naiksoftware.stomp.dto.StompMessage;
+
+import java.util.ArrayList;
+
+public class RabbitPathMatcher implements PathMatcher {
+
+ /**
+ * RMQ-style wildcards.
+ * See more info here.
+ */
+ @Override
+ public boolean matches(String path, StompMessage msg) {
+ String dest = msg.findHeader(StompHeader.DESTINATION);
+ if (dest == null) return false;
+
+ // for example string "lorem.ipsum.*.sit":
+
+ // split it up into ["lorem", "ipsum", "*", "sit"]
+ String[] split = path.split("\\.");
+ ArrayList transformed = new ArrayList<>();
+ // check for wildcards and replace with corresponding regex
+ for (String s : split) {
+ switch (s) {
+ case "*":
+ transformed.add("[^.]+");
+ break;
+ case "#":
+ // TODO: make this work with zero-word
+ // e.g. "lorem.#.dolor" should ideally match "lorem.dolor"
+ transformed.add(".*");
+ break;
+ default:
+ transformed.add(s.replaceAll("\\*", ".*"));
+ break;
+ }
+ }
+ // at this point, 'transformed' looks like ["lorem", "ipsum", "[^.]+", "sit"]
+ StringBuilder sb = new StringBuilder();
+ for (String s : transformed) {
+ if (sb.length() > 0) sb.append("\\.");
+ sb.append(s);
+ }
+ String join = sb.toString();
+ // join = "lorem\.ipsum\.[^.]+\.sit"
+
+ return dest.matches(join);
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SimplePathMatcher.java b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SimplePathMatcher.java
new file mode 100644
index 0000000..34611d5
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SimplePathMatcher.java
@@ -0,0 +1,14 @@
+package ua.naiksoftware.stomp.pathmatcher;
+
+import ua.naiksoftware.stomp.dto.StompHeader;
+import ua.naiksoftware.stomp.dto.StompMessage;
+
+public class SimplePathMatcher implements PathMatcher {
+
+ @Override
+ public boolean matches(String path, StompMessage msg) {
+ String dest = msg.findHeader(StompHeader.DESTINATION);
+ if (dest == null) return false;
+ else return path.equals(dest);
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SubscriptionPathMatcher.java b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SubscriptionPathMatcher.java
new file mode 100644
index 0000000..8886201
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/pathmatcher/SubscriptionPathMatcher.java
@@ -0,0 +1,24 @@
+package ua.naiksoftware.stomp.pathmatcher;
+
+import ua.naiksoftware.stomp.StompClient;
+import ua.naiksoftware.stomp.dto.StompHeader;
+import ua.naiksoftware.stomp.dto.StompHeader.*;
+import ua.naiksoftware.stomp.dto.StompMessage;
+
+public class SubscriptionPathMatcher implements PathMatcher {
+
+ private final StompClient stompClient;
+
+ public SubscriptionPathMatcher(StompClient stompClient) {
+ this.stompClient = stompClient;
+ }
+
+ @Override
+ public boolean matches(String path, StompMessage msg) {
+ // Compare subscription
+ String pathSubscription = stompClient.getTopicId(path);
+ if (pathSubscription == null) return false;
+ String subscription = msg.findHeader(StompHeader.SUBSCRIPTION);
+ return pathSubscription.equals(subscription);
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/provider/AbstractConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/provider/AbstractConnectionProvider.java
new file mode 100644
index 0000000..c0f938e
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/provider/AbstractConnectionProvider.java
@@ -0,0 +1,125 @@
+package ua.naiksoftware.stomp.provider;
+
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+import android.util.Log;
+
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Completable;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
+import ua.naiksoftware.stomp.dto.LifecycleEvent;
+import ua.naiksoftware.stomp.dto.StompHeader;
+import ua.naiksoftware.stomp.dto.StompCommand;
+import ua.naiksoftware.stomp.dto.StompMessage;
+
+/**
+ * Created by forresthopkinsa on 8/8/2017.
+ *
+ * Created because there was a lot of shared code between JWS and OkHttp connection providers.
+ */
+
+public abstract class AbstractConnectionProvider implements ConnectionProvider {
+
+ private static final String TAG = AbstractConnectionProvider.class.getSimpleName();
+
+ @NonNull
+ private final PublishSubject lifecycleStream;
+ @NonNull
+ private final PublishSubject messagesStream;
+
+ public AbstractConnectionProvider() {
+ lifecycleStream = PublishSubject.create();
+ messagesStream = PublishSubject.create();
+ }
+
+ @NonNull
+ @Override
+ public Observable messages() {
+ return messagesStream.startWith(initSocket().toObservable());
+ }
+
+ /**
+ * Simply close socket.
+ *
+ * For example:
+ *
+ * webSocket.close();
+ *
+ */
+ protected abstract void rawDisconnect();
+
+ @Override
+ public Completable disconnect() {
+ return Completable
+ .fromAction(this::rawDisconnect);
+ }
+
+ private Completable initSocket() {
+ return Completable
+ .fromAction(this::createWebSocketConnection);
+ }
+
+ /**
+ * Most important method: connects to websocket and notifies program of messages.
+ *
+ * See implementations in OkHttpConnectionProvider and WebSocketsConnectionProvider.
+ */
+ protected abstract void createWebSocketConnection();
+
+ @NonNull
+ @Override
+ public Completable send(String stompMessage) {
+ return Completable.fromCallable(() -> {
+ if (getSocket() == null) {
+ throw new IllegalStateException("Not connected");
+ } else {
+ Log.d(TAG, "Send STOMP message: " + stompMessage);
+ rawSend(stompMessage);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Just a simple message send.
+ *
+ * For example:
+ *
+ * webSocket.send(stompMessage);
+ *
+ *
+ * @param stompMessage message to send
+ */
+ protected abstract void rawSend(String stompMessage);
+
+ /**
+ * Get socket object.
+ * Used for null checking; this object is expected to be null when the connection is not yet established.
+ *
+ * For example:
+ *
+ * return webSocket;
+ *
+ */
+ @Nullable
+ protected abstract Object getSocket();
+
+ protected void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
+ Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
+ lifecycleStream.onNext(lifecycleEvent);
+ }
+
+ protected void emitMessage(String stompMessage) {
+ Log.d(TAG, "Receive STOMP message: " + stompMessage);
+ messagesStream.onNext(stompMessage);
+ }
+
+ @NonNull
+ @Override
+ public Observable lifecycle() {
+ return lifecycleStream;
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/provider/ConnectionProvider.java
similarity index 54%
rename from lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java
rename to lib/src/main/java/ua/naiksoftware/stomp/provider/ConnectionProvider.java
index bf39020..bfac8ee 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/provider/ConnectionProvider.java
@@ -1,6 +1,8 @@
-package ua.naiksoftware.stomp;
+package ua.naiksoftware.stomp.provider;
-import rx.Observable;
+import io.reactivex.Completable;
+import io.reactivex.Observable;
+import ua.naiksoftware.stomp.dto.LifecycleEvent;
/**
* Created by naik on 05.05.16.
@@ -17,10 +19,16 @@ public interface ConnectionProvider {
* onError if not connected or error detected will be called, or onCompleted id sending started
* TODO: send messages with ACK
*/
- Observable send(String stompMessage);
+ Completable send(String stompMessage);
/**
* Subscribe this for receive #LifecycleEvent events
*/
- Observable getLifecycleReceiver();
+ Observable lifecycle();
+
+ /**
+ * Disconnects from server. This is basically a Callable.
+ * Automatically emits Lifecycle.CLOSE
+ */
+ Completable disconnect();
}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/provider/OkHttpConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/provider/OkHttpConnectionProvider.java
new file mode 100644
index 0000000..3846b49
--- /dev/null
+++ b/lib/src/main/java/ua/naiksoftware/stomp/provider/OkHttpConnectionProvider.java
@@ -0,0 +1,124 @@
+package ua.naiksoftware.stomp.provider;
+
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+import android.util.Log;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import okhttp3.Headers;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okio.ByteString;
+import ua.naiksoftware.stomp.dto.LifecycleEvent;
+
+public class OkHttpConnectionProvider extends AbstractConnectionProvider {
+
+ public static final String TAG = "OkHttpConnProvider";
+
+ private final String mUri;
+ @NonNull
+ private final Map mConnectHttpHeaders;
+ private final OkHttpClient mOkHttpClient;
+
+ @Nullable
+ private WebSocket openSocket;
+
+ public OkHttpConnectionProvider(String uri, @Nullable Map connectHttpHeaders, OkHttpClient okHttpClient) {
+ super();
+ mUri = uri;
+ mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
+ mOkHttpClient = okHttpClient;
+ }
+
+ @Override
+ public void rawDisconnect() {
+ if (openSocket != null) {
+ openSocket.close(1000, "");
+ }
+ }
+
+ @Override
+ protected void createWebSocketConnection() {
+ Request.Builder requestBuilder = new Request.Builder()
+ .url(mUri);
+
+ addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders);
+
+ openSocket = mOkHttpClient.newWebSocket(requestBuilder.build(),
+ new WebSocketListener() {
+ @Override
+ public void onOpen(WebSocket webSocket, @NonNull Response response) {
+ LifecycleEvent openEvent = new LifecycleEvent(LifecycleEvent.Type.OPENED);
+
+ TreeMap headersAsMap = headersAsMap(response);
+
+ openEvent.setHandshakeResponseHeaders(headersAsMap);
+ emitLifecycleEvent(openEvent);
+ }
+
+ @Override
+ public void onMessage(WebSocket webSocket, String text) {
+ emitMessage(text);
+ }
+
+ @Override
+ public void onMessage(WebSocket webSocket, @NonNull ByteString bytes) {
+ emitMessage(bytes.utf8());
+ }
+
+ @Override
+ public void onClosed(WebSocket webSocket, int code, String reason) {
+ openSocket = null;
+ emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
+ }
+
+ @Override
+ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
+ // in OkHttp, a Failure is equivalent to a JWS-Error *and* a JWS-Close
+ emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, new Exception(t)));
+ openSocket = null;
+ emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
+ }
+
+ @Override
+ public void onClosing(final WebSocket webSocket, final int code, final String reason) {
+ webSocket.close(code, reason);
+ }
+ }
+
+ );
+ }
+
+ @Override
+ protected void rawSend(String stompMessage) {
+ openSocket.send(stompMessage);
+ }
+
+ @Nullable
+ @Override
+ protected Object getSocket() {
+ return openSocket;
+ }
+
+ @NonNull
+ private TreeMap headersAsMap(@NonNull Response response) {
+ TreeMap headersAsMap = new TreeMap<>();
+ Headers headers = response.headers();
+ for (String key : headers.names()) {
+ headersAsMap.put(key, headers.get(key));
+ }
+ return headersAsMap;
+ }
+
+ private void addConnectionHeadersToBuilder(@NonNull Request.Builder requestBuilder, @NonNull Map mConnectHttpHeaders) {
+ for (Map.Entry headerEntry : mConnectHttpHeaders.entrySet()) {
+ requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
+ }
+ }
+}
diff --git a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/provider/WebSocketsConnectionProvider.java
similarity index 52%
rename from lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java
rename to lib/src/main/java/ua/naiksoftware/stomp/provider/WebSocketsConnectionProvider.java
index 459398d..3a1e8cb 100644
--- a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java
+++ b/lib/src/main/java/ua/naiksoftware/stomp/provider/WebSocketsConnectionProvider.java
@@ -1,80 +1,71 @@
-package ua.naiksoftware.stomp;
+package ua.naiksoftware.stomp.provider;
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
import android.util.Log;
import org.java_websocket.WebSocket;
-import org.java_websocket.client.DefaultSSLWebSocketClientFactory;
import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.drafts.Draft_17;
+import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
+import ua.naiksoftware.stomp.dto.LifecycleEvent;
import java.net.URI;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.net.ssl.SSLContext;
-
-import rx.Observable;
-import rx.Subscriber;
+import javax.net.ssl.SSLSocketFactory;
/**
* Created by naik on 05.05.16.
*/
-public class WebSocketsConnectionProvider implements ConnectionProvider {
+
+public class WebSocketsConnectionProvider extends AbstractConnectionProvider {
private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();
private final String mUri;
+ @NonNull
private final Map mConnectHttpHeaders;
+
private WebSocketClient mWebSocketClient;
- private List> mLifecycleSubscribers;
- private List> mMessagesSubscribers;
private boolean haveConnection;
private TreeMap mServerHandshakeHeaders;
/**
* Support UIR scheme ws://host:port/path
+ *
* @param connectHttpHeaders may be null
*/
- public WebSocketsConnectionProvider(String uri, Map connectHttpHeaders) {
+ public WebSocketsConnectionProvider(String uri, @Nullable Map connectHttpHeaders) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
- mLifecycleSubscribers = new ArrayList<>();
- mMessagesSubscribers = new ArrayList<>();
}
@Override
- public Observable messages() {
- Observable observable = Observable.create(subscriber -> {
- mMessagesSubscribers.add(subscriber);
-
- }).doOnUnsubscribe(() -> {
- Iterator> iterator = mMessagesSubscribers.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().isUnsubscribed()) iterator.remove();
- }
-
- if (mMessagesSubscribers.size() < 1) mWebSocketClient.close();
- });
-
- createWebSocketConnection();
- return observable;
+ public void rawDisconnect() {
+ try {
+ mWebSocketClient.closeBlocking();
+ } catch (InterruptedException e) {
+ Log.e(TAG, "Thread interrupted while waiting for Websocket closing: ", e);
+ throw new RuntimeException(e);
+ }
}
- private void createWebSocketConnection() {
+ @Override
+ protected void createWebSocketConnection() {
if (haveConnection)
throw new IllegalStateException("Already have connection to web socket");
- mWebSocketClient = new WebSocketClient(URI.create(mUri), new Draft_17(), mConnectHttpHeaders, 0) {
+ mWebSocketClient = new WebSocketClient(URI.create(mUri), new Draft_6455(), mConnectHttpHeaders, 0) {
@Override
- public void onWebsocketHandshakeReceivedAsClient(WebSocket conn, ClientHandshake request, ServerHandshake response) throws InvalidDataException {
+ public void onWebsocketHandshakeReceivedAsClient(WebSocket conn, ClientHandshake request, @NonNull ServerHandshake response) throws InvalidDataException {
Log.d(TAG, "onWebsocketHandshakeReceivedAsClient with response: " + response.getHttpStatus() + " " + response.getHttpStatusMessage());
mServerHandshakeHeaders = new TreeMap<>();
Iterator keys = response.iterateHttpFields();
@@ -85,7 +76,7 @@ public void onWebsocketHandshakeReceivedAsClient(WebSocket conn, ClientHandshake
}
@Override
- public void onOpen(ServerHandshake handshakeData) {
+ public void onOpen(@NonNull ServerHandshake handshakeData) {
Log.d(TAG, "onOpen with handshakeData: " + handshakeData.getHttpStatus() + " " + handshakeData.getHttpStatusMessage());
LifecycleEvent openEvent = new LifecycleEvent(LifecycleEvent.Type.OPENED);
openEvent.setHandshakeResponseHeaders(mServerHandshakeHeaders);
@@ -103,6 +94,9 @@ public void onClose(int code, String reason, boolean remote) {
Log.d(TAG, "onClose: code=" + code + " reason=" + reason + " remote=" + remote);
haveConnection = false;
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
+
+ Log.d(TAG, "Disconnect after close.");
+ disconnect();
}
@Override
@@ -112,11 +106,12 @@ public void onError(Exception ex) {
}
};
- if(mUri.startsWith("wss")) {
+ if (mUri.startsWith("wss")) {
try {
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, null, null);
- mWebSocketClient.setWebSocketFactory(new DefaultSSLWebSocketClientFactory(sc));
+ SSLSocketFactory factory = sc.getSocketFactory();
+ mWebSocketClient.setSocket(factory.createSocket());
} catch (Exception e) {
e.printStackTrace();
}
@@ -127,41 +122,12 @@ public void onError(Exception ex) {
}
@Override
- public Observable send(String stompMessage) {
- return Observable.create(subscriber -> {
- if (mWebSocketClient == null) {
- subscriber.onError(new IllegalStateException("Not connected yet"));
- } else {
- mWebSocketClient.send(stompMessage);
- subscriber.onCompleted();
- }
- });
- }
-
- private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
- Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
- for (Subscriber super LifecycleEvent> subscriber : mLifecycleSubscribers) {
- subscriber.onNext(lifecycleEvent);
- }
- }
-
- private void emitMessage(String stompMessage) {
- Log.d(TAG, "Emit STOMP message: " + stompMessage);
- for (Subscriber super String> subscriber : mMessagesSubscribers) {
- subscriber.onNext(stompMessage);
- }
+ protected void rawSend(String stompMessage) {
+ mWebSocketClient.send(stompMessage);
}
@Override
- public Observable getLifecycleReceiver() {
- return Observable.create(subscriber -> {
- mLifecycleSubscribers.add(subscriber);
-
- }).doOnUnsubscribe(() -> {
- Iterator> iterator = mLifecycleSubscribers.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().isUnsubscribed()) iterator.remove();
- }
- });
+ protected Object getSocket() {
+ return mWebSocketClient;
}
}
diff --git a/lib/src/test/groovy/ua/naiksoftware/stomp/Configuration.groovy b/lib/src/test/groovy/ua/naiksoftware/stomp/Configuration.groovy
new file mode 100644
index 0000000..da6b5ee
--- /dev/null
+++ b/lib/src/test/groovy/ua/naiksoftware/stomp/Configuration.groovy
@@ -0,0 +1,38 @@
+package ua.naiksoftware.stomp
+
+import com.andrewreitz.spock.android.AndroidSpecification
+import groovy.util.logging.Slf4j
+import org.testcontainers.containers.BindMode
+import org.testcontainers.containers.GenericContainer
+import org.testcontainers.containers.output.OutputFrame
+import org.testcontainers.containers.startupcheck.StartupCheckStrategy
+import org.testcontainers.containers.wait.strategy.Wait
+import spock.lang.Shared
+import spock.lang.Specification
+
+import java.util.function.Consumer
+
+class Configuration extends AndroidSpecification {
+
+ @Shared
+ static GenericContainer testServer = setupServer()
+
+ static GenericContainer setupServer() {
+
+ def projectRoot = new File('../')
+ new ProcessBuilder(['./gradlew', 'test-server:bootJar'])
+ .directory(projectRoot)
+ .start().waitForProcessOutput(System.out as Appendable, System.err as Appendable)
+
+ def testServerPath = new File(projectRoot.getAbsoluteFile().getParentFile().getParent(),
+ 'test-server/build/artifacts/test-server-1.0.jar').path
+ testServer = new GenericContainer('openjdk:8-jre-alpine')
+ .withFileSystemBind(testServerPath, '/app.jar', BindMode.READ_ONLY)
+ .withCommand('java -jar /app.jar')
+ .withLogConsumer({ frame -> println frame.utf8String })
+// .waitingFor(Wait.forHttp('/health'))
+ .withExposedPorts(8080)
+ testServer.start()
+ return testServer
+ }
+}
\ No newline at end of file
diff --git a/lib/src/test/groovy/ua/naiksoftware/stomp/ConnectionTests.groovy b/lib/src/test/groovy/ua/naiksoftware/stomp/ConnectionTests.groovy
new file mode 100644
index 0000000..6dfbe8f
--- /dev/null
+++ b/lib/src/test/groovy/ua/naiksoftware/stomp/ConnectionTests.groovy
@@ -0,0 +1,32 @@
+package ua.naiksoftware.stomp
+
+
+import io.reactivex.functions.Predicate
+import io.reactivex.subscribers.TestSubscriber
+import ua.naiksoftware.stomp.dto.LifecycleEvent
+
+class ConnectionTests extends Configuration {
+
+ def "connection must be opened"() {
+ given:
+ def client = Stomp.over(Stomp.ConnectionProvider.OKHTTP,
+ 'ws://' + Configuration.testServer.getContainerIpAddress()
+ + ':' + Configuration.testServer.getFirstMappedPort() + '/example-endpoint/websocket')
+ client.connect()
+ def testSubscriber = new TestSubscriber()
+
+ when:
+ client.lifecycle().subscribe(testSubscriber)
+
+ then:
+ testSubscriber.awaitCount(1).assertValue((Predicate) { event ->
+ if (event.exception) {
+ event.exception.printStackTrace()
+ }
+ return event.type == LifecycleEvent.Type.OPENED
+ })
+
+ cleanup:
+ client.disconnect()
+ }
+}
diff --git a/lib/src/test/groovy/ua/naiksoftware/stomp/PathMatcherTests.groovy b/lib/src/test/groovy/ua/naiksoftware/stomp/PathMatcherTests.groovy
new file mode 100644
index 0000000..f37fa40
--- /dev/null
+++ b/lib/src/test/groovy/ua/naiksoftware/stomp/PathMatcherTests.groovy
@@ -0,0 +1,28 @@
+package ua.naiksoftware.stomp
+
+import ua.naiksoftware.stomp.dto.StompCommand
+import ua.naiksoftware.stomp.dto.StompHeader
+import ua.naiksoftware.stomp.dto.StompMessage
+import ua.naiksoftware.stomp.pathmatcher.RabbitPathMatcher
+
+class PathMatcherTests extends Configuration {
+
+ def "rmq-style matcher must return expected value"() {
+ given:
+ def matcher = new RabbitPathMatcher()
+
+ expect:
+ matcher.matches(path, message(dest)) == value
+
+ where:
+ path | dest | value
+ 'lorem/*/ipsum' | 'lorem/any/ipsum' | true
+ 'lorem/*/ipsum' | 'lorem/ipsum' | false
+ 'lorem/*/prefix*' | 'lorem/ipsum/prefix123'| true
+ 'lorem/*/pref*3' | 'lorem/ipsum/prefix123'| true
+ }
+
+ def message(String dest) {
+ return new StompMessage(StompCommand.MESSAGE, [new StompHeader(StompHeader.DESTINATION, dest)], null)
+ }
+}
diff --git a/lib/src/test/java/ua/naiksoftware/stomp/ExampleUnitTest.java b/lib/src/test/java/ua/naiksoftware/stomp/ExampleUnitTest.java
deleted file mode 100644
index 082e58a..0000000
--- a/lib/src/test/java/ua/naiksoftware/stomp/ExampleUnitTest.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package ua.naiksoftware.stomp;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * To work on unit tests, switch the Test Artifact in the Build Variants view.
- */
-public class ExampleUnitTest {
- @Test
- public void addition_isCorrect() throws Exception {
- assertEquals(4, 2 + 2);
- }
-}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index e5b9f10..7b57977 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1 +1 @@
-include ':example-client', ':lib'
+include ':example-client', ':lib', ':test-server'
diff --git a/test-server/README.md b/test-server/README.md
new file mode 100644
index 0000000..e4a5bc7
--- /dev/null
+++ b/test-server/README.md
@@ -0,0 +1,5 @@
+# Example backend with STOMP
+
+Backend for project https://github.com/NaikSoftware/StompProtocolAndroid/tree/master/example-client
+
+You need configure MQ server before (for example RabbitMQ or ActiveMQ)
\ No newline at end of file
diff --git a/test-server/build.gradle b/test-server/build.gradle
new file mode 100644
index 0000000..ccfae21
--- /dev/null
+++ b/test-server/build.gradle
@@ -0,0 +1,46 @@
+group 'ua.naiksoftware.test_server'
+version '1.0'
+
+buildscript {
+
+ ext {
+ springBootVersion = '2.1.1.RELEASE'
+ springCloudVersion = 'Finchley.SR1'
+ }
+
+ repositories {
+ mavenCentral()
+ }
+ dependencies {
+ classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
+ }
+}
+
+apply plugin: 'groovy'
+apply plugin: 'org.springframework.boot'
+apply plugin: 'io.spring.dependency-management'
+
+sourceCompatibility = JavaVersion.VERSION_1_8
+targetCompatibility = JavaVersion.VERSION_1_8
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ compile 'org.springframework.boot:spring-boot-starter-websocket'
+ compile 'org.springframework.boot:spring-boot-starter-web'
+ compile 'ch.qos.logback:logback-classic:1.2.3'
+ compile 'org.codehaus.groovy:groovy-all:2.4.15'
+}
+
+dependencyManagement {
+ imports {
+ mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
+ }
+}
+
+bootJar {
+ destinationDir = project.file("build/artifacts")
+}
+
diff --git a/test-server/src/main/groovy/ua/naiksoftware/test_server/Main.groovy b/test-server/src/main/groovy/ua/naiksoftware/test_server/Main.groovy
new file mode 100644
index 0000000..1809f9d
--- /dev/null
+++ b/test-server/src/main/groovy/ua/naiksoftware/test_server/Main.groovy
@@ -0,0 +1,13 @@
+package ua.naiksoftware.test_server
+
+import org.springframework.boot.SpringApplication
+import org.springframework.boot.autoconfigure.SpringBootApplication
+
+@SpringBootApplication
+class Main {
+
+ static void main(String[] args) {
+ SpringApplication.run(Main.class, args);
+ }
+
+}
\ No newline at end of file
diff --git a/test-server/src/main/groovy/ua/naiksoftware/test_server/config/WebSocketConfig.groovy b/test-server/src/main/groovy/ua/naiksoftware/test_server/config/WebSocketConfig.groovy
new file mode 100644
index 0000000..7215b0a
--- /dev/null
+++ b/test-server/src/main/groovy/ua/naiksoftware/test_server/config/WebSocketConfig.groovy
@@ -0,0 +1,43 @@
+package ua.naiksoftware.test_server.config
+
+
+import org.springframework.context.annotation.Configuration
+import org.springframework.messaging.simp.config.MessageBrokerRegistry
+import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler
+import org.springframework.web.socket.config.annotation.EnableWebSocket
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
+import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration
+
+/**
+ * Created by naik on 04.05.16.
+ */
+@Configuration
+@EnableWebSocket
+@EnableWebSocketMessageBroker
+class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+
+ @Override
+ void configureMessageBroker(MessageBrokerRegistry config) {
+
+ long[] heartbeat = [ 30000, 30000 ];
+ config.enableSimpleBroker("/topic", "/queue", "/exchange")
+ .setTaskScheduler(new DefaultManagedTaskScheduler()) // enable heartbeats
+ .setHeartbeatValue(heartbeat); // enable heartbeats
+
+// config.enableStompBrokerRelay("/topic", "/queue", "/exchange"); // Uncomment for external message broker (ActiveMQ, RabbitMQ)
+ config.setApplicationDestinationPrefixes("/topic", "/queue"); // prefix in client queries
+ config.setUserDestinationPrefix("/user");
+ }
+
+ @Override
+ void registerStompEndpoints(StompEndpointRegistry registry) {
+ registry.addEndpoint("/example-endpoint").setAllowedOrigins("*").withSockJS()
+ }
+
+ @Override
+ void configureWebSocketTransport(WebSocketTransportRegistration registration) {
+ registration.setMessageSizeLimit(8 * 1024);
+ }
+}
diff --git a/test-server/src/main/groovy/ua/naiksoftware/test_server/controller/SocketController.groovy b/test-server/src/main/groovy/ua/naiksoftware/test_server/controller/SocketController.groovy
new file mode 100644
index 0000000..112ca2a
--- /dev/null
+++ b/test-server/src/main/groovy/ua/naiksoftware/test_server/controller/SocketController.groovy
@@ -0,0 +1,35 @@
+package ua.naiksoftware.test_server.controller
+
+import groovy.util.logging.Slf4j
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.messaging.handler.annotation.MessageMapping
+import org.springframework.messaging.handler.annotation.SendTo
+import org.springframework.web.bind.annotation.RequestMapping
+import org.springframework.web.bind.annotation.RequestMethod
+import org.springframework.web.bind.annotation.RequestParam
+import org.springframework.web.bind.annotation.RestController
+import ua.naiksoftware.test_server.model.EchoModel
+import ua.naiksoftware.test_server.service.SocketService
+
+/**
+ * Created by Naik on 23.02.17.
+ */
+@Slf4j
+@RestController
+class SocketController {
+
+ @Autowired
+ SocketService socketService
+
+ @MessageMapping('/hello-msg-mapping')
+ @SendTo('/topic/greetings')
+ EchoModel echoMessageMapping(String message) {
+ log.debug("React to hello-msg-mapping")
+ return new EchoModel(message.trim())
+ }
+
+ @RequestMapping(value = '/hello-convert-and-send', method = RequestMethod.POST)
+ void echoConvertAndSend(@RequestParam('msg') String message) {
+ socketService.echoMessage(message)
+ }
+}
diff --git a/test-server/src/main/groovy/ua/naiksoftware/test_server/model/EchoModel.groovy b/test-server/src/main/groovy/ua/naiksoftware/test_server/model/EchoModel.groovy
new file mode 100644
index 0000000..3b16687
--- /dev/null
+++ b/test-server/src/main/groovy/ua/naiksoftware/test_server/model/EchoModel.groovy
@@ -0,0 +1,12 @@
+package ua.naiksoftware.test_server.model
+
+/**
+ * Created by Naik on 23.02.17.
+ */
+class EchoModel {
+
+ EchoModel(String echo) {
+ this.echo = echo
+ }
+ String echo;
+}
diff --git a/test-server/src/main/groovy/ua/naiksoftware/test_server/service/SocketService.groovy b/test-server/src/main/groovy/ua/naiksoftware/test_server/service/SocketService.groovy
new file mode 100644
index 0000000..d4ff983
--- /dev/null
+++ b/test-server/src/main/groovy/ua/naiksoftware/test_server/service/SocketService.groovy
@@ -0,0 +1,24 @@
+package ua.naiksoftware.test_server.service
+
+import groovy.util.logging.Slf4j
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.messaging.simp.SimpMessagingTemplate
+import org.springframework.stereotype.Service
+import ua.naiksoftware.test_server.model.EchoModel
+
+/**
+ * Created by Naik on 23.02.17.
+ */
+@Slf4j
+@Service
+class SocketService {
+
+ @Autowired
+ private SimpMessagingTemplate simpTemplate;
+
+ def echoMessage(String message) {
+ log.debug("Start convertAndSend ${new Date()}")
+ simpTemplate.convertAndSend('/topic/greetings', new EchoModel(message))
+ log.debug("End convertAndSend ${new Date()}")
+ }
+}
diff --git a/test-server/src/main/resources/application.properties b/test-server/src/main/resources/application.properties
new file mode 100644
index 0000000..d905eee
--- /dev/null
+++ b/test-server/src/main/resources/application.properties
@@ -0,0 +1,8 @@
+server.port=8080
+
+spring.jackson.deserialization.accept_empty_string_as_null_object=true
+spring.jackson.property-naming-strategy=CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES
+spring.jackson.serialization.write_char_arrays_as_json_arrays=true
+spring.jackson.serialization-inclusion=NON_NULL
+
+logging.level.org.springframework.web=DEBUG
\ No newline at end of file