Skip to content

Commit 48fdc9b

Browse files
committed
Let the user manage publish acks
Move the ack logic away from the main net loop so the ack is only sent when calling Ack on the actual message. Signed-off-by: Xabier Eizmendi <xeizmendi@gmail.com>
1 parent 9ec68b7 commit 48fdc9b

File tree

3 files changed

+42
-21
lines changed

3 files changed

+42
-21
lines changed

message.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package mqtt
1616

1717
import (
1818
"github.com/eclipse/paho.mqtt.golang/packets"
19+
"sync"
1920
)
2021

2122
// Message defines the externals that a message implementation must support
@@ -28,6 +29,7 @@ type Message interface {
2829
Topic() string
2930
MessageID() uint16
3031
Payload() []byte
32+
Ack()
3133
}
3234

3335
type message struct {
@@ -37,6 +39,8 @@ type message struct {
3739
topic string
3840
messageID uint16
3941
payload []byte
42+
once sync.Once
43+
ack func()
4044
}
4145

4246
func (m *message) Duplicate() bool {
@@ -63,14 +67,19 @@ func (m *message) Payload() []byte {
6367
return m.payload
6468
}
6569

66-
func messageFromPublish(p *packets.PublishPacket) Message {
70+
func (m *message) Ack() {
71+
m.once.Do(m.ack)
72+
}
73+
74+
func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
6775
return &message{
6876
duplicate: p.Dup,
6977
qos: p.Qos,
7078
retained: p.Retain,
7179
topic: p.TopicName,
7280
messageID: p.MessageID,
7381
payload: p.Payload,
82+
ack: ack,
7483
}
7584
}
7685

net.go

+28-17
Original file line numberDiff line numberDiff line change
@@ -259,26 +259,9 @@ func alllogic(c *client) {
259259
case 2:
260260
c.incomingPubChan <- m
261261
DEBUG.Println(NET, "done putting msg on incomingPubChan")
262-
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
263-
pr.MessageID = m.MessageID
264-
DEBUG.Println(NET, "putting pubrec msg on obound")
265-
select {
266-
case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
267-
case <-c.stop:
268-
}
269-
DEBUG.Println(NET, "done putting pubrec msg on obound")
270262
case 1:
271263
c.incomingPubChan <- m
272264
DEBUG.Println(NET, "done putting msg on incomingPubChan")
273-
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
274-
pa.MessageID = m.MessageID
275-
DEBUG.Println(NET, "putting puback msg on obound")
276-
persistOutbound(c.persist, pa)
277-
select {
278-
case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
279-
case <-c.stop:
280-
}
281-
DEBUG.Println(NET, "done putting puback msg on obound")
282265
case 0:
283266
select {
284267
case c.incomingPubChan <- m:
@@ -321,6 +304,34 @@ func alllogic(c *client) {
321304
}
322305
}
323306

307+
func (c *client) ackFunc(packet *packets.PublishPacket) func() {
308+
return func() {
309+
switch packet.Qos {
310+
case 2:
311+
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
312+
pr.MessageID = packet.MessageID
313+
DEBUG.Println(NET, "putting pubrec msg on obound")
314+
select {
315+
case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
316+
case <-c.stop:
317+
}
318+
DEBUG.Println(NET, "done putting pubrec msg on obound")
319+
case 1:
320+
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
321+
pa.MessageID = packet.MessageID
322+
DEBUG.Println(NET, "putting puback msg on obound")
323+
persistOutbound(c.persist, pa)
324+
select {
325+
case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
326+
case <-c.stop:
327+
}
328+
DEBUG.Println(NET, "done putting puback msg on obound")
329+
case 0:
330+
// do nothing, since there is no need to send an ack packet back
331+
}
332+
}
333+
}
334+
324335
func errorWatch(c *client) {
325336
defer c.workers.Done()
326337
select {

router.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,14 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
146146
case message := <-messages:
147147
sent := false
148148
r.RLock()
149+
m := messageFromPublish(message, client.ackFunc(message))
149150
handlers := []MessageHandler{}
150151
for e := r.routes.Front(); e != nil; e = e.Next() {
151152
if e.Value.(*route).match(message.TopicName) {
152153
if order {
153154
handlers = append(handlers, e.Value.(*route).callback)
154155
} else {
155-
go e.Value.(*route).callback(client, messageFromPublish(message))
156+
go e.Value.(*route).callback(client, m)
156157
}
157158
sent = true
158159
}
@@ -161,12 +162,12 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
161162
if order {
162163
handlers = append(handlers, r.defaultHandler)
163164
} else {
164-
go r.defaultHandler(client, messageFromPublish(message))
165+
go r.defaultHandler(client, m)
165166
}
166167
}
167168
r.RUnlock()
168169
for _, handler := range handlers {
169-
handler(client, messageFromPublish(message))
170+
handler(client, m)
170171
}
171172
case <-r.stop:
172173
return

0 commit comments

Comments
 (0)