Skip to content

Commit a7b2db6

Browse files
authored
send csListLock *sync.Mutex to computeStatsData for lock csList (#820)
1 parent b2bf873 commit a7b2db6

File tree

2 files changed

+21
-9
lines changed

2 files changed

+21
-9
lines changed

consumer/statistics.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,7 @@ func (mgr *StatsManager) getConsumeFailedTPS(group, topic string) statsSnapshot
134134
return mgr.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
135135
}
136136

137-
var csListLock sync.Mutex
138-
139-
func computeStatsData(csList *list.List) statsSnapshot {
137+
func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
140138
csListLock.Lock()
141139
defer csListLock.Unlock()
142140
tps, avgpt, sum := 0.0, 0.0, int64(0)
@@ -362,15 +360,15 @@ type statsItem struct {
362360
}
363361

364362
func (si *statsItem) getStatsDataInMinute() statsSnapshot {
365-
return computeStatsData(si.csListMinute)
363+
return computeStatsData(&si.csListMinuteLock, si.csListMinute)
366364
}
367365

368366
func (si *statsItem) getStatsDataInHour() statsSnapshot {
369-
return computeStatsData(si.csListHour)
367+
return computeStatsData(&si.csListHourLock, si.csListHour)
370368
}
371369

372370
func (si *statsItem) getStatsDataInDay() statsSnapshot {
373-
return computeStatsData(si.csListDay)
371+
return computeStatsData(&si.csListDayLock, si.csListDay)
374372
}
375373

376374
func newStatsItem(statsName, statsKey string) *statsItem {
@@ -423,7 +421,7 @@ func (si *statsItem) samplingInHour() {
423421
}
424422

425423
func (si *statsItem) printAtMinutes() {
426-
ss := computeStatsData(si.csListMinute)
424+
ss := computeStatsData(&si.csListMinuteLock, si.csListMinute)
427425
rlog.Info("Stats In One Minute, SUM: %d TPS: AVGPT: %.2f", map[string]interface{}{
428426
"statsName": si.statsName,
429427
"statsKey": si.statsKey,
@@ -434,7 +432,7 @@ func (si *statsItem) printAtMinutes() {
434432
}
435433

436434
func (si *statsItem) printAtHour() {
437-
ss := computeStatsData(si.csListHour)
435+
ss := computeStatsData(&si.csListHourLock, si.csListHour)
438436
rlog.Info("Stats In One Hour, SUM: %d TPS: AVGPT: %.2f", map[string]interface{}{
439437
"statsName": si.statsName,
440438
"statsKey": si.statsKey,
@@ -445,7 +443,7 @@ func (si *statsItem) printAtHour() {
445443
}
446444

447445
func (si *statsItem) printAtDay() {
448-
ss := computeStatsData(si.csListDay)
446+
ss := computeStatsData(&si.csListDayLock, si.csListDay)
449447
rlog.Info("Stats In One Day, SUM: %d TPS: AVGPT: %.2f", map[string]interface{}{
450448
"statsName": si.statsName,
451449
"statsKey": si.statsKey,

consumer/statistics_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,17 @@ func TestGetConsumeStatus(t *testing.T) {
212212
}
213213
}
214214
}
215+
216+
func TestNewStatsManager(t *testing.T) {
217+
stats := NewStatsManager()
218+
219+
st := time.Now()
220+
for {
221+
stats.increasePullTPS("rocketmq", "default", 1)
222+
time.Sleep(500*time.Millisecond)
223+
if time.Now().Sub(st) > 5*time.Minute {
224+
break
225+
}
226+
}
227+
stats.ShutDownStat()
228+
}

0 commit comments

Comments
 (0)