Skip to content

Commit 0d6c6e7

Browse files
author
cdeiters
committed
Bugfix: c.stop closed twice in some startup conditions.
Sometimes net.connect sees an issue and pushes an error on c.errors before returning an rc of CONN_FAILURE. It is also possible to exit net.connect with a non-successful rc but NOT get an error on c.errors. So I updated net.connect to not push errors on c.errors, but instead just return CONN_FAILURE and allow client.Start to put the error on the channel. In addition, I made c.errors a buffer size 1 and updated all pushes to that channel to be a select with an empty default. This allows us to use c.errors as a flag channel so that the first error can always get through, but subsequent attempts to push errors will not block forever. Also, the begin chan ConnRC in MqttClient was no longer being initialized so I removed it. I also removed the call to close it in net.connect, since closing a nil channel was causing a crash. Change-Id: I0988ad51e7740b03fff832084ea65dcbce173884 Signed-off-by: Christie Deiters <cdeiters@lutron.com>
1 parent 4572889 commit 0d6c6e7

File tree

2 files changed

+38
-22
lines changed

2 files changed

+38
-22
lines changed

client.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ type MqttClient struct {
4848
ibound chan *Message
4949
obound chan sendable
5050
oboundP chan *Message
51-
begin chan ConnRC
5251
errors chan error
5352
stop chan struct{}
5453
receipts *receiptMap
@@ -116,7 +115,7 @@ func (c *MqttClient) Start() ([]Receipt, error) {
116115
c.obound = make(chan sendable)
117116
c.ibound = make(chan *Message)
118117
c.oboundP = make(chan *Message)
119-
c.errors = make(chan error)
118+
c.errors = make(chan error, 1) // all we need is one error to trigger alllogic to clean up
120119
c.stop = make(chan struct{})
121120

122121
go outgoing(c)
@@ -127,11 +126,14 @@ func (c *MqttClient) Start() ([]Receipt, error) {
127126
c.oboundP <- cm
128127

129128
rc := connect(c)
130-
if rc != CONN_ACCEPTED {
129+
if chkrc(rc) != nil {
131130
CRITICAL.Println(CLI, "CONNACK was not CONN_ACCEPTED, but rather", rc2str(rc))
132-
// Stop all go routines except outgoing
133-
close(c.stop)
134-
c.conn.Close()
131+
err := errors.New("CONNACK was not CONN_ACCEPTED, but rather " + rc2str(rc))
132+
select {
133+
case c.errors <- err:
134+
default:
135+
// c.errors is a buffer of one, so there must already be an error closing this connection.
136+
}
135137
return nil, chkrc(rc)
136138
}
137139

@@ -157,12 +159,7 @@ func (c *MqttClient) Start() ([]Receipt, error) {
157159
go incoming(c)
158160

159161
DEBUG.Println(CLI, "exit startMqttClient")
160-
if chkrc(rc) != nil {
161-
// Cleanup before returning.
162-
close(c.stop)
163-
c.conn.Close()
164-
}
165-
return leftovers, chkrc(rc)
162+
return leftovers, nil
166163
}
167164

168165
// Disconnect will end the connection with the server, but not before waiting

net.go

+29-10
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,22 @@ func connect(c *MqttClient) (rc ConnRC) {
5656
_, err := io.ReadFull(c.bufferedConn, ca)
5757
if err != nil {
5858
ERROR.Println(NET, "connect got error")
59-
c.errors <- err
59+
select {
60+
case c.errors <- err:
61+
default:
62+
// c.errors is a buffer of one, so there must already be an error closing this connection.
63+
}
6064
return
6165
}
6266
msg := decode(ca)
6367

6468
if msg == nil || msg.msgType() != CONNACK {
65-
close(c.begin)
6669
ERROR.Println(NET, "received msg that was nil or not CONNACK")
67-
} else {
68-
DEBUG.Println(NET, "received connack")
69-
rc = msg.connRC()
70+
return
7071
}
71-
return
72+
73+
DEBUG.Println(NET, "received connack")
74+
return msg.connRC()
7275
}
7376

7477
// actually read incoming messages off the wire
@@ -121,7 +124,11 @@ func incoming(c *MqttClient) {
121124
// Not trying to disconnect, send the error to the errors channel
122125
default:
123126
ERROR.Println(NET, "incoming stopped with error")
124-
c.errors <- err
127+
select {
128+
case c.errors <- err:
129+
default:
130+
// c.errors is a buffer of one, so there must already be an error closing this connection.
131+
}
125132
return
126133
}
127134
}
@@ -154,7 +161,11 @@ func outgoing(c *MqttClient) {
154161

155162
if _, err := c.conn.Write(msg.Bytes()); err != nil {
156163
ERROR.Println(NET, "outgoing stopped with error")
157-
c.errors <- err
164+
select {
165+
case c.errors <- err:
166+
default:
167+
// c.errors is a buffer of one, so there must already be an error closing this connection.
168+
}
158169
return
159170
}
160171

@@ -177,7 +188,11 @@ func outgoing(c *MqttClient) {
177188
_, err := c.conn.Write(msg.Bytes())
178189
if err != nil {
179190
ERROR.Println(NET, "outgoing stopped with error")
180-
c.errors <- err
191+
select {
192+
case c.errors <- err:
193+
default:
194+
// c.errors is a buffer of one, so there must already be an error closing this connection.
195+
}
181196
return
182197
}
183198
c.lastContact.update()
@@ -249,7 +264,11 @@ func alllogic(c *MqttClient) {
249264
// select can handle it appropriately.
250265
if ok {
251266
go func(errVal error, errChan chan error) {
252-
errChan <- errVal
267+
select {
268+
case errChan <- errVal:
269+
default:
270+
// c.errors is a buffer of one, so there must already be an error closing this connection.
271+
}
253272
}(err, c.errors)
254273
}
255274
}

0 commit comments

Comments
 (0)