Skip to content

Commit 08638a1

Browse files
committed
[ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime
1 parent 6cd3181 commit 08638a1

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

consumer/process_queue.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type processQueue struct {
5050
consumeLock sync.Mutex
5151
consumingMsgOrderlyTreeMap *treemap.Map
5252
dropped *uatomic.Bool
53-
lastPullTime time.Time
53+
lastPullTime atomic.Value
5454
lastConsumeTime atomic.Value
5555
locked *uatomic.Bool
5656
lastLockTime atomic.Value
@@ -69,9 +69,12 @@ func newProcessQueue(order bool) *processQueue {
6969
lastLockTime := atomic.Value{}
7070
lastLockTime.Store(time.Now())
7171

72+
lastPullTime := atomic.Value{}
73+
lastPullTime.Store(time.Now())
74+
7275
pq := &processQueue{
7376
msgCache: treemap.NewWith(utils.Int64Comparator),
74-
lastPullTime: time.Now(),
77+
lastPullTime: lastPullTime,
7578
lastConsumeTime: lastConsumeTime,
7679
lastLockTime: lastLockTime,
7780
msgCh: make(chan []*primitive.MessageExt, 32),
@@ -153,6 +156,14 @@ func (pq *processQueue) LastLockTime() time.Time {
153156
return pq.lastLockTime.Load().(time.Time)
154157
}
155158

159+
func (pq *processQueue) LastPullTime() time.Time {
160+
return pq.lastPullTime.Load().(time.Time)
161+
}
162+
163+
func (pq *processQueue) UpdateLastPullTime() {
164+
pq.lastPullTime.Store(time.Now())
165+
}
166+
156167
func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
157168
pq.mutex.Lock()
158169
for _, msg := range messages {
@@ -195,7 +206,7 @@ func (pq *processQueue) isLockExpired() bool {
195206
}
196207

197208
func (pq *processQueue) isPullExpired() bool {
198-
return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
209+
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
199210
}
200211

201212
func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
@@ -356,7 +367,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
356367
TryUnlockTimes: pq.tryUnlockTimes,
357368
LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
358369
Dropped: pq.dropped.Load(),
359-
LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
370+
LastPullTimestamp: pq.LastPullTime().UnixNano() / int64(time.Millisecond),
360371
LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
361372
}
362373

consumer/push_consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
498498
}
499499
// reset time
500500
sleepTime = pc.option.PullInterval
501-
pq.lastPullTime = time.Now()
501+
pq.lastPullTime.Store(time.Now())
502502
err := pc.makeSureStateOK()
503503
if err != nil {
504504
rlog.Warning("consumer state error", map[string]interface{}{

0 commit comments

Comments
 (0)