Skip to content

Commit d5ef55a

Browse files
authored
Merge pull request eclipse-paho#214 from TheSilentForest/manually_ack_messages
Fix: The acknowledgement for incoming messages are sent even if the associated MessageHandler panics eclipse-paho#16
2 parents b4cdefb + 48fdc9b commit d5ef55a

File tree

3 files changed

+42
-21
lines changed

3 files changed

+42
-21
lines changed

message.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"net/url"
1919

2020
"github.com/eclipse/paho.mqtt.golang/packets"
21+
"sync"
2122
)
2223

2324
// Message defines the externals that a message implementation must support
@@ -30,6 +31,7 @@ type Message interface {
3031
Topic() string
3132
MessageID() uint16
3233
Payload() []byte
34+
Ack()
3335
}
3436

3537
type message struct {
@@ -39,6 +41,8 @@ type message struct {
3941
topic string
4042
messageID uint16
4143
payload []byte
44+
once sync.Once
45+
ack func()
4246
}
4347

4448
func (m *message) Duplicate() bool {
@@ -65,14 +69,19 @@ func (m *message) Payload() []byte {
6569
return m.payload
6670
}
6771

68-
func messageFromPublish(p *packets.PublishPacket) Message {
72+
func (m *message) Ack() {
73+
m.once.Do(m.ack)
74+
}
75+
76+
func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
6977
return &message{
7078
duplicate: p.Dup,
7179
qos: p.Qos,
7280
retained: p.Retain,
7381
topic: p.TopicName,
7482
messageID: p.MessageID,
7583
payload: p.Payload,
84+
ack: ack,
7685
}
7786
}
7887

net.go

+28-17
Original file line numberDiff line numberDiff line change
@@ -266,26 +266,9 @@ func alllogic(c *client) {
266266
case 2:
267267
c.incomingPubChan <- m
268268
DEBUG.Println(NET, "done putting msg on incomingPubChan")
269-
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
270-
pr.MessageID = m.MessageID
271-
DEBUG.Println(NET, "putting pubrec msg on obound")
272-
select {
273-
case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
274-
case <-c.stop:
275-
}
276-
DEBUG.Println(NET, "done putting pubrec msg on obound")
277269
case 1:
278270
c.incomingPubChan <- m
279271
DEBUG.Println(NET, "done putting msg on incomingPubChan")
280-
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
281-
pa.MessageID = m.MessageID
282-
DEBUG.Println(NET, "putting puback msg on obound")
283-
persistOutbound(c.persist, pa)
284-
select {
285-
case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
286-
case <-c.stop:
287-
}
288-
DEBUG.Println(NET, "done putting puback msg on obound")
289272
case 0:
290273
select {
291274
case c.incomingPubChan <- m:
@@ -328,6 +311,34 @@ func alllogic(c *client) {
328311
}
329312
}
330313

314+
func (c *client) ackFunc(packet *packets.PublishPacket) func() {
315+
return func() {
316+
switch packet.Qos {
317+
case 2:
318+
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
319+
pr.MessageID = packet.MessageID
320+
DEBUG.Println(NET, "putting pubrec msg on obound")
321+
select {
322+
case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
323+
case <-c.stop:
324+
}
325+
DEBUG.Println(NET, "done putting pubrec msg on obound")
326+
case 1:
327+
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
328+
pa.MessageID = packet.MessageID
329+
DEBUG.Println(NET, "putting puback msg on obound")
330+
persistOutbound(c.persist, pa)
331+
select {
332+
case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
333+
case <-c.stop:
334+
}
335+
DEBUG.Println(NET, "done putting puback msg on obound")
336+
case 0:
337+
// do nothing, since there is no need to send an ack packet back
338+
}
339+
}
340+
}
341+
331342
func errorWatch(c *client) {
332343
defer c.workers.Done()
333344
select {

router.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,14 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
146146
case message := <-messages:
147147
sent := false
148148
r.RLock()
149+
m := messageFromPublish(message, client.ackFunc(message))
149150
handlers := []MessageHandler{}
150151
for e := r.routes.Front(); e != nil; e = e.Next() {
151152
if e.Value.(*route).match(message.TopicName) {
152153
if order {
153154
handlers = append(handlers, e.Value.(*route).callback)
154155
} else {
155-
go e.Value.(*route).callback(client, messageFromPublish(message))
156+
go e.Value.(*route).callback(client, m)
156157
}
157158
sent = true
158159
}
@@ -161,12 +162,12 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
161162
if order {
162163
handlers = append(handlers, r.defaultHandler)
163164
} else {
164-
go r.defaultHandler(client, messageFromPublish(message))
165+
go r.defaultHandler(client, m)
165166
}
166167
}
167168
r.RUnlock()
168169
for _, handler := range handlers {
169-
handler(client, messageFromPublish(message))
170+
handler(client, m)
170171
}
171172
case <-r.stop:
172173
return

0 commit comments

Comments
 (0)