Skip to content

Commit dd3dbcc

Browse files
authored
[ISSUE apache#1178] feat:pull consumer support Assign subscription type
1.Pull Consumer Support Assign Subscription Type. 2.add several apis for Assign Sub Type: SeekOffset/Assign/OffsetForTimestamp/GetTopicRouteInfo
1 parent 8a0459a commit dd3dbcc

File tree

10 files changed

+384
-129
lines changed

10 files changed

+384
-129
lines changed

api.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,18 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
8282
type PullConsumer interface {
8383
// Start the PullConsumer for consuming message
8484
Start() error
85+
// GetTopicRouteInfo get topic route info
86+
GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error)
8587

8688
// Subscribe a topic for consuming
8789
Subscribe(topic string, selector consumer.MessageSelector) error
8890

8991
// Unsubscribe a topic
9092
Unsubscribe(topic string) error
9193

94+
// Assign assign message queue to consumer
95+
Assign(topic string, mqs []*primitive.MessageQueue) error
96+
9297
// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
9398
Shutdown() error
9499

@@ -104,6 +109,12 @@ type PullConsumer interface {
104109
// PullFrom pull messages of queue from the offset to offset + numbers
105110
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
106111

112+
// SeekOffset seek offset for specific queue
113+
SeekOffset(queue *primitive.MessageQueue, offset int64)
114+
115+
// OffsetForTimestamp get offset of specific queue with timestamp
116+
OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64) (int64, error)
117+
107118
// UpdateOffset updateOffset update offset of queue in mem
108119
UpdateOffset(queue *primitive.MessageQueue, offset int64) error
109120

consumer/consumer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error {
316316
return nil
317317
}
318318

319+
func (dc *defaultConsumer) isRunning() bool {
320+
return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
321+
}
322+
323+
func (dc *defaultConsumer) isStopped() bool {
324+
return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
325+
}
326+
319327
func (dc *defaultConsumer) persistConsumerOffset() error {
320328
err := dc.makeSureStateOK()
321329
if err != nil {

consumer/mock_offset_store.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consumer/pull_consumer.go

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,26 @@ func (cr *ConsumeRequest) GetPQ() *processQueue {
6464
return cr.processQueue
6565
}
6666

67+
type SubscriptionType int
68+
69+
const (
70+
None SubscriptionType = iota
71+
Subscribe
72+
Assign
73+
)
74+
6775
type defaultPullConsumer struct {
6876
*defaultConsumer
6977

7078
topic string
7179
selector MessageSelector
7280
GroupName string
7381
Model MessageModel
82+
SubType SubscriptionType
7483
UnitMode bool
7584
nextQueueSequence int64
7685
allocateQueues []*primitive.MessageQueue
86+
mq2seekOffset sync.Map // key:primitive.MessageQueue,value:seekOffset
7787

7888
done chan struct{}
7989
closeOnce sync.Once
@@ -116,18 +126,40 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
116126
defaultConsumer: dc,
117127
done: make(chan struct{}, 1),
118128
consumeRequestCache: make(chan *ConsumeRequest, 4),
129+
GroupName: dc.option.GroupName,
119130
}
120131
dc.mqChanged = c.messageQueueChanged
121132
c.submitToConsume = c.consumeMessageConcurrently
122133
c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
123134
return c, nil
124135
}
125136

137+
func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) {
138+
topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic)
139+
value, exist := pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
140+
if exist {
141+
return value.([]*primitive.MessageQueue), nil
142+
}
143+
pc.client.UpdateTopicRouteInfo()
144+
value, exist = pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
145+
if !exist {
146+
return nil, errors2.ErrRouteNotFound
147+
}
148+
return value.([]*primitive.MessageQueue), nil
149+
}
150+
126151
func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
127152
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
128153
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
129154
return errors2.ErrStartTopic
130155
}
156+
if pc.SubType == Assign {
157+
return errors2.ErrSubscriptionType
158+
}
159+
160+
if pc.SubType == None {
161+
pc.SubType = Subscribe
162+
}
131163
topic = utils.WrapNamespace(pc.option.Namespace, topic)
132164

133165
data := buildSubscriptionData(topic, selector)
@@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector)
139171
}
140172

141173
func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
174+
if pc.SubType == Assign {
175+
return errors2.ErrSubscriptionType
176+
}
142177
topic = utils.WrapNamespace(pc.option.Namespace, topic)
143178
pc.subscriptionDataTable.Delete(topic)
144179
return nil
145180
}
146181

182+
func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error {
183+
if pc.SubType == Subscribe {
184+
return errors2.ErrSubscriptionType
185+
}
186+
if pc.SubType == None {
187+
pc.SubType = Assign
188+
}
189+
topic = utils.WrapNamespace(pc.option.Namespace, topic)
190+
data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll})
191+
pc.topic = topic
192+
pc.subscriptionDataTable.Store(topic, data)
193+
oldQueues := pc.allocateQueues
194+
pc.allocateQueues = mqs
195+
rlog.Info("pull consumer assign new mqs", map[string]interface{}{
196+
"topic": topic,
197+
"group": pc.GroupName,
198+
"oldMqs": oldQueues,
199+
"newMqs": mqs,
200+
})
201+
if pc.isRunning() {
202+
pc.Rebalance()
203+
}
204+
return nil
205+
}
206+
207+
func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 {
208+
if pc.SubType != Assign {
209+
return originOffset
210+
}
211+
value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
212+
if !exist {
213+
return originOffset
214+
} else {
215+
nextOffset := value.(int64)
216+
_ = pc.updateOffset(mq, nextOffset)
217+
return nextOffset
218+
}
219+
}
220+
147221
func (pc *defaultPullConsumer) Start() error {
148222
var err error
149223
pc.once.Do(func() {
@@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string {
546620
}
547621

548622
func (pc *defaultPullConsumer) Rebalance() {
549-
pc.defaultConsumer.doBalance()
623+
switch pc.SubType {
624+
case Assign:
625+
pc.RebalanceViaTopic()
626+
break
627+
case Subscribe:
628+
pc.defaultConsumer.doBalance()
629+
break
630+
}
550631
}
551632

552633
func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
553-
pc.defaultConsumer.doBalanceIfNotPaused()
634+
switch pc.SubType {
635+
case Assign:
636+
pc.RebalanceViaTopic()
637+
break
638+
case Subscribe:
639+
pc.defaultConsumer.doBalanceIfNotPaused()
640+
break
641+
}
642+
}
643+
644+
func (pc *defaultPullConsumer) RebalanceViaTopic() {
645+
changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues)
646+
if changed {
647+
rlog.Info("PullConsumer rebalance result changed ", map[string]interface{}{
648+
rlog.LogKeyAllocateMessageQueue: pc.allocateQueues,
649+
})
650+
}
554651
}
555652

556653
func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
@@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
613710

614711
}
615712

713+
func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
714+
pc.mq2seekOffset.Store(mq, offset)
715+
rlog.Info("pull consumer seek offset", map[string]interface{}{
716+
"mq": mq,
717+
"offset": offset,
718+
})
719+
}
720+
721+
func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
722+
return pc.searchOffsetByTimestamp(mq, timestamp)
723+
}
724+
616725
func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
726+
if pc.SubType == Assign {
727+
return
728+
}
729+
617730
var allocateQueues []*primitive.MessageQueue
618731
pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool {
619732
mq := key.(primitive.MessageQueue)
@@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
734847
sleepTime = _PullDelayTimeWhenError
735848
goto NEXT
736849
}
850+
851+
nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
737852
beginTime := time.Now()
738853
sd := v.(*internal.SubscriptionData)
739854

@@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
743858
ConsumerGroup: pc.consumerGroup,
744859
Topic: request.mq.Topic,
745860
QueueId: int32(request.mq.QueueId),
746-
QueueOffset: request.nextOffset,
861+
QueueOffset: nextOffset,
747862
MaxMsgNums: pc.option.PullBatchSize.Load(),
748863
SysFlag: sysFlag,
749864
CommitOffset: 0,
@@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error {
880995
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
881996
}
882997

998+
if pc.SubType == None {
999+
return errors2.ErrBlankSubType
1000+
}
1001+
8831002
return nil
8841003
}

consumer/statistics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) {
217217
stats := NewStatsManager()
218218

219219
st := time.Now()
220-
for {
220+
for {
221221
stats.increasePullTPS("rocketmq", "default", 1)
222-
time.Sleep(500*time.Millisecond)
222+
time.Sleep(500 * time.Millisecond)
223223
if time.Now().Sub(st) > 5*time.Minute {
224224
break
225225
}

errors/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ var (
3434
ErrCreated = errors.New("consumer group has been created")
3535
ErrBrokerNotFound = errors.New("broker can not found")
3636
ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
37+
ErrSubscriptionType = errors.New("subscribe type is not matched")
38+
ErrBlankSubType = errors.New("subscribe type should not be blank")
3739
ErrResponse = errors.New("response error")
3840
ErrCompressLevel = errors.New("unsupported compress level")
3941
ErrUnknownIP = errors.New("unknown IP address")
4042
ErrService = errors.New("service close is not running, please check")
4143
ErrTopicNotExist = errors.New("topic not exist")
44+
ErrRouteNotFound = errors.New("topic route not found")
4245
ErrNotExisted = errors.New("not existed")
4346
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
4447
ErrMultiIP = errors.New("multiple IP addr does not support")

0 commit comments

Comments
 (0)