Skip to content

Commit ad6f78e

Browse files
committed
Try and remove blocking operations that can deadlock and data races
eclipse-paho#122
1 parent 5255135 commit ad6f78e

6 files changed

+118
-82
lines changed

client.go

+20-18
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ import (
2020
"fmt"
2121
"net"
2222
"sync"
23+
"sync/atomic"
2324
"time"
2425

2526
"github.com/eclipse/paho.mqtt.golang/packets"
2627
)
2728

28-
type connStatus uint
29-
3029
const (
31-
disconnected connStatus = iota
30+
disconnected uint32 = iota
3231
connecting
3332
reconnecting
3433
connected
@@ -78,10 +77,10 @@ type client struct {
7877
stop chan struct{}
7978
persist Store
8079
options ClientOptions
81-
pingResp chan struct{}
82-
packetResp chan struct{}
83-
keepaliveReset chan struct{}
84-
status connStatus
80+
pingResp *sync.Cond
81+
packetResp *sync.Cond
82+
keepaliveReset *sync.Cond
83+
status uint32
8584
workers sync.WaitGroup
8685
}
8786

@@ -125,26 +124,28 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {
125124
func (c *client) IsConnected() bool {
126125
c.RLock()
127126
defer c.RUnlock()
127+
status := atomic.LoadUint32(&c.status)
128128
switch {
129-
case c.status == connected:
129+
case status == connected:
130130
return true
131-
case c.options.AutoReconnect && c.status > disconnected:
131+
case c.options.AutoReconnect && status > disconnected:
132132
return true
133133
default:
134134
return false
135135
}
136136
}
137137

138-
func (c *client) connectionStatus() connStatus {
138+
func (c *client) connectionStatus() uint32 {
139139
c.RLock()
140140
defer c.RUnlock()
141-
return c.status
141+
status := atomic.LoadUint32(&c.status)
142+
return status
142143
}
143144

144-
func (c *client) setConnected(status connStatus) {
145+
func (c *client) setConnected(status uint32) {
145146
c.Lock()
146147
defer c.Unlock()
147-
c.status = status
148+
atomic.StoreUint32(&c.status, uint32(status))
148149
}
149150

150151
//ErrNotConnected is the error returned from function calls that are
@@ -236,9 +237,9 @@ func (c *client) Connect() Token {
236237
c.ibound = make(chan packets.ControlPacket)
237238
c.errors = make(chan error, 1)
238239
c.stop = make(chan struct{})
239-
c.pingResp = make(chan struct{}, 1)
240-
c.packetResp = make(chan struct{}, 1)
241-
c.keepaliveReset = make(chan struct{}, 1)
240+
c.pingResp = sync.NewCond(&sync.Mutex{})
241+
c.packetResp = sync.NewCond(&sync.Mutex{})
242+
c.keepaliveReset = sync.NewCond(&sync.Mutex{})
242243

243244
if c.options.KeepAlive != 0 {
244245
c.workers.Add(1)
@@ -336,7 +337,7 @@ func (c *client) reconnect() {
336337
}
337338
}
338339
// Disconnect() must have been called while we were trying to reconnect.
339-
if c.status == disconnected {
340+
if c.connectionStatus() == disconnected {
340341
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
341342
return
342343
}
@@ -393,7 +394,8 @@ func (c *client) connect() byte {
393394
// the specified number of milliseconds to wait for existing work to be
394395
// completed.
395396
func (c *client) Disconnect(quiesce uint) {
396-
if c.status == connected {
397+
status := atomic.LoadUint32(&c.status)
398+
if status == connected {
397399
DEBUG.Println(CLI, "disconnecting")
398400
c.setConnected(disconnected)
399401

fvt_client_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,7 @@ func Test_PublishEmptyMessage(t *testing.T) {
866866
wait(choke)
867867

868868
p.Disconnect(250)
869+
s.Disconnect(250)
869870
}
870871

871872
// func Test_Cleanstore(t *testing.T) {
@@ -962,14 +963,14 @@ func Test_ping1_idle5(t *testing.T) {
962963
ops.SetConnectionLostHandler(func(c Client, err error) {
963964
t.Fatalf("Connection-lost handler was called: %s", err)
964965
})
965-
ops.SetKeepAlive(2 * time.Second)
966+
ops.SetKeepAlive(3 * time.Second)
966967

967968
c := NewClient(ops)
968969

969970
if token := c.Connect(); token.Wait() && token.Error() != nil {
970971
t.Fatalf("Error on Client.Connect(): %v", token.Error())
971972
}
972-
time.Sleep(5 * time.Second)
973+
time.Sleep(8 * time.Second)
973974
c.Disconnect(250)
974975
}
975976

net.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func incoming(c *client) {
125125
case c.ibound <- cp:
126126
// Notify keepalive logic that we recently received a packet
127127
if c.options.KeepAlive != 0 {
128-
c.packetResp <- struct{}{}
128+
c.packetResp.Broadcast()
129129
}
130130
case <-c.stop:
131131
// This avoids a deadlock should a message arrive while shutting down.
@@ -205,11 +205,7 @@ func outgoing(c *client) {
205205
}
206206
// Reset ping timer after sending control packet.
207207
if c.options.KeepAlive != 0 {
208-
select {
209-
case c.keepaliveReset <- struct{}{}:
210-
default:
211-
DEBUG.Println(NET, "couldn't send keepalive signal in outbound as channel full")
212-
}
208+
c.keepaliveReset.Broadcast()
213209
}
214210
}
215211
}
@@ -232,7 +228,7 @@ func alllogic(c *client) {
232228
switch m := msg.(type) {
233229
case *packets.PingrespPacket:
234230
DEBUG.Println(NET, "received pingresp")
235-
c.pingResp <- struct{}{}
231+
c.pingResp.Broadcast()
236232
case *packets.SubackPacket:
237233
DEBUG.Println(NET, "received suback, id:", m.MessageID)
238234
token := c.getToken(m.MessageID).(*SubscribeToken)
@@ -257,18 +253,27 @@ func alllogic(c *client) {
257253
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
258254
pr.MessageID = m.MessageID
259255
DEBUG.Println(NET, "putting pubrec msg on obound")
260-
c.oboundP <- &PacketAndToken{p: pr, t: nil}
256+
select {
257+
case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
258+
case <-c.stop:
259+
}
261260
DEBUG.Println(NET, "done putting pubrec msg on obound")
262261
case 1:
263262
c.incomingPubChan <- m
264263
DEBUG.Println(NET, "done putting msg on incomingPubChan")
265264
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
266265
pa.MessageID = m.MessageID
267266
DEBUG.Println(NET, "putting puback msg on obound")
268-
c.oboundP <- &PacketAndToken{p: pa, t: nil}
267+
select {
268+
case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
269+
case <-c.stop:
270+
}
269271
DEBUG.Println(NET, "done putting puback msg on obound")
270272
case 0:
271-
c.incomingPubChan <- m
273+
select {
274+
case c.incomingPubChan <- m:
275+
case <-c.stop:
276+
}
272277
DEBUG.Println(NET, "done putting msg on incomingPubChan")
273278
}
274279
case *packets.PubackPacket:
@@ -283,15 +288,15 @@ func alllogic(c *client) {
283288
prel.MessageID = m.MessageID
284289
select {
285290
case c.oboundP <- &PacketAndToken{p: prel, t: nil}:
286-
case <-time.After(time.Second):
291+
case <-c.stop:
287292
}
288293
case *packets.PubrelPacket:
289294
DEBUG.Println(NET, "received pubrel, id:", m.MessageID)
290295
pc := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
291296
pc.MessageID = m.MessageID
292297
select {
293298
case c.oboundP <- &PacketAndToken{p: pc, t: nil}:
294-
case <-time.After(time.Second):
299+
case <-c.stop:
295300
}
296301
case *packets.PubcompPacket:
297302
DEBUG.Println(NET, "received pubcomp, id:", m.MessageID)

options_reader.go

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type ClientOptionsReader struct {
2525
options *ClientOptions
2626
}
2727

28+
//Servers returns a slice of the servers defined in the clientoptions
2829
func (r *ClientOptionsReader) Servers() []*url.URL {
2930
s := make([]*url.URL, len(r.options.Servers))
3031

@@ -36,21 +37,25 @@ func (r *ClientOptionsReader) Servers() []*url.URL {
3637
return s
3738
}
3839

40+
//ClientID returns the set client id
3941
func (r *ClientOptionsReader) ClientID() string {
4042
s := r.options.ClientID
4143
return s
4244
}
4345

46+
//Username returns the set username
4447
func (r *ClientOptionsReader) Username() string {
4548
s := r.options.Username
4649
return s
4750
}
4851

52+
//Password returns the set password
4953
func (r *ClientOptionsReader) Password() string {
5054
s := r.options.Password
5155
return s
5256
}
5357

58+
//CleanSession returns whether Cleansession is set
5459
func (r *ClientOptionsReader) CleanSession() bool {
5560
s := r.options.CleanSession
5661
return s

ping.go

+73-13
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package mqtt
1616

1717
import (
1818
"errors"
19+
"sync"
1920
"time"
2021

2122
"github.com/eclipse/paho.mqtt.golang/packets"
@@ -24,32 +25,81 @@ import (
2425
func keepalive(c *client) {
2526
DEBUG.Println(PNG, "keepalive starting")
2627

28+
var condWG sync.WaitGroup
29+
30+
defer func() {
31+
c.keepaliveReset.Broadcast()
32+
c.pingResp.Broadcast()
33+
c.packetResp.Broadcast()
34+
condWG.Wait()
35+
c.workers.Done()
36+
}()
37+
2738
receiveInterval := c.options.KeepAlive + (1 * time.Second)
2839
pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)}
2940
receiveTimer := timer{Timer: time.NewTimer(receiveInterval)}
3041
pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)}
3142

3243
pingRespTimer.Stop()
3344

45+
condWG.Add(3)
46+
go func() {
47+
defer condWG.Done()
48+
for {
49+
c.pingResp.L.Lock()
50+
c.pingResp.Wait()
51+
c.pingResp.L.Unlock()
52+
select {
53+
case <-c.stop:
54+
return
55+
default:
56+
}
57+
DEBUG.Println(NET, "resetting ping timeout timer")
58+
pingRespTimer.Stop()
59+
pingTimer.Reset(c.options.KeepAlive)
60+
receiveTimer.Reset(receiveInterval)
61+
}
62+
}()
63+
64+
go func() {
65+
defer condWG.Done()
66+
for {
67+
c.packetResp.L.Lock()
68+
c.packetResp.Wait()
69+
c.packetResp.L.Unlock()
70+
select {
71+
case <-c.stop:
72+
return
73+
default:
74+
}
75+
DEBUG.Println(NET, "resetting receive timer")
76+
receiveTimer.Reset(receiveInterval)
77+
}
78+
}()
79+
80+
go func() {
81+
defer condWG.Done()
82+
for {
83+
c.keepaliveReset.L.Lock()
84+
c.keepaliveReset.Wait()
85+
c.keepaliveReset.L.Unlock()
86+
select {
87+
case <-c.stop:
88+
return
89+
default:
90+
}
91+
DEBUG.Println(NET, "resetting ping timer")
92+
pingTimer.Reset(c.options.KeepAlive)
93+
}
94+
}()
95+
3496
for {
3597
select {
3698
case <-c.stop:
3799
DEBUG.Println(PNG, "keepalive stopped")
38-
c.workers.Done()
39100
return
40101
case <-pingTimer.C:
41102
sendPing(&pingTimer, &pingRespTimer, c)
42-
case <-c.keepaliveReset:
43-
DEBUG.Println(NET, "resetting ping timer")
44-
pingTimer.Reset(c.options.KeepAlive)
45-
case <-c.pingResp:
46-
DEBUG.Println(NET, "resetting ping timeout timer")
47-
pingRespTimer.Stop()
48-
pingTimer.Reset(c.options.KeepAlive)
49-
receiveTimer.Reset(receiveInterval)
50-
case <-c.packetResp:
51-
DEBUG.Println(NET, "resetting receive timer")
52-
receiveTimer.Reset(receiveInterval)
53103
case <-receiveTimer.C:
54104
receiveTimer.SetRead(true)
55105
receiveTimer.Reset(receiveInterval)
@@ -66,16 +116,21 @@ func keepalive(c *client) {
66116
}
67117

68118
type timer struct {
119+
sync.Mutex
69120
*time.Timer
70121
readFrom bool
71122
}
72123

73124
func (t *timer) SetRead(v bool) {
125+
t.Lock()
74126
t.readFrom = v
127+
t.Unlock()
75128
}
76129

77130
func (t *timer) Stop() bool {
131+
t.Lock()
78132
defer t.SetRead(true)
133+
defer t.Unlock()
79134

80135
if !t.Timer.Stop() && !t.readFrom {
81136
<-t.C
@@ -85,8 +140,13 @@ func (t *timer) Stop() bool {
85140
}
86141

87142
func (t *timer) Reset(d time.Duration) bool {
143+
t.Lock()
88144
defer t.SetRead(false)
89-
t.Stop()
145+
defer t.Unlock()
146+
if !t.Timer.Stop() && !t.readFrom {
147+
<-t.C
148+
}
149+
90150
return t.Timer.Reset(d)
91151
}
92152

0 commit comments

Comments
 (0)