@@ -17,13 +17,15 @@ package mqtt
17
17
18
18
import (
19
19
"errors"
20
- . "github.com/alsm/hrotti /packets"
20
+ "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git /packets"
21
21
"net"
22
22
"sync"
23
23
"time"
24
24
)
25
25
26
- type Client interface {
26
+ // ClientInt is the interface definition for a Client as used by this
27
+ // library, the interface is primarily to allow mocking tests.
28
+ type ClientInt interface {
27
29
IsConnected () bool
28
30
Connect () Token
29
31
Disconnect (uint )
@@ -35,34 +37,31 @@ type Client interface {
35
37
Unsubscribe (... string ) Token
36
38
}
37
39
38
- // MqttClient is a lightweight MQTT v3.1 Client for communicating
40
+ // Client is an MQTT v3.1.1 client for communicating
39
41
// with an MQTT server using non-blocking methods that allow work
40
42
// to be done in the background.
41
-
42
43
// An application may connect to an MQTT server using:
43
44
// A plain TCP socket
44
45
// A secure SSL/TLS socket
45
46
// A websocket
46
-
47
47
// To enable ensured message delivery at Quality of Service (QoS) levels
48
48
// described in the MQTT spec, a message persistence mechanism must be
49
49
// used. This is done by providing a type which implements the Store
50
50
// interface. For convenience, FileStore and MemoryStore are provided
51
51
// implementations that should be sufficient for most use cases. More
52
52
// information can be found in their respective documentation.
53
-
54
53
// Numerous connection options may be specified by configuring a
55
54
// and then supplying a ClientOptions type.
56
- type MqttClient struct {
55
+ type Client struct {
57
56
sync.RWMutex
58
57
messageIds
59
58
conn net.Conn
60
- ibound chan ControlPacket
59
+ ibound chan packets. ControlPacket
61
60
obound chan * PacketAndToken
62
61
oboundP chan * PacketAndToken
63
62
msgRouter * router
64
63
stopRouter chan bool
65
- incomingPubChan chan * PublishPacket
64
+ incomingPubChan chan * packets. PublishPacket
66
65
errors chan error
67
66
stop chan struct {}
68
67
persist Store
@@ -77,8 +76,8 @@ type MqttClient struct {
77
76
// in the provided ClientOptions. The client must have the Start method called
78
77
// on it before it may be used. This is to make sure resources (such as a net
79
78
// connection) are created before the application is actually ready.
80
- func NewClient (o * ClientOptions ) * MqttClient {
81
- c := & MqttClient {}
79
+ func NewClient (o * ClientOptions ) * Client {
80
+ c := & Client {}
82
81
c .options = * o
83
82
84
83
if c .options .Store == nil {
@@ -99,31 +98,39 @@ func NewClient(o *ClientOptions) *MqttClient {
99
98
return c
100
99
}
101
100
102
- func (c * MqttClient ) IsConnected () bool {
101
+ // IsConnected returns a bool signifying whether
102
+ // the client is connected or not.
103
+ func (c * Client ) IsConnected () bool {
103
104
c .RLock ()
104
105
defer c .RUnlock ()
105
106
return c .connected
106
107
}
107
108
109
+ func (c * Client ) setConnected (status bool ) {
110
+ c .Lock ()
111
+ defer c .Unlock ()
112
+ c .connected = status
113
+ }
114
+
108
115
// Connect will create a connection to the message broker
109
116
// If clean session is false, then a slice will
110
117
// be returned containing Receipts for all messages
111
118
// that were in-flight at the last disconnect.
112
119
// If clean session is true, then any existing client
113
120
// state will be removed.
114
- func (c * MqttClient ) Connect () Token {
121
+ func (c * Client ) Connect () Token {
115
122
var err error
116
- t := newToken (CONNECT ).(* ConnectToken )
123
+ t := newToken (packets . Connect ).(* ConnectToken )
117
124
DEBUG .Println (CLI , "Connect()" )
118
125
119
126
go func () {
120
127
var rc byte
121
- cm := newConnectMsgFromOptions (c .options )
128
+ cm := newConnectMsgFromOptions (& c .options )
122
129
123
130
for _ , broker := range c .options .Servers {
124
131
CONN:
125
132
DEBUG .Println (CLI , "about to write new connect msg" )
126
- c .conn , err = openConnection (broker , & c .options .TlsConfig )
133
+ c .conn , err = openConnection (broker , & c .options .TLSConfig )
127
134
if err == nil {
128
135
DEBUG .Println (CLI , "socket connected to broker" )
129
136
switch c .options .ProtocolVersion {
@@ -140,12 +147,12 @@ func (c *MqttClient) Connect() Token {
140
147
cm .Write (c .conn )
141
148
142
149
rc = c .connect ()
143
- if rc != CONN_ACCEPTED {
150
+ if rc != packets . Accepted {
144
151
c .conn .Close ()
145
152
c .conn = nil
146
153
//if the protocol version was explicitly set don't do any fallback
147
154
if c .options .protocolVersionExplicit {
148
- ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not CONN_ACCEPTED, but rather" , ConnackReturnCodes [rc ])
155
+ ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not CONN_ACCEPTED, but rather" , packets . ConnackReturnCodes [rc ])
149
156
continue
150
157
}
151
158
if c .options .ProtocolVersion == 4 {
@@ -158,14 +165,14 @@ func (c *MqttClient) Connect() Token {
158
165
} else {
159
166
ERROR .Println (CLI , err .Error ())
160
167
WARN .Println (CLI , "failed to connect to broker, trying next" )
161
- rc = CONN_NETWORK_ERROR
168
+ rc = packets . NetworkError
162
169
}
163
170
}
164
171
165
172
if c .conn == nil {
166
173
ERROR .Println (CLI , "Failed to connect to a broker" )
167
174
t .returnCode = rc
168
- if rc != CONN_NETWORK_ERROR {
175
+ if rc != packets . NetworkError {
169
176
t .err = connErrors [rc ]
170
177
} else {
171
178
t .err = errors .New (connErrors [rc ].Error () + " : " + err .Error ())
@@ -179,11 +186,11 @@ func (c *MqttClient) Connect() Token {
179
186
180
187
c .obound = make (chan * PacketAndToken , 100 )
181
188
c .oboundP = make (chan * PacketAndToken , 100 )
182
- c .ibound = make (chan ControlPacket )
189
+ c .ibound = make (chan packets. ControlPacket )
183
190
c .errors = make (chan error )
184
191
c .stop = make (chan struct {})
185
192
186
- c .incomingPubChan = make (chan * PublishPacket , 100 )
193
+ c .incomingPubChan = make (chan * packets. PublishPacket , 100 )
187
194
c .msgRouter .matchAndDispatch (c .incomingPubChan , c .options .Order , c )
188
195
189
196
c .workers .Add (1 )
@@ -213,26 +220,26 @@ func (c *MqttClient) Connect() Token {
213
220
c .workers .Add (1 )
214
221
go incoming (c )
215
222
216
- DEBUG .Println (CLI , "exit startMqttClient " )
223
+ DEBUG .Println (CLI , "exit startClient " )
217
224
t .flowComplete ()
218
225
}()
219
226
return t
220
227
}
221
228
222
229
// internal function used to reconnect the client when it loses its connection
223
- func (c * MqttClient ) reconnect () {
230
+ func (c * Client ) reconnect () {
224
231
DEBUG .Println (CLI , "enter reconnect" )
225
232
var rc byte = 1
226
233
var sleep uint = 1
227
234
var err error
228
235
229
236
for rc != 0 {
230
- cm := newConnectMsgFromOptions (c .options )
237
+ cm := newConnectMsgFromOptions (& c .options )
231
238
232
239
for _ , broker := range c .options .Servers {
233
240
CONN:
234
241
DEBUG .Println (CLI , "about to write new connect msg" )
235
- c .conn , err = openConnection (broker , & c .options .TlsConfig )
242
+ c .conn , err = openConnection (broker , & c .options .TLSConfig )
236
243
if err == nil {
237
244
DEBUG .Println (CLI , "socket connected to broker" )
238
245
switch c .options .ProtocolVersion {
@@ -249,12 +256,12 @@ func (c *MqttClient) reconnect() {
249
256
cm .Write (c .conn )
250
257
251
258
rc = c .connect ()
252
- if rc != CONN_ACCEPTED {
259
+ if rc != packets . Accepted {
253
260
c .conn .Close ()
254
261
c .conn = nil
255
262
//if the protocol version was explicitly set don't do any fallback
256
263
if c .options .protocolVersionExplicit {
257
- ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not CONN_ACCEPTED , but rather" , ConnackReturnCodes [rc ])
264
+ ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not Accepted , but rather" , packets . ConnackReturnCodes [rc ])
258
265
continue
259
266
}
260
267
if c .options .ProtocolVersion == 4 {
@@ -267,7 +274,7 @@ func (c *MqttClient) reconnect() {
267
274
} else {
268
275
ERROR .Println (CLI , err .Error ())
269
276
WARN .Println (CLI , "failed to connect to broker, trying next" )
270
- rc = CONN_NETWORK_ERROR
277
+ rc = packets . NetworkError
271
278
}
272
279
}
273
280
if rc != 0 {
@@ -286,7 +293,7 @@ func (c *MqttClient) reconnect() {
286
293
go outgoing (c )
287
294
go alllogic (c )
288
295
289
- c .connected = true
296
+ c .setConnected ( true )
290
297
DEBUG .Println (CLI , "client is reconnected" )
291
298
if c .options .OnConnect != nil {
292
299
go c .options .OnConnect (c )
@@ -304,18 +311,18 @@ func (c *MqttClient) reconnect() {
304
311
// when the connection is first started.
305
312
// This prevents receiving incoming data while resume
306
313
// is in progress if clean session is false.
307
- func (c * MqttClient ) connect () byte {
314
+ func (c * Client ) connect () byte {
308
315
DEBUG .Println (NET , "connect started" )
309
316
310
- ca , err := ReadPacket (c .conn )
317
+ ca , err := packets . ReadPacket (c .conn )
311
318
if err != nil {
312
319
ERROR .Println (NET , "connect got error" , err )
313
320
//c.errors <- err
314
- return CONN_NETWORK_ERROR
321
+ return packets . NetworkError
315
322
}
316
- msg := ca .(* ConnackPacket )
323
+ msg := ca .(* packets. ConnackPacket )
317
324
318
- if msg == nil || msg .FixedHeader .MessageType != CONNACK {
325
+ if msg == nil || msg .FixedHeader .MessageType != packets . Connack {
319
326
ERROR .Println (NET , "received msg that was nil or not CONNACK" )
320
327
} else {
321
328
DEBUG .Println (NET , "received connack" )
@@ -326,46 +333,39 @@ func (c *MqttClient) connect() byte {
326
333
// Disconnect will end the connection with the server, but not before waiting
327
334
// the specified number of milliseconds to wait for existing work to be
328
335
// completed.
329
- func (c * MqttClient ) Disconnect (quiesce uint ) {
336
+ func (c * Client ) Disconnect (quiesce uint ) {
330
337
if ! c .IsConnected () {
331
338
WARN .Println (CLI , "already disconnected" )
332
339
return
333
340
}
334
341
DEBUG .Println (CLI , "disconnecting" )
335
- c .connected = false
342
+ c .setConnected (false )
343
+
344
+ dm := packets .NewControlPacket (packets .Disconnect ).(* packets.DisconnectPacket )
345
+ dt := newToken (packets .Disconnect )
346
+ c .oboundP <- & PacketAndToken {p : dm , t : dt }
336
347
337
348
// wait for work to finish, or quiesce time consumed
338
- end := time .After (time .Duration (quiesce ) * time .Millisecond )
339
-
340
- // for now we just wait for the time specified and hope the work is done
341
- select {
342
- case <- end :
343
- DEBUG .Println (CLI , "quiesce expired, forcing disconnect" )
344
- // case <- other:
345
- // DEBUG.Println(CLI, "finished processing work, graceful disconnect")
346
- }
349
+ dt .WaitTimeout (time .Duration (quiesce ) * time .Millisecond )
347
350
c .disconnect ()
348
351
}
349
352
350
353
// ForceDisconnect will end the connection with the mqtt broker immediately.
351
- func (c * MqttClient ) ForceDisconnect () {
354
+ func (c * Client ) ForceDisconnect () {
352
355
if ! c .IsConnected () {
353
356
WARN .Println (CLI , "already disconnected" )
354
357
return
355
358
}
359
+ c .setConnected (false )
360
+ c .conn .Close ()
356
361
DEBUG .Println (CLI , "forcefully disconnecting" )
357
362
c .disconnect ()
358
363
}
359
364
360
- func (c * MqttClient ) disconnect () {
361
- c .connected = false
362
- dm := NewControlPacket (DISCONNECT ).(* DisconnectPacket )
363
-
364
- // Send disconnect message and stop outgoing
365
- c .oboundP <- & PacketAndToken {p : dm , t : nil }
366
- // Stop all go routines
365
+ func (c * Client ) disconnect () {
367
366
close (c .stop )
368
-
367
+ //Wait for all workers to finish before closing connection
368
+ c .workers .Wait ()
369
369
DEBUG .Println (CLI , "disconnected" )
370
370
c .persist .Close ()
371
371
}
@@ -374,9 +374,9 @@ func (c *MqttClient) disconnect() {
374
374
// and content to the specified topic.
375
375
// Returns a read only channel used to track
376
376
// the delivery of the message.
377
- func (c * MqttClient ) Publish (topic string , qos byte , retained bool , payload interface {}) Token {
378
- token := newToken (PUBLISH ).(* PublishToken )
379
- pub := NewControlPacket (PUBLISH ).(* PublishPacket )
377
+ func (c * Client ) Publish (topic string , qos byte , retained bool , payload interface {}) Token {
378
+ token := newToken (packets . Publish ).(* PublishToken )
379
+ pub := packets . NewControlPacket (packets . Publish ).(* packets. PublishPacket )
380
380
pub .Qos = qos
381
381
pub .TopicName = topic
382
382
pub .Retain = retained
@@ -396,16 +396,16 @@ func (c *MqttClient) Publish(topic string, qos byte, retained bool, payload inte
396
396
return token
397
397
}
398
398
399
- // Start a new subscription. Provide a MessageHandler to be executed when
399
+ // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
400
400
// a message is published on the topic provided.
401
- func (c * MqttClient ) Subscribe (topic string , qos byte , callback MessageHandler ) Token {
402
- token := newToken (SUBSCRIBE ).(* SubscribeToken )
401
+ func (c * Client ) Subscribe (topic string , qos byte , callback MessageHandler ) Token {
402
+ token := newToken (packets . Subscribe ).(* SubscribeToken )
403
403
DEBUG .Println (CLI , "enter Subscribe" )
404
404
if ! c .IsConnected () {
405
405
token .err = ErrNotConnected
406
406
return token
407
407
}
408
- sub := NewControlPacket (SUBSCRIBE ).(* SubscribePacket )
408
+ sub := packets . NewControlPacket (packets . Subscribe ).(* packets. SubscribePacket )
409
409
if err := validateTopicAndQos (topic , qos ); err != nil {
410
410
token .err = err
411
411
return token
@@ -424,24 +424,24 @@ func (c *MqttClient) Subscribe(topic string, qos byte, callback MessageHandler)
424
424
return token
425
425
}
426
426
427
- // Start a new subscription for multiple topics. Provide a MessageHandler to
427
+ // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
428
428
// be executed when a message is published on one of the topics provided.
429
- func (c * MqttClient ) SubscribeMultiple (filters map [string ]byte , callback MessageHandler ) Token {
429
+ func (c * Client ) SubscribeMultiple (filters map [string ]byte , callback MessageHandler ) Token {
430
430
var err error
431
- token := newToken (SUBSCRIBE ).(* SubscribeToken )
431
+ token := newToken (packets . Subscribe ).(* SubscribeToken )
432
432
DEBUG .Println (CLI , "enter SubscribeMultiple" )
433
433
if ! c .IsConnected () {
434
434
token .err = ErrNotConnected
435
435
return token
436
436
}
437
- sub := NewControlPacket (SUBSCRIBE ).(* SubscribePacket )
437
+ sub := packets . NewControlPacket (packets . Subscribe ).(* packets. SubscribePacket )
438
438
if sub .Topics , sub .Qoss , err = validateSubscribeMap (filters ); err != nil {
439
439
token .err = err
440
440
return token
441
441
}
442
442
443
443
if callback != nil {
444
- for topic , _ := range filters {
444
+ for topic := range filters {
445
445
c .msgRouter .addRoute (topic , callback )
446
446
}
447
447
}
@@ -455,14 +455,14 @@ func (c *MqttClient) SubscribeMultiple(filters map[string]byte, callback Message
455
455
// Unsubscribe will end the subscription from each of the topics provided.
456
456
// Messages published to those topics from other clients will no longer be
457
457
// received.
458
- func (c * MqttClient ) Unsubscribe (topics ... string ) Token {
459
- token := newToken (UNSUBSCRIBE ).(* UnsubscribeToken )
458
+ func (c * Client ) Unsubscribe (topics ... string ) Token {
459
+ token := newToken (packets . Unsubscribe ).(* UnsubscribeToken )
460
460
DEBUG .Println (CLI , "enter Unsubscribe" )
461
461
if ! c .IsConnected () {
462
462
token .err = ErrNotConnected
463
463
return token
464
464
}
465
- unsub := NewControlPacket (UNSUBSCRIBE ).(* UnsubscribePacket )
465
+ unsub := packets . NewControlPacket (packets . Unsubscribe ).(* packets. UnsubscribePacket )
466
466
unsub .Topics = make ([]string , len (topics ))
467
467
copy (unsub .Topics , topics )
468
468
0 commit comments