18
18
package mqtt
19
19
20
20
import (
21
+ "bytes"
21
22
"errors"
22
23
"fmt"
23
24
"net"
@@ -95,8 +96,8 @@ type Client interface {
95
96
96
97
// client implements the Client interface
97
98
type client struct {
98
- lastSent int64
99
- lastReceived int64
99
+ lastSent atomic. Value
100
+ lastReceived atomic. Value
100
101
pingOutstanding int32
101
102
status uint32
102
103
sync.RWMutex
@@ -140,9 +141,7 @@ func NewClient(o *ClientOptions) Client {
140
141
c .messageIds = messageIds {index : make (map [uint16 ]tokenCompletor )}
141
142
c .msgRouter , c .stopRouter = newRouter ()
142
143
c .msgRouter .setDefaultHandler (c .options .DefaultPublishHandler )
143
- if ! c .options .AutoReconnect {
144
- c .options .MessageChannelDepth = 0
145
- }
144
+
146
145
return c
147
146
}
148
147
@@ -157,6 +156,8 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {
157
156
158
157
// IsConnected returns a bool signifying whether
159
158
// the client is connected or not.
159
+ // connected means that the connection is up now OR it will
160
+ // be established/reestablished automatically when possible
160
161
func (c * client ) IsConnected () bool {
161
162
c .RLock ()
162
163
defer c .RUnlock ()
@@ -166,6 +167,8 @@ func (c *client) IsConnected() bool {
166
167
return true
167
168
case c .options .AutoReconnect && status > connecting :
168
169
return true
170
+ case c .options .ConnectRetry && status == connecting :
171
+ return true
169
172
default :
170
173
return false
171
174
}
@@ -210,14 +213,27 @@ func (c *client) Connect() Token {
210
213
t := newToken (packets .Connect ).(* ConnectToken )
211
214
DEBUG .Println (CLI , "Connect()" )
212
215
213
- c .obound = make (chan * PacketAndToken , c .options .MessageChannelDepth )
214
- c .oboundP = make (chan * PacketAndToken , c .options .MessageChannelDepth )
216
+ if c .options .ConnectRetry && atomic .LoadUint32 (& c .status ) != disconnected {
217
+ // if in any state other than disconnected and ConnectRetry is
218
+ // enabled then the connection will come up automatically
219
+ // client can assume connection is up
220
+ WARN .Println (CLI , "Connect() called but not disconnected" )
221
+ t .returnCode = packets .Accepted
222
+ t .flowComplete ()
223
+ return t
224
+ }
225
+
226
+ c .obound = make (chan * PacketAndToken )
227
+ c .oboundP = make (chan * PacketAndToken )
215
228
c .ibound = make (chan packets.ControlPacket )
216
229
217
- go func () {
218
- c .persist .Open ()
230
+ c .persist .Open ()
231
+ if c .options .ConnectRetry {
232
+ c .reserveStoredPublishIDs () // Reserve IDs to allow publish before connect complete
233
+ }
234
+ c .setConnected (connecting )
219
235
220
- c . setConnected ( connecting )
236
+ go func () {
221
237
c .errors = make (chan error , 1 )
222
238
c .stop = make (chan struct {})
223
239
@@ -229,12 +245,16 @@ func (c *client) Connect() Token {
229
245
return
230
246
}
231
247
248
+ RETRYCONN:
232
249
for _ , broker := range c .options .Servers {
233
250
cm := newConnectMsgFromOptions (& c .options , broker )
234
251
c .options .ProtocolVersion = protocolVersion
235
252
CONN:
236
253
DEBUG .Println (CLI , "about to write new connect msg" )
237
- c .conn , err = openConnection (broker , c .options .TLSConfig , c .options .ConnectTimeout , c .options .HTTPHeaders )
254
+ c .Lock ()
255
+ c .conn , err = openConnection (broker , c .options .TLSConfig , c .options .ConnectTimeout ,
256
+ c .options .HTTPHeaders )
257
+ c .Unlock ()
238
258
if err == nil {
239
259
DEBUG .Println (CLI , "socket connected to broker" )
240
260
switch c .options .ProtocolVersion {
@@ -260,10 +280,12 @@ func (c *client) Connect() Token {
260
280
261
281
rc , t .sessionPresent = c .connect ()
262
282
if rc != packets .Accepted {
283
+ c .Lock ()
263
284
if c .conn != nil {
264
285
c .conn .Close ()
265
286
c .conn = nil
266
287
}
288
+ c .Unlock ()
267
289
//if the protocol version was explicitly set don't do any fallback
268
290
if c .options .protocolVersionExplicit {
269
291
ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not CONN_ACCEPTED, but rather" , packets .ConnackReturnCodes [rc ])
@@ -284,6 +306,14 @@ func (c *client) Connect() Token {
284
306
}
285
307
286
308
if c .conn == nil {
309
+ if c .options .ConnectRetry {
310
+ DEBUG .Println (CLI , "Connect failed, sleeping for" , int (c .options .ConnectRetryInterval .Seconds ()), "seconds and will then retry" )
311
+ time .Sleep (c .options .ConnectRetryInterval )
312
+
313
+ if atomic .LoadUint32 (& c .status ) == connecting {
314
+ goto RETRYCONN
315
+ }
316
+ }
287
317
ERROR .Println (CLI , "Failed to connect to a broker" )
288
318
c .setConnected (disconnected )
289
319
c .persist .Close ()
@@ -300,13 +330,13 @@ func (c *client) Connect() Token {
300
330
301
331
if c .options .KeepAlive != 0 {
302
332
atomic .StoreInt32 (& c .pingOutstanding , 0 )
303
- atomic . StoreInt64 ( & c .lastReceived , time . Now (). Unix ())
304
- atomic . StoreInt64 ( & c .lastSent , time . Now (). Unix ())
333
+ c .lastReceived . Store ( time . Now ())
334
+ c .lastSent . Store ( time . Now ())
305
335
c .workers .Add (1 )
306
336
go keepalive (c )
307
337
}
308
338
309
- c .incomingPubChan = make (chan * packets.PublishPacket , c . options . MessageChannelDepth )
339
+ c .incomingPubChan = make (chan * packets.PublishPacket )
310
340
c .msgRouter .matchAndDispatch (c .incomingPubChan , c .options .Order , c )
311
341
312
342
c .setConnected (connected )
@@ -322,7 +352,7 @@ func (c *client) Connect() Token {
322
352
go incoming (c )
323
353
324
354
// Take care of any messages in the store
325
- if c .options .CleanSession == false {
355
+ if ! c .options .CleanSession {
326
356
c .resume (c .options .ResumeSubs )
327
357
} else {
328
358
c .persist .Reset ()
@@ -375,8 +405,10 @@ func (c *client) reconnect() {
375
405
376
406
rc , _ = c .connect ()
377
407
if rc != packets .Accepted {
378
- c .conn .Close ()
379
- c .conn = nil
408
+ if c .conn != nil {
409
+ c .conn .Close ()
410
+ c .conn = nil
411
+ }
380
412
//if the protocol version was explicitly set don't do any fallback
381
413
if c .options .protocolVersionExplicit {
382
414
ERROR .Println (CLI , "Connecting to" , broker , "CONNACK was not Accepted, but rather" , packets .ConnackReturnCodes [rc ])
@@ -412,8 +444,8 @@ func (c *client) reconnect() {
412
444
413
445
if c .options .KeepAlive != 0 {
414
446
atomic .StoreInt32 (& c .pingOutstanding , 0 )
415
- atomic . StoreInt64 ( & c .lastReceived , time . Now (). Unix ())
416
- atomic . StoreInt64 ( & c .lastSent , time . Now (). Unix ())
447
+ c .lastReceived . Store ( time . Now ())
448
+ c .lastSent . Store ( time . Now ())
417
449
c .workers .Add (1 )
418
450
go keepalive (c )
419
451
}
@@ -580,11 +612,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
580
612
pub .Qos = qos
581
613
pub .TopicName = topic
582
614
pub .Retain = retained
583
- switch payload .(type ) {
615
+ switch p := payload .(type ) {
584
616
case string :
585
- pub .Payload = []byte (payload .( string ) )
617
+ pub .Payload = []byte (p )
586
618
case []byte :
587
- pub .Payload = payload .([]byte )
619
+ pub .Payload = p
620
+ case bytes.Buffer :
621
+ pub .Payload = p .Bytes ()
588
622
default :
589
623
token .setError (fmt .Errorf ("Unknown payload type" ))
590
624
return token
@@ -595,11 +629,22 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
595
629
token .messageID = pub .MessageID
596
630
}
597
631
persistOutbound (c .persist , pub )
598
- if c .connectionStatus () == reconnecting {
632
+ switch c .connectionStatus () {
633
+ case connecting :
634
+ DEBUG .Println (CLI , "storing publish message (connecting), topic:" , topic )
635
+ case reconnecting :
599
636
DEBUG .Println (CLI , "storing publish message (reconnecting), topic:" , topic )
600
- } else {
637
+ default :
601
638
DEBUG .Println (CLI , "sending publish message, topic:" , topic )
602
- c .obound <- & PacketAndToken {p : pub , t : token }
639
+ publishWaitTimeout := c .options .WriteTimeout
640
+ if publishWaitTimeout == 0 {
641
+ publishWaitTimeout = time .Second * 30
642
+ }
643
+ select {
644
+ case c .obound <- & PacketAndToken {p : pub , t : token }:
645
+ case <- time .After (publishWaitTimeout ):
646
+ token .setError (errors .New ("publish was broken by timeout" ))
647
+ }
603
648
}
604
649
return token
605
650
}
@@ -664,27 +709,58 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
664
709
return token
665
710
}
666
711
712
+ // reserveStoredPublishIDs reserves the ids for publish packets in the persistant store to ensure these are not duplicated
713
+ func (c * client ) reserveStoredPublishIDs () {
714
+ // The resume function sets the stored id for publish packets only (some other packets
715
+ // will get new ids in net code). This means that the only keys we need to ensure are
716
+ // unique are the publish ones (and these will completed/replaced in resume() )
717
+ if c .options .CleanSession == false {
718
+ storedKeys := c .persist .All ()
719
+ for _ , key := range storedKeys {
720
+ packet := c .persist .Get (key )
721
+ if packet == nil {
722
+ continue
723
+ }
724
+ switch packet .(type ) {
725
+ case * packets.PublishPacket :
726
+ details := packet .Details ()
727
+ token := & PlaceHolderToken {id : details .MessageID }
728
+ c .claimID (token , details .MessageID )
729
+ }
730
+ }
731
+ }
732
+ }
733
+
667
734
// Load all stored messages and resend them
668
735
// Call this to ensure QOS > 1,2 even after an application crash
669
736
func (c * client ) resume (subscription bool ) {
670
737
671
738
storedKeys := c .persist .All ()
672
739
for _ , key := range storedKeys {
673
740
packet := c .persist .Get (key )
741
+ if packet == nil {
742
+ continue
743
+ }
674
744
details := packet .Details ()
675
745
if isKeyOutbound (key ) {
676
746
switch packet .(type ) {
677
747
case * packets.SubscribePacket :
678
748
if subscription {
679
749
DEBUG .Println (STR , fmt .Sprintf ("loaded pending subscribe (%d)" , details .MessageID ))
680
750
token := newToken (packets .Subscribe ).(* SubscribeToken )
681
- c .oboundP <- & PacketAndToken {p : packet , t : token }
751
+ select {
752
+ case c .oboundP <- & PacketAndToken {p : packet , t : token }:
753
+ case <- c .stop :
754
+ }
682
755
}
683
756
case * packets.UnsubscribePacket :
684
757
if subscription {
685
758
DEBUG .Println (STR , fmt .Sprintf ("loaded pending unsubscribe (%d)" , details .MessageID ))
686
759
token := newToken (packets .Unsubscribe ).(* UnsubscribeToken )
687
- c .oboundP <- & PacketAndToken {p : packet , t : token }
760
+ select {
761
+ case c .oboundP <- & PacketAndToken {p : packet , t : token }:
762
+ case <- c .stop :
763
+ }
688
764
}
689
765
case * packets.PubrelPacket :
690
766
DEBUG .Println (STR , fmt .Sprintf ("loaded pending pubrel (%d)" , details .MessageID ))
@@ -698,7 +774,10 @@ func (c *client) resume(subscription bool) {
698
774
c .claimID (token , details .MessageID )
699
775
DEBUG .Println (STR , fmt .Sprintf ("loaded pending publish (%d)" , details .MessageID ))
700
776
DEBUG .Println (STR , details )
701
- c .obound <- & PacketAndToken {p : packet , t : token }
777
+ select {
778
+ case c .obound <- & PacketAndToken {p : packet , t : token }:
779
+ case <- c .stop :
780
+ }
702
781
default :
703
782
ERROR .Println (STR , "invalid message type in store (discarded)" )
704
783
c .persist .Del (key )
0 commit comments