Skip to content

Commit 660697f

Browse files
author
Al S-M
authored
Merge branch 'master' into master
2 parents b95da82 + 08f8223 commit 660697f

8 files changed

+47
-29
lines changed

client.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package mqtt
1919

2020
import (
21+
"bytes"
2122
"errors"
2223
"fmt"
2324
"net"
@@ -140,9 +141,7 @@ func NewClient(o *ClientOptions) Client {
140141
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
141142
c.msgRouter, c.stopRouter = newRouter()
142143
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
143-
if !c.options.AutoReconnect {
144-
c.options.MessageChannelDepth = 0
145-
}
144+
146145
return c
147146
}
148147

@@ -224,8 +223,8 @@ func (c *client) Connect() Token {
224223
return t
225224
}
226225

227-
c.obound = make(chan *PacketAndToken, c.options.MessageChannelDepth)
228-
c.oboundP = make(chan *PacketAndToken, c.options.MessageChannelDepth)
226+
c.obound = make(chan *PacketAndToken)
227+
c.oboundP = make(chan *PacketAndToken)
229228
c.ibound = make(chan packets.ControlPacket)
230229

231230
c.persist.Open()
@@ -337,7 +336,7 @@ func (c *client) Connect() Token {
337336
go keepalive(c)
338337
}
339338

340-
c.incomingPubChan = make(chan *packets.PublishPacket, c.options.MessageChannelDepth)
339+
c.incomingPubChan = make(chan *packets.PublishPacket)
341340
c.msgRouter.matchAndDispatch(c.incomingPubChan, c.options.Order, c)
342341

343342
c.setConnected(connected)
@@ -353,7 +352,7 @@ func (c *client) Connect() Token {
353352
go incoming(c)
354353

355354
// Take care of any messages in the store
356-
if c.options.CleanSession == false {
355+
if !c.options.CleanSession {
357356
c.resume(c.options.ResumeSubs)
358357
} else {
359358
c.persist.Reset()
@@ -611,11 +610,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
611610
pub.Qos = qos
612611
pub.TopicName = topic
613612
pub.Retain = retained
614-
switch payload.(type) {
613+
switch p := payload.(type) {
615614
case string:
616-
pub.Payload = []byte(payload.(string))
615+
pub.Payload = []byte(p)
617616
case []byte:
618-
pub.Payload = payload.([]byte)
617+
pub.Payload = p
618+
case bytes.Buffer:
619+
pub.Payload = p.Bytes()
619620
default:
620621
token.setError(fmt.Errorf("Unknown payload type"))
621622
return token
@@ -633,9 +634,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
633634
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
634635
default:
635636
DEBUG.Println(CLI, "sending publish message, topic:", topic)
637+
publishWaitTimeout := c.options.WriteTimeout
638+
if publishWaitTimeout == 0 {
639+
publishWaitTimeout = time.Second * 30
640+
}
636641
select {
637642
case c.obound <- &PacketAndToken{p: pub, t: token}:
638-
case <-time.After(c.options.WriteTimeout):
643+
case <-time.After(publishWaitTimeout):
639644
token.setError(errors.New("publish was broken by timeout"))
640645
}
641646
}

cmd/ssl/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func main() {
113113
c.Subscribe("/go-mqtt/sample", 0, nil)
114114

115115
i := 0
116-
for _ = range time.Tick(time.Duration(1) * time.Second) {
116+
for range time.Tick(time.Duration(1) * time.Second) {
117117
if i == 5 {
118118
break
119119
}

filestore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func (store *FileStore) all() []string {
166166
for _, f := range files {
167167
DEBUG.Println(STR, "file in All():", f.Name())
168168
name := f.Name()
169-
if name[len(name)-4:len(name)] != msgExt {
169+
if name[len(name)-4:] != msgExt {
170170
DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name)
171171
continue
172172
}

fvt_client_test.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,24 @@ func Test_Publish_3(t *testing.T) {
144144
c.Disconnect(250)
145145
}
146146

147+
func Test_Publish_BytesBuffer(t *testing.T) {
148+
ops := NewClientOptions()
149+
ops.AddBroker(FVTTCP)
150+
ops.SetClientID("Publish_BytesBuffer")
151+
152+
c := NewClient(ops)
153+
token := c.Connect()
154+
if token.Wait() && token.Error() != nil {
155+
t.Fatalf("Error on Client.Connect(): %v", token.Error())
156+
}
157+
158+
payload := bytes.NewBufferString("Publish qos0")
159+
160+
c.Publish("test/Publish", 0, false, payload)
161+
162+
c.Disconnect(250)
163+
}
164+
147165
func Test_Subscribe(t *testing.T) {
148166
pops := NewClientOptions()
149167
pops.AddBroker(FVTTCP)
@@ -1028,7 +1046,7 @@ func Test_cleanUpMids(t *testing.T) {
10281046
c.(*client).messageIds.Unlock()
10291047
c.(*client).internalConnLost(fmt.Errorf("cleanup test"))
10301048

1031-
time.Sleep(5 * time.Second)
1049+
time.Sleep(1 * time.Second)
10321050
if !c.IsConnected() {
10331051
t.Fail()
10341052
}

options.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* Seth Hoenig
1111
* Allan Stockdill-Mander
1212
* Mike Robertson
13+
* Måns Ansgariusson
1314
*/
1415

1516
// Portions copyright © 2018 TIBCO Software Inc.
@@ -20,6 +21,7 @@ import (
2021
"crypto/tls"
2122
"net/http"
2223
"net/url"
24+
"regexp"
2325
"strings"
2426
"time"
2527
)
@@ -113,7 +115,6 @@ func NewClientOptions() *ClientOptions {
113115
OnConnect: nil,
114116
OnConnectionLost: DefaultConnectionLostHandler,
115117
WriteTimeout: 0, // 0 represents timeout disabled
116-
MessageChannelDepth: 100,
117118
ResumeSubs: false,
118119
HTTPHeaders: make(map[string][]string),
119120
}
@@ -129,12 +130,14 @@ func NewClientOptions() *ClientOptions {
129130
//
130131
// An example broker URI would look like: tcp://foobar.com:1883
131132
func (o *ClientOptions) AddBroker(server string) *ClientOptions {
133+
re := regexp.MustCompile(`%(25)?`)
132134
if len(server) > 0 && server[0] == ':' {
133135
server = "127.0.0.1" + server
134136
}
135137
if !strings.Contains(server, "://") {
136138
server = "tcp://" + server
137139
}
140+
server = re.ReplaceAllLiteralString(server, "%25")
138141
brokerURI, err := url.Parse(server)
139142
if err != nil {
140143
ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
@@ -344,10 +347,8 @@ func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
344347
return o
345348
}
346349

347-
// SetMessageChannelDepth sets the size of the internal queue that holds messages while the
348-
// client is temporairily offline, allowing the application to publish when the client is
349-
// reconnecting. This setting is only valid if AutoReconnect is set to true, it is otherwise
350-
// ignored.
350+
// SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
351+
// remains so the API is not altered.
351352
func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
352353
o.MessageChannelDepth = s
353354
return o

router.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,11 @@ type route struct {
3737
// and returns a boolean of the outcome
3838
func match(route []string, topic []string) bool {
3939
if len(route) == 0 {
40-
if len(topic) == 0 {
41-
return true
42-
}
43-
return false
40+
return len(topic) == 0
4441
}
4542

4643
if len(topic) == 0 {
47-
if route[0] == "#" {
48-
return true
49-
}
50-
return false
44+
return route[0] == "#"
5145
}
5246

5347
if route[0] == "#" {

topic.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func validateTopicAndQos(topic string, qos byte) error {
7575
}
7676
}
7777

78-
if qos < 0 || qos > 2 {
78+
if qos > 2 {
7979
return ErrInvalidQos
8080
}
8181
return nil

websocket.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestH
2727
ws, _, err := dialer.Dial(host, requestHeader)
2828

2929
if err != nil {
30-
panic(err)
30+
return nil, err
3131
}
3232

3333
wrapper := &websocketConnector{

0 commit comments

Comments
 (0)