Skip to content

Commit 0621310

Browse files
committedOct 11, 2019
client: persist subscribe packets if client is connecting or reconnecting
The MQTT client has all of the logic in place to handle persisting and resuming subscribe packets when the client is connecting or reconnecting, but it's currently unused. This change persists outbound SubscribePackets similar to Client.Publish() and adds an fvt test case to validate the ResumeSubs behavior. fixes eclipse-paho#358 Signed-off-by: Robert Weber <robertweber95@gmail.com>
1 parent 2e9e43b commit 0621310

File tree

5 files changed

+141
-11
lines changed

5 files changed

+141
-11
lines changed
 

‎client.go

+52-3
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,6 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
668668
}
669669
sub.Topics = append(sub.Topics, topic)
670670
sub.Qoss = append(sub.Qoss, qos)
671-
DEBUG.Println(CLI, sub.String())
672671

673672
if strings.HasPrefix(topic, "$share/") {
674673
topic = strings.Join(strings.Split(topic, "/")[2:], "/")
@@ -683,7 +682,31 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
683682
}
684683

685684
token.subs = append(token.subs, topic)
686-
c.oboundP <- &PacketAndToken{p: sub, t: token}
685+
686+
if sub.MessageID == 0 {
687+
sub.MessageID = c.getID(token)
688+
token.messageID = sub.MessageID
689+
}
690+
DEBUG.Println(CLI, sub.String())
691+
692+
persistOutbound(c.persist, sub)
693+
switch c.connectionStatus() {
694+
case connecting:
695+
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
696+
case reconnecting:
697+
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
698+
default:
699+
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
700+
subscribeWaitTimeout := c.options.WriteTimeout
701+
if subscribeWaitTimeout == 0 {
702+
subscribeWaitTimeout = time.Second * 30
703+
}
704+
select {
705+
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
706+
case <-time.After(subscribeWaitTimeout):
707+
token.setError(errors.New("subscribe was broken by timeout"))
708+
}
709+
}
687710
DEBUG.Println(CLI, "exit Subscribe")
688711
return token
689712
}
@@ -711,7 +734,29 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
711734
}
712735
token.subs = make([]string, len(sub.Topics))
713736
copy(token.subs, sub.Topics)
714-
c.oboundP <- &PacketAndToken{p: sub, t: token}
737+
738+
if sub.MessageID == 0 {
739+
sub.MessageID = c.getID(token)
740+
token.messageID = sub.MessageID
741+
}
742+
persistOutbound(c.persist, sub)
743+
switch c.connectionStatus() {
744+
case connecting:
745+
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
746+
case reconnecting:
747+
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
748+
default:
749+
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
750+
subscribeWaitTimeout := c.options.WriteTimeout
751+
if subscribeWaitTimeout == 0 {
752+
subscribeWaitTimeout = time.Second * 30
753+
}
754+
select {
755+
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
756+
case <-time.After(subscribeWaitTimeout):
757+
token.setError(errors.New("subscribe was broken by timeout"))
758+
}
759+
}
715760
DEBUG.Println(CLI, "exit SubscribeMultiple")
716761
return token
717762
}
@@ -754,7 +799,11 @@ func (c *client) resume(subscription bool) {
754799
case *packets.SubscribePacket:
755800
if subscription {
756801
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
802+
subPacket := packet.(*packets.SubscribePacket)
757803
token := newToken(packets.Subscribe).(*SubscribeToken)
804+
token.messageID = details.MessageID
805+
token.subs = append(token.subs, subPacket.Topics...)
806+
c.claimID(token, details.MessageID)
758807
select {
759808
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
760809
case <-c.stop:

‎fvt_client_test.go

+87-1
Original file line numberDiff line numberDiff line change
@@ -1181,7 +1181,7 @@ func Test_ConnectRetryPublish(t *testing.T) {
11811181

11821182
// disconnect and then reconnect with correct server
11831183
p.Disconnect(250)
1184-
1184+
11851185
pops = NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-pub").SetCleanSession(false).
11861186
SetStore(memStore2).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
11871187
p = NewClient(pops).(*client)
@@ -1198,3 +1198,89 @@ func Test_ConnectRetryPublish(t *testing.T) {
11981198
s.Disconnect(250)
11991199
memStore.Close()
12001200
}
1201+
1202+
func Test_ResumeSubs(t *testing.T) {
1203+
topic := "/test/ResumeSubs"
1204+
var qos byte = 1
1205+
payload := "sample Payload"
1206+
choke := make(chan bool)
1207+
1208+
// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
1209+
subMemStore := NewMemoryStore()
1210+
subMemStore.Open()
1211+
sops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("resumesubs-sub").SetConnectRetry(true).
1212+
SetConnectRetryInterval(time.Second / 2).SetResumeSubs(true).SetStore(subMemStore)
1213+
1214+
s := NewClient(sops)
1215+
sConnToken := s.Connect()
1216+
1217+
subToken := s.Subscribe(topic, qos, nil)
1218+
1219+
// Verify the subscribe packet exists in the memorystore
1220+
ids := subMemStore.All()
1221+
if len(ids) == 0 {
1222+
t.Fatalf("Expected subscribe packet to be in store")
1223+
} else if len(ids) != 1 {
1224+
t.Fatalf("Expected 1 packet to be in store")
1225+
}
1226+
packet := subMemStore.Get(ids[0])
1227+
if packet == nil {
1228+
t.Fatal("Failed to retrieve packet from store")
1229+
}
1230+
sp, ok := packet.(*packets.SubscribePacket)
1231+
if !ok {
1232+
t.Fatalf("Packet in store not of the expected type (%T)", packet)
1233+
}
1234+
if len(sp.Topics) != 1 || sp.Topics[0] != topic || len(sp.Qoss) != 1 || sp.Qoss[0] != qos {
1235+
t.Fatalf("Stored Subscribe Packet contents not as expected (%v, %v)", sp.Topics, sp.Qoss)
1236+
}
1237+
1238+
time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
1239+
if sConnToken.Error() != nil {
1240+
t.Fatalf("Connect returned error (should be retrying) (%v)", sConnToken.Error())
1241+
}
1242+
if subToken.Error() != nil {
1243+
t.Fatalf("Subscribe returned error (should be persisted) (%v)", sConnToken.Error())
1244+
}
1245+
1246+
// test that the stored subscribe packet gets sent to the broker after connecting
1247+
subMemStore2 := NewMemoryStore()
1248+
subMemStore2.Open()
1249+
subMemStore2.Put(ids[0], packet)
1250+
1251+
s.Disconnect(250)
1252+
1253+
// Connect to broker and test that subscription was resumed
1254+
sops = NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-sub").
1255+
SetStore(subMemStore2).SetResumeSubs(true).SetCleanSession(false).SetConnectRetry(true).
1256+
SetConnectRetryInterval(time.Second / 2)
1257+
1258+
var f MessageHandler = func(client Client, msg Message) {
1259+
if msg.Topic() != topic || string(msg.Payload()) != payload {
1260+
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
1261+
}
1262+
choke <- true
1263+
}
1264+
sops.SetDefaultPublishHandler(f)
1265+
s = NewClient(sops).(*client)
1266+
if sConnToken = s.Connect(); sConnToken.Wait() && sConnToken.Error() != nil {
1267+
t.Fatalf("Error on valid subscribe Connect(): %v", sConnToken.Error())
1268+
}
1269+
1270+
// publish message to subscribed topic to verify subscription
1271+
pops := NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-pub").SetCleanSession(true).
1272+
SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
1273+
p := NewClient(pops).(*client)
1274+
if pConnToken := p.Connect(); pConnToken.Wait() && pConnToken.Error() != nil {
1275+
t.Fatalf("Error on valid Publish.Connect(): %v", pConnToken.Error())
1276+
}
1277+
1278+
if pubToken := p.Publish(topic, 1, false, payload); pubToken.Wait() && pubToken.Error() != nil {
1279+
t.Fatalf("Error on valid Client.Publish(): %v", pubToken.Error())
1280+
}
1281+
1282+
wait(choke)
1283+
1284+
s.Disconnect(250)
1285+
p.Disconnect(250)
1286+
}

‎net.go

-6
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,6 @@ func outgoing(c *client) {
178178
}
179179
DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
180180
case msg := <-c.oboundP:
181-
switch msg.p.(type) {
182-
case *packets.SubscribePacket:
183-
msg.p.(*packets.SubscribePacket).MessageID = c.getID(msg.t)
184-
case *packets.UnsubscribePacket:
185-
msg.p.(*packets.UnsubscribePacket).MessageID = c.getID(msg.t)
186-
}
187181
DEBUG.Println(NET, "obound priority msg to write, type", reflect.TypeOf(msg.p))
188182
if err := msg.p.Write(c.conn); err != nil {
189183
ERROR.Println(NET, "outgoing stopped with error", err)

‎token.go

+1
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ type SubscribeToken struct {
160160
baseToken
161161
subs []string
162162
subResult map[string]byte
163+
messageID uint16
163164
}
164165

165166
// Result returns a map of topics that were subscribed to along with

‎unit_client_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,4 @@ func Test_isConnectionOpenNegative(t *testing.T) {
104104
if c.IsConnectionOpen() {
105105
t.Fail()
106106
}
107-
}
107+
}

0 commit comments

Comments
 (0)
Please sign in to comment.