Skip to content

Commit 222d3c1

Browse files
committed
Handle connection loss during call to Disconnect() (including tests). Also reduce noise from tests.
Ref issue eclipse-paho#501
1 parent 4d373b3 commit 222d3c1

4 files changed

+61
-12
lines changed

client.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,22 @@ func (c *client) Disconnect(quiesce uint) {
439439

440440
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
441441
dt := newToken(packets.Disconnect)
442-
c.oboundP <- &PacketAndToken{p: dm, t: dt}
442+
disconnectSent := false
443+
select {
444+
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
445+
disconnectSent = true
446+
case <-c.commsStopped:
447+
WARN.Println("Disconnect packet could not be sent because comms stopped")
448+
case <-time.After(time.Duration(quiesce) * time.Millisecond):
449+
WARN.Println("Disconnect packet not sent due to timeout")
450+
}
443451

444452
// wait for work to finish, or quiesce time consumed
445-
DEBUG.Println(CLI, "calling WaitTimeout")
446-
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
447-
DEBUG.Println(CLI, "WaitTimeout done")
453+
if disconnectSent {
454+
DEBUG.Println(CLI, "calling WaitTimeout")
455+
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
456+
DEBUG.Println(CLI, "WaitTimeout done")
457+
}
448458
} else {
449459
WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
450460
c.setConnected(disconnected)

fvt_client_test.go

+41-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,19 @@ func Test_Start(t *testing.T) {
3131
t.Fatalf("Error on Client.Connect(): %v", token.Error())
3232
}
3333

34-
c.Disconnect(250)
34+
// Disconnect should return within 250ms and calling a second time should not block
35+
disconnectC := make(chan struct{}, 1)
36+
go func() {
37+
c.Disconnect(250)
38+
c.Disconnect(5)
39+
close(disconnectC)
40+
}()
41+
42+
select {
43+
case <-time.After(time.Millisecond * 300):
44+
t.Errorf("disconnect did not finnish within 300ms")
45+
case <-disconnectC:
46+
}
3547
}
3648

3749
/* uncomment this if you have connection policy disallowing FailClientID
@@ -90,6 +102,34 @@ func Test_Start(t *testing.T) {
90102
}
91103
*/
92104

105+
// Disconnect should not block under any circumstance
106+
// This is triggered by issue #501; there is a very slight chance that Disconnect could get through the
107+
// `status == connected` check and then the connection drops...
108+
func Test_Disconnect(t *testing.T) {
109+
ops := NewClientOptions().SetClientID("Disconnect").AddBroker(FVTTCP)
110+
c := NewClient(ops)
111+
112+
if token := c.Connect(); token.Wait() && token.Error() != nil {
113+
t.Fatalf("Error on Client.Connect(): %v", token.Error())
114+
}
115+
116+
// Attempt to disconnect twice simultaneously and ensure this does not block
117+
disconnectC := make(chan struct{}, 1)
118+
go func() {
119+
c.Disconnect(250)
120+
cli := c.(*client)
121+
cli.status = connected
122+
c.Disconnect(250)
123+
close(disconnectC)
124+
}()
125+
126+
select {
127+
case <-time.After(time.Millisecond * 300):
128+
t.Errorf("disconnect did not finnish within 300ms")
129+
case <-disconnectC:
130+
}
131+
}
132+
93133
func Test_Publish_1(t *testing.T) {
94134
ops := NewClientOptions()
95135
ops.AddBroker(FVTTCP)

unit_client_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ import (
1818
"log"
1919
"net/http"
2020
_ "net/http/pprof"
21-
"os"
2221
"testing"
2322
)
2423

2524
func init() {
26-
DEBUG = log.New(os.Stderr, "DEBUG ", log.Ltime)
27-
WARN = log.New(os.Stderr, "WARNING ", log.Ltime)
28-
CRITICAL = log.New(os.Stderr, "CRITICAL ", log.Ltime)
29-
ERROR = log.New(os.Stderr, "ERROR ", log.Ltime)
25+
// Logging is off by default as this makes things simpler when you just want to confirm that tests pass
26+
// DEBUG = log.New(os.Stderr, "DEBUG ", log.Ltime)
27+
// WARN = log.New(os.Stderr, "WARNING ", log.Ltime)
28+
// CRITICAL = log.New(os.Stderr, "CRITICAL ", log.Ltime)
29+
// ERROR = log.New(os.Stderr, "ERROR ", log.Ltime)
3030

3131
go func() {
3232
log.Println(http.ListenAndServe("localhost:6060", nil))

unit_messageids_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package mqtt
1616

1717
import (
1818
"fmt"
19-
"log"
2019
"testing"
2120
)
2221

@@ -63,7 +62,7 @@ func Test_noFreeID(t *testing.T) {
6362
mids := &messageIds{index: make(map[uint16]tokenCompletor)}
6463

6564
for i := midMin; i != 0; i++ {
66-
log.Println(i)
65+
// Uncomment to see all message IDS log.Println(i)
6766
mids.index[i] = &d
6867
}
6968

0 commit comments

Comments
 (0)