Skip to content

Commit 4de354a

Browse files
authored
add lock to protect data race (apache#1088)
1 parent 7eedaf9 commit 4de354a

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

consumer/push_consumer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,9 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
502502
rlog.LogKeyValueChangedFrom: data.SubVersion,
503503
rlog.LogKeyValueChangedTo: newVersion,
504504
})
505+
data.Lock()
505506
data.SubVersion = newVersion
507+
data.Unlock()
506508

507509
// TODO: optimize
508510
count := 0

internal/model.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sort"
2626
"strconv"
2727
"strings"
28+
"sync"
2829

2930
"github.com/apache/rocketmq-client-go/v2/internal/utils"
3031
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -60,9 +61,20 @@ type SubscriptionData struct {
6061
Codes utils.Set `json:"codeSet"`
6162
SubVersion int64 `json:"subVersion"`
6263
ExpType string `json:"expressionType"`
64+
mux sync.RWMutex
65+
}
66+
67+
func (sd *SubscriptionData) Lock() {
68+
sd.mux.Lock()
69+
}
70+
71+
func (sd *SubscriptionData) Unlock() {
72+
sd.mux.Unlock()
6373
}
6474

6575
func (sd *SubscriptionData) Clone() *SubscriptionData {
76+
sd.mux.RLock()
77+
defer sd.mux.RUnlock()
6678
cloned := &SubscriptionData{
6779
ClassFilterMode: sd.ClassFilterMode,
6880
Topic: sd.Topic,

0 commit comments

Comments
 (0)