Skip to content

Commit d6e66a2

Browse files
authored
[ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime (#613)
1 parent 05929da commit d6e66a2

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

consumer/process_queue.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type processQueue struct {
5050
consumeLock sync.Mutex
5151
consumingMsgOrderlyTreeMap *treemap.Map
5252
dropped *uatomic.Bool
53-
lastPullTime time.Time
53+
lastPullTime atomic.Value
5454
lastConsumeTime atomic.Value
5555
locked *uatomic.Bool
5656
lastLockTime atomic.Value
@@ -69,9 +69,12 @@ func newProcessQueue(order bool) *processQueue {
6969
lastLockTime := atomic.Value{}
7070
lastLockTime.Store(time.Now())
7171

72+
lastPullTime := atomic.Value{}
73+
lastPullTime.Store(time.Now())
74+
7275
pq := &processQueue{
7376
msgCache: treemap.NewWith(utils.Int64Comparator),
74-
lastPullTime: time.Now(),
77+
lastPullTime: lastPullTime,
7578
lastConsumeTime: lastConsumeTime,
7679
lastLockTime: lastLockTime,
7780
msgCh: make(chan []*primitive.MessageExt, 32),
@@ -157,6 +160,14 @@ func (pq *processQueue) LastLockTime() time.Time {
157160
return pq.lastLockTime.Load().(time.Time)
158161
}
159162

163+
func (pq *processQueue) LastPullTime() time.Time {
164+
return pq.lastPullTime.Load().(time.Time)
165+
}
166+
167+
func (pq *processQueue) UpdateLastPullTime() {
168+
pq.lastPullTime.Store(time.Now())
169+
}
170+
160171
func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
161172
pq.mutex.Lock()
162173
for _, msg := range messages {
@@ -199,7 +210,7 @@ func (pq *processQueue) isLockExpired() bool {
199210
}
200211

201212
func (pq *processQueue) isPullExpired() bool {
202-
return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
213+
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
203214
}
204215

205216
func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
@@ -360,7 +371,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
360371
TryUnlockTimes: pq.tryUnlockTimes,
361372
LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
362373
Dropped: pq.dropped.Load(),
363-
LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
374+
LastPullTimestamp: pq.LastPullTime().UnixNano() / int64(time.Millisecond),
364375
LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
365376
}
366377

consumer/push_consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
562562
}
563563
// reset time
564564
sleepTime = pc.option.PullInterval
565-
pq.lastPullTime = time.Now()
565+
pq.lastPullTime.Store(time.Now())
566566
err := pc.makeSureStateOK()
567567
if err != nil {
568568
rlog.Warning("consumer state error", map[string]interface{}{

0 commit comments

Comments
 (0)