Skip to content

Commit 5c775c2

Browse files
committed
Adding AutoAck option in ClientOptions
Signed-off-by: shivam <shivamkm07@gmail.com>
1 parent 7b1c0eb commit 5c775c2

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

options.go

+9
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ type ClientOptions struct {
104104
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
105105
Dialer *net.Dialer
106106
CustomOpenConnectionFn OpenConnectionFunc
107+
AutoAck bool
107108
}
108109

109110
// NewClientOptions will create a new ClientClientOptions type with some
@@ -147,6 +148,7 @@ func NewClientOptions() *ClientOptions {
147148
WebsocketOptions: &WebsocketOptions{},
148149
Dialer: &net.Dialer{Timeout: 30 * time.Second},
149150
CustomOpenConnectionFn: nil,
151+
AutoAck: true,
150152
}
151153
return o
152154
}
@@ -446,3 +448,10 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
446448
}
447449
return o
448450
}
451+
452+
// SetAutoAck enables or disables the Automated Acking of Messages received by the handler.
453+
// By default it is set to true. Setting it to false will disable the auto-ack globally.
454+
func (o *ClientOptions) SetAutoAck(autoAck bool) *ClientOptions {
455+
o.AutoAck = autoAck
456+
return o
457+
}

router.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
186186
wg.Add(1)
187187
go func() {
188188
hd(client, m)
189-
m.Ack()
189+
if client.options.AutoAck {
190+
m.Ack()
191+
}
190192
wg.Done()
191193
}()
192194
}
@@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
201203
wg.Add(1)
202204
go func() {
203205
r.defaultHandler(client, m)
204-
m.Ack()
206+
if client.options.AutoAck {
207+
m.Ack()
208+
}
205209
wg.Done()
206210
}()
207211
}
@@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
212216
r.RUnlock()
213217
for _, handler := range handlers {
214218
handler(client, m)
215-
m.Ack()
219+
if client.options.AutoAck {
220+
m.Ack()
221+
}
216222
}
217223
// DEBUG.Println(ROU, "matchAndDispatch handled message")
218224
}

0 commit comments

Comments
 (0)