Skip to content

Commit 925d8a9

Browse files
author
Al S-M
committed
Rather than having SetBroker and SetStandbyBroker for HA/clustered setups there is instead just AddBroker. The client now stores a slice of URLs and will iterate through all of them when trying to connect.
1 parent 55c6851 commit 925d8a9

5 files changed

+80
-86
lines changed

client.go

+7-20
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package mqtt
1818
import (
1919
"bufio"
2020
"errors"
21-
"math/rand"
2221
"net"
2322
"sync"
2423
"time"
@@ -54,7 +53,6 @@ type MqttClient struct {
5453
stop chan struct{}
5554
receipts *receiptMap
5655
t *Tracer
57-
sessId uint
5856
persist Store
5957
options ClientOptions
6058
lastContact lastcontact
@@ -68,8 +66,6 @@ type MqttClient struct {
6866
// connection) are created before the application is actually ready.
6967
func NewClient(ops *ClientOptions) *MqttClient {
7068
c := &MqttClient{}
71-
c.sessId = uint(rand.Int())
72-
c.sessId = 0
7369
c.options = *ops
7470

7571
if c.options.store == nil {
@@ -104,24 +100,15 @@ func (c *MqttClient) Start() ([]Receipt, error) {
104100

105101
c.trace_v(CLI, "Start()")
106102

107-
c1, err1 := openConnection(c.options.server, c.options.tlsconfig)
108-
if err1 != nil {
109-
c.trace_w(CLI, "failed to connect to primary broker")
110-
if c.options.server2 != nil {
111-
c2, err2 := openConnection(c.options.server2, c.options.tlsconfig)
112-
if err2 != nil {
113-
c.trace_w(CLI, "failed to connect to standby broker")
114-
return nil, err1
115-
}
116-
c.conn = c2
117-
c.trace_v(CLI, "connected to standby broker")
103+
for _, broker := range c.options.servers {
104+
conn, err := openConnection(broker, c.options.tlsconfig)
105+
if err == nil {
106+
c.conn = conn
107+
c.trace_v(CLI, "connected to broker")
108+
break
118109
} else {
119-
c.trace_w(CLI, "standby broker is not configured")
120-
return nil, err1
110+
c.trace_w(CLI, "failed to connect to broker, trying next")
121111
}
122-
} else {
123-
c.conn = c1
124-
c.trace_v(CLI, "connected to primary broker")
125112
}
126113

127114
if c.conn == nil {

fvt_client_test.go

+57-39
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import "testing"
2525

2626
func Test_Start(t *testing.T) {
2727
ops := NewClientOptions().SetClientId("Start").
28-
SetBroker(FVT_TCP).
28+
AddBroker(FVT_TCP).
2929
SetStore(NewFileStore("/tmp/fvt/Start"))
3030
c := NewClient(ops)
3131

@@ -40,7 +40,7 @@ func Test_Start(t *testing.T) {
4040
/* uncomment this if you have connection policy disallowing FailClientID
4141
func Test_InvalidConnRc(t *testing.T) {
4242
ops := NewClientOptions().SetClientId("FailClientID").
43-
SetBroker("tcp://" + FVT_IP + ":17003").
43+
AddBroker("tcp://" + FVT_IP + ":17003").
4444
SetStore(NewFileStore("/tmp/fvt/InvalidConnRc"))
4545
4646
c := NewClient(ops)
@@ -78,7 +78,7 @@ func NewTlsConfig() *tls.Config {
7878
func Test_Start_Ssl(t *testing.T) {
7979
tlsconfig := NewTlsConfig()
8080
ops := NewClientOptions().SetClientId("StartSsl").
81-
SetBroker(FVT_SSL).
81+
AddBroker(FVT_SSL).
8282
SetStore(NewFileStore("/tmp/fvt/Start_Ssl")).
8383
SetTlsConfig(tlsconfig)
8484
@@ -95,7 +95,7 @@ func Test_Start_Ssl(t *testing.T) {
9595

9696
func Test_Publish_1(t *testing.T) {
9797
ops := NewClientOptions()
98-
ops.SetBroker(FVT_TCP)
98+
ops.AddBroker(FVT_TCP)
9999
ops.SetClientId("Publish_1")
100100
ops.SetStore(NewFileStore("/tmp/fvt/Publish_1"))
101101

@@ -112,7 +112,7 @@ func Test_Publish_1(t *testing.T) {
112112

113113
func Test_Publish_2(t *testing.T) {
114114
ops := NewClientOptions()
115-
ops.SetBroker(FVT_TCP)
115+
ops.AddBroker(FVT_TCP)
116116
ops.SetClientId("Publish_2")
117117
ops.SetStore(NewFileStore("/tmp/fvt/Publish_2"))
118118

@@ -130,7 +130,7 @@ func Test_Publish_2(t *testing.T) {
130130

131131
func Test_Publish_3(t *testing.T) {
132132
ops := NewClientOptions()
133-
ops.SetBroker(FVT_TCP)
133+
ops.AddBroker(FVT_TCP)
134134
ops.SetClientId("Publish_3")
135135
ops.SetStore(NewFileStore("/tmp/fvt/Publish_3"))
136136

@@ -149,13 +149,13 @@ func Test_Publish_3(t *testing.T) {
149149

150150
func Test_Subscribe(t *testing.T) {
151151
pops := NewClientOptions()
152-
pops.SetBroker(FVT_TCP)
152+
pops.AddBroker(FVT_TCP)
153153
pops.SetClientId("Subscribe_tx")
154154
pops.SetStore(NewFileStore("/tmp/fvt/Subscribe/p"))
155155
p := NewClient(pops)
156156

157157
sops := NewClientOptions()
158-
sops.SetBroker(FVT_TCP)
158+
sops.AddBroker(FVT_TCP)
159159
sops.SetClientId("Subscribe_rx")
160160
sops.SetStore(NewFileStore("/tmp/fvt/Subscribe/s"))
161161
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -187,7 +187,7 @@ func Test_Subscribe(t *testing.T) {
187187
func Test_Will(t *testing.T) {
188188
willmsgc := make(chan string)
189189

190-
sops := NewClientOptions().SetBroker(FVT_TCP)
190+
sops := NewClientOptions().AddBroker(FVT_TCP)
191191
sops.SetClientId("will-giver")
192192
sops.SetWill("/wills", "good-byte!", QOS_ZERO, false)
193193
sops.SetOnConnectionLost(func(client *MqttClient, err error) {
@@ -196,7 +196,7 @@ func Test_Will(t *testing.T) {
196196
c := NewClient(sops)
197197

198198
wops := NewClientOptions()
199-
wops.SetBroker(FVT_TCP)
199+
wops.AddBroker(FVT_TCP)
200200
wops.SetClientId("will-subscriber")
201201
wops.SetStore(NewFileStore("/tmp/fvt/Will"))
202202
wops.SetDefaultPublishHandler(func(client *MqttClient, msg Message) {
@@ -238,14 +238,14 @@ func Test_Binary_Will(t *testing.T) {
238238
0xEF,
239239
}
240240

241-
sops := NewClientOptions().SetBroker(FVT_TCP)
241+
sops := NewClientOptions().AddBroker(FVT_TCP)
242242
sops.SetClientId("will-giver")
243243
sops.SetBinaryWill("/wills", will, QOS_ZERO, false)
244244
sops.SetOnConnectionLost(func(client *MqttClient, err error) {
245245
})
246246
c := NewClient(sops)
247247

248-
wops := NewClientOptions().SetBroker(FVT_TCP)
248+
wops := NewClientOptions().AddBroker(FVT_TCP)
249249
wops.SetClientId("will-subscriber")
250250
wops.SetStore(NewFileStore("/tmp/fvt/Binary_Will"))
251251
wops.SetDefaultPublishHandler(func(client *MqttClient, msg Message) {
@@ -302,13 +302,13 @@ func Test_p0s0(t *testing.T) {
302302
choke := make(chan bool)
303303

304304
pops := NewClientOptions()
305-
pops.SetBroker(FVT_TCP)
305+
pops.AddBroker(FVT_TCP)
306306
pops.SetClientId("p0s0-pub")
307307
pops.SetStore(NewFileStore(store + "/p"))
308308
p := NewClient(pops)
309309

310310
sops := NewClientOptions()
311-
sops.SetBroker(FVT_TCP)
311+
sops.AddBroker(FVT_TCP)
312312
sops.SetClientId("p0s0-sub")
313313
sops.SetStore(NewFileStore(store + "/s"))
314314
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -362,13 +362,13 @@ func Test_p0s1(t *testing.T) {
362362
choke := make(chan bool)
363363

364364
pops := NewClientOptions()
365-
pops.SetBroker(FVT_TCP)
365+
pops.AddBroker(FVT_TCP)
366366
pops.SetClientId("p0s1-pub")
367367
pops.SetStore(NewFileStore(store + "/p"))
368368
p := NewClient(pops)
369369

370370
sops := NewClientOptions()
371-
sops.SetBroker(FVT_TCP)
371+
sops.AddBroker(FVT_TCP)
372372
sops.SetClientId("p0s1-sub")
373373
sops.SetStore(NewFileStore(store + "/s"))
374374
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -421,13 +421,13 @@ func Test_p0s2(t *testing.T) {
421421
choke := make(chan bool)
422422

423423
pops := NewClientOptions()
424-
pops.SetBroker(FVT_TCP)
424+
pops.AddBroker(FVT_TCP)
425425
pops.SetClientId("p0s2-pub")
426426
pops.SetStore(NewFileStore(store + "/p"))
427427
p := NewClient(pops)
428428

429429
sops := NewClientOptions()
430-
sops.SetBroker(FVT_TCP)
430+
sops.AddBroker(FVT_TCP)
431431
sops.SetClientId("p0s2-sub")
432432
sops.SetStore(NewFileStore(store + "/s"))
433433
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -480,13 +480,13 @@ func Test_p1s0(t *testing.T) {
480480
choke := make(chan bool)
481481

482482
pops := NewClientOptions()
483-
pops.SetBroker(FVT_TCP)
483+
pops.AddBroker(FVT_TCP)
484484
pops.SetClientId("p1s0-pub")
485485
pops.SetStore(NewFileStore(store + "/p"))
486486
p := NewClient(pops)
487487

488488
sops := NewClientOptions()
489-
sops.SetBroker(FVT_TCP)
489+
sops.AddBroker(FVT_TCP)
490490
sops.SetClientId("p1s0-sub")
491491
sops.SetStore(NewFileStore(store + "/s"))
492492
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -539,13 +539,13 @@ func Test_p1s1(t *testing.T) {
539539
choke := make(chan bool)
540540

541541
pops := NewClientOptions()
542-
pops.SetBroker(FVT_TCP)
542+
pops.AddBroker(FVT_TCP)
543543
pops.SetClientId("p1s1-pub")
544544
pops.SetStore(NewFileStore(store + "/p"))
545545
p := NewClient(pops)
546546

547547
sops := NewClientOptions()
548-
sops.SetBroker(FVT_TCP)
548+
sops.AddBroker(FVT_TCP)
549549
sops.SetClientId("p1s1-sub")
550550
sops.SetStore(NewFileStore(store + "/s"))
551551
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -598,13 +598,13 @@ func Test_p1s2(t *testing.T) {
598598
choke := make(chan bool)
599599

600600
pops := NewClientOptions()
601-
pops.SetBroker(FVT_TCP)
601+
pops.AddBroker(FVT_TCP)
602602
pops.SetClientId("p1s2-pub")
603603
pops.SetStore(NewFileStore(store + "/p"))
604604
p := NewClient(pops)
605605

606606
sops := NewClientOptions()
607-
sops.SetBroker(FVT_TCP)
607+
sops.AddBroker(FVT_TCP)
608608
sops.SetClientId("p1s2-sub")
609609
sops.SetStore(NewFileStore(store + "/s"))
610610
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -656,13 +656,13 @@ func Test_p2s0(t *testing.T) {
656656
choke := make(chan bool)
657657

658658
pops := NewClientOptions()
659-
pops.SetBroker(FVT_TCP)
659+
pops.AddBroker(FVT_TCP)
660660
pops.SetClientId("p2s0-pub")
661661
pops.SetStore(NewFileStore(store + "/p"))
662662
p := NewClient(pops)
663663

664664
sops := NewClientOptions()
665-
sops.SetBroker(FVT_TCP)
665+
sops.AddBroker(FVT_TCP)
666666
sops.SetClientId("p2s0-sub")
667667
sops.SetStore(NewFileStore(store + "/s"))
668668
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -712,13 +712,13 @@ func Test_p2s1(t *testing.T) {
712712
choke := make(chan bool)
713713

714714
pops := NewClientOptions()
715-
pops.SetBroker(FVT_TCP)
715+
pops.AddBroker(FVT_TCP)
716716
pops.SetClientId("p2s1-pub")
717717
pops.SetStore(NewFileStore(store + "/p"))
718718
p := NewClient(pops)
719719

720720
sops := NewClientOptions()
721-
sops.SetBroker(FVT_TCP)
721+
sops.AddBroker(FVT_TCP)
722722
sops.SetClientId("p2s1-sub")
723723
sops.SetStore(NewFileStore(store + "/s"))
724724
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -771,13 +771,13 @@ func Test_p2s2(t *testing.T) {
771771
choke := make(chan bool)
772772

773773
pops := NewClientOptions()
774-
pops.SetBroker(FVT_TCP)
774+
pops.AddBroker(FVT_TCP)
775775
pops.SetClientId("p2s2-pub")
776776
pops.SetStore(NewFileStore(store + "/p"))
777777
p := NewClient(pops)
778778

779779
sops := NewClientOptions()
780-
sops.SetBroker(FVT_TCP)
780+
sops.AddBroker(FVT_TCP)
781781
sops.SetClientId("p2s2-sub")
782782
sops.SetStore(NewFileStore(store + "/s"))
783783
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -828,13 +828,13 @@ func Test_PublishMessage(t *testing.T) {
828828
choke := make(chan bool)
829829

830830
pops := NewClientOptions()
831-
pops.SetBroker(FVT_TCP)
831+
pops.AddBroker(FVT_TCP)
832832
pops.SetClientId("pubmsg-pub")
833833
pops.SetStore(NewFileStore(store + "/p"))
834834
p := NewClient(pops)
835835

836836
sops := NewClientOptions()
837-
sops.SetBroker(FVT_TCP)
837+
sops.AddBroker(FVT_TCP)
838838
sops.SetClientId("pubmsg-sub")
839839
sops.SetStore(NewFileStore(store + "/s"))
840840
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -888,13 +888,13 @@ func Test_PublishEmptyMessage(t *testing.T) {
888888
choke := make(chan bool)
889889

890890
pops := NewClientOptions()
891-
pops.SetBroker(FVT_TCP)
891+
pops.AddBroker(FVT_TCP)
892892
pops.SetClientId("pubmsgempty-pub")
893893
pops.SetStore(NewFileStore(store + "/p"))
894894
p := NewClient(pops)
895895

896896
sops := NewClientOptions()
897-
sops.SetBroker(FVT_TCP)
897+
sops.AddBroker(FVT_TCP)
898898
sops.SetClientId("pubmsgempty-sub")
899899
sops.SetStore(NewFileStore(store + "/s"))
900900
var f MessageHandler = func(client *MqttClient, msg Message) {
@@ -942,14 +942,14 @@ func Test_Cleanstore(t *testing.T) {
942942
topic := "/test/cleanstore"
943943

944944
pops := NewClientOptions()
945-
pops.SetBroker(FVT_TCP)
945+
pops.AddBroker(FVT_TCP)
946946
pops.SetClientId("cleanstore-pub")
947947
pops.SetStore(NewFileStore(store + "/p"))
948948
p := NewClient(pops)
949949

950950
var s *MqttClient
951951
sops := NewClientOptions()
952-
sops.SetBroker(FVT_TCP)
952+
sops.AddBroker(FVT_TCP)
953953
sops.SetClientId("cleanstore-sub")
954954
sops.SetCleanSession(false)
955955
sops.SetStore(NewFileStore(store + "/s"))
@@ -994,7 +994,7 @@ func Test_Cleanstore(t *testing.T) {
994994
p.Disconnect(250)
995995

996996
sops = NewClientOptions()
997-
sops.SetBroker(FVT_TCP)
997+
sops.AddBroker(FVT_TCP)
998998
sops.SetClientId("cleanstore-sub")
999999
sops.SetCleanSession(true)
10001000
sops.SetStore(NewFileStore(store + "/s"))
@@ -1010,13 +1010,31 @@ func Test_Cleanstore(t *testing.T) {
10101010
// how to check?
10111011
}
10121012

1013+
func Test_MultipleURLs(t *testing.T) {
1014+
ops := NewClientOptions()
1015+
ops.AddBroker("tcp://127.0.0.1:10000")
1016+
ops.AddBroker(FVT_TCP)
1017+
ops.SetClientId("MutliURL")
1018+
ops.SetStore(NewFileStore("/tmp/fvt/MultiURL"))
1019+
1020+
c := NewClient(ops)
1021+
_, err := c.Start()
1022+
if err != nil {
1023+
t.Fatalf("Error on MqttClient.Start(): %v", err)
1024+
}
1025+
1026+
c.Publish(QOS_ZERO, "/test/MultiURL", []byte("Publish qo0"))
1027+
1028+
c.Disconnect(250)
1029+
}
1030+
10131031
/*
10141032
// A test to make sure ping mechanism is working
10151033
// This test can be left commented out because it's annoying to wait for
10161034
func Test_ping3_idle10(t *testing.T) {
10171035
ops := NewClientOptions()
1018-
ops.SetBroker(FVT_TCP)
1019-
//ops.SetBroker("tcp://test.mosquitto.org:1883")
1036+
ops.AddBroker(FVT_TCP)
1037+
//ops.AddBroker("tcp://test.mosquitto.org:1883")
10201038
ops.SetClientId("p3i10")
10211039
ops.SetTimeout(4)
10221040

0 commit comments

Comments
 (0)