Skip to content

Commit ae76839

Browse files
NickShannonDing
andauthored
feat: support lite pull consumer (#881)
* feat: support lite pull consumer * remove the real ip. * go.mod require go 1.13,so remove old versions. * 1.no need to persist offset,rocketmq client do it when start;2.add pprof. * Update .travis.yml Co-authored-by: dinglei <libya_003@163.com>
1 parent ffd7241 commit ae76839

File tree

17 files changed

+1293
-292
lines changed

17 files changed

+1293
-292
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
language: go
22

33
go:
4-
- "1.11.x"
5-
- "1.12.x"
64
- "1.13.x"
5+
- "1.16.x"
6+
- "1.18.x"
77
go_import_path: github.com/apache/rocketmq-client-go/v2
88
env:
99
global:

admin/admin.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/apache/rocketmq-client-go/v2/internal"
2727
"github.com/apache/rocketmq-client-go/v2/internal/remote"
28+
"github.com/apache/rocketmq-client-go/v2/internal/utils"
2829
"github.com/apache/rocketmq-client-go/v2/primitive"
2930
"github.com/apache/rocketmq-client-go/v2/rlog"
3031
)
@@ -35,6 +36,7 @@ type Admin interface {
3536
//TODO
3637
//TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
3738
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
39+
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
3840
Close() error
3941
}
4042

@@ -61,6 +63,19 @@ func WithResolver(resolver primitive.NsResolver) AdminOption {
6163
}
6264
}
6365

66+
func WithCredentials(c primitive.Credentials) AdminOption {
67+
return func(options *adminOptions) {
68+
options.ClientOptions.Credentials = c
69+
}
70+
}
71+
72+
// WithNamespace set the namespace of admin
73+
func WithNamespace(namespace string) AdminOption {
74+
return func(options *adminOptions) {
75+
options.ClientOptions.Namespace = namespace
76+
}
77+
}
78+
6479
type admin struct {
6580
cli internal.RMQClient
6681

@@ -70,7 +85,7 @@ type admin struct {
7085
}
7186

7287
// NewAdmin initialize admin
73-
func NewAdmin(opts ...AdminOption) (Admin, error) {
88+
func NewAdmin(opts ...AdminOption) (*admin, error) {
7489
defaultOpts := defaultAdminOptions()
7590
for _, opt := range opts {
7691
opt(defaultOpts)
@@ -202,6 +217,10 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
202217
return nil
203218
}
204219

220+
func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
221+
return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic))
222+
}
223+
205224
func (a *admin) Close() error {
206225
a.closeOnce.Do(func() {
207226
a.cli.Shutdown()

api.go

Lines changed: 24 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"time"
2323

2424
"github.com/apache/rocketmq-client-go/v2/consumer"
25-
"github.com/apache/rocketmq-client-go/v2/errors"
2625
"github.com/apache/rocketmq-client-go/v2/internal"
2726
"github.com/apache/rocketmq-client-go/v2/primitive"
2827
"github.com/apache/rocketmq-client-go/v2/producer"
@@ -81,67 +80,37 @@ type PullConsumer interface {
8180
// Start the PullConsumer for consuming message
8281
Start() error
8382

84-
// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
85-
Shutdown() error
86-
8783
// Subscribe a topic for consuming
8884
Subscribe(topic string, selector consumer.MessageSelector) error
8985

9086
// Unsubscribe a topic
9187
Unsubscribe(topic string) error
9288

93-
// MessageQueues get MessageQueue list about for a given topic. This method will issue a remote call to the server
94-
// if it does not already have any MessageQueue about the given topic.
95-
MessageQueues(topic string) []primitive.MessageQueue
96-
97-
// Pull message for the topic specified. It is an error to not have subscribed to any topics before pull for message
98-
//
99-
// Specified numbers of messages is returned if message greater that numbers, and the offset will auto forward.
100-
// It means that if you meeting messages consuming failed, you should process failed messages by yourself.
101-
Pull(ctx context.Context, topic string, numbers int) (*primitive.PullResult, error)
102-
103-
// Pull message for the topic specified from a specified MessageQueue and offset. It is an error to not have
104-
// subscribed to any topics before pull for message. the method will not affect the offset recorded
105-
//
106-
// Specified numbers of messages is returned.
107-
PullFrom(ctx context.Context, mq primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
108-
109-
// Lookup offset for the given message queue by timestamp. The returned offset for the message queue is the
110-
// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
111-
// queue.
112-
//
113-
// Timestamp must be millisecond level, if you want to lookup the earliest offset of the mq, you could set the
114-
// timestamp 0, and if you want to the latest offset the mq, you could set the timestamp math.MaxInt64.
115-
Lookup(ctx context.Context, mq primitive.MessageQueue, timestamp int64) (int64, error)
116-
117-
// Commit the offset of specified mqs to broker, if auto-commit is disable, you must commit the offset manually.
118-
Commit(ctx context.Context, mqs ...primitive.MessageQueue) (int64, error)
119-
120-
// CommittedOffset return the offset of specified Message
121-
CommittedOffset(mq primitive.MessageQueue) (int64, error)
122-
123-
// Seek set offset of the mq, if you wanna re-consuming your message form one position, the method may help you.
124-
// if you want re-consuming from one time, you cloud Lookup() then seek it.
125-
Seek(mq primitive.MessageQueue, offset int64) error
126-
127-
// Pause consuming for specified MessageQueues, after pause, client will not fetch any message from the specified
128-
// message queues
129-
//
130-
// Note that this method does not affect message queue subscription. In particular, it does not cause a group
131-
// rebalance.
132-
//
133-
// if a MessageQueue belong a topic that has not been subscribed, an error will be returned
134-
//Pause(mqs ...primitive.MessageQueue) error
135-
136-
// Resume specified message queues which have been paused with Pause, if a MessageQueue that not paused,
137-
// it will be ignored. if not subscribed, an error will be returned
138-
//Resume(mqs ...primitive.MessageQueue) error
89+
// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
90+
Shutdown() error
91+
92+
// Poll messages with timeout.
93+
Poll(ctx context.Context, timeout time.Duration) (*consumer.ConsumeRequest, error)
94+
95+
//ACK ACK
96+
ACK(ctx context.Context, cr *consumer.ConsumeRequest, consumeResult consumer.ConsumeResult)
97+
98+
// Pull message of topic, selector indicate which queue to pull.
99+
Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)
100+
101+
// PullFrom pull messages of queue from the offset to offset + numbers
102+
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
103+
104+
// UpdateOffset updateOffset update offset of queue in mem
105+
UpdateOffset(queue *primitive.MessageQueue, offset int64) error
106+
107+
// PersistOffset persist all offset in mem.
108+
PersistOffset(ctx context.Context, topic string) error
109+
110+
// CurrentOffset return the current offset of queue in mem.
111+
CurrentOffset(queue *primitive.MessageQueue) (int64, error)
139112
}
140113

141-
// The PullConsumer has not implemented completely, if you want have an experience of PullConsumer, you could use
142-
// consumer.NewPullConsumer(...), but it may changed in the future.
143-
//
144-
// The PullConsumer will be supported in next release
145114
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
146-
return nil, errors.ErrPullConsumer
115+
return consumer.NewPullConsumer(opts...)
147116
}

consumer/consumer.go

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ type defaultConsumer struct {
269269
}
270270

271271
func (dc *defaultConsumer) start() error {
272+
dc.consumerGroup = utils.WrapNamespace(dc.option.Namespace, dc.consumerGroup)
272273
if dc.model == Clustering {
273274
// set retry topic
274275
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
@@ -694,53 +695,51 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
694695
return true
695696
})
696697

697-
if dc.cType == _PushConsume {
698-
for item := range mqSet {
699-
// BUG: the mq will send to channel, if not copy once, the next iter will modify the mq in the channel.
700-
mq := item
698+
for item := range mqSet {
699+
// BUG: the mq will send to channel, if not copy once, the next iter will modify the mq in the channel.
700+
mq := item
701+
_, exist := dc.processQueueTable.Load(mq)
702+
if exist {
703+
continue
704+
}
705+
if dc.consumeOrderly && !dc.lock(&mq) {
706+
rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
707+
rlog.LogKeyConsumerGroup: dc.consumerGroup,
708+
rlog.LogKeyMessageQueue: mq.String(),
709+
})
710+
continue
711+
}
712+
dc.storage.remove(&mq)
713+
nextOffset, err := dc.computePullFromWhereWithException(&mq)
714+
715+
if nextOffset >= 0 && err == nil {
701716
_, exist := dc.processQueueTable.Load(mq)
702717
if exist {
703-
continue
704-
}
705-
if dc.consumeOrderly && !dc.lock(&mq) {
706-
rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
718+
rlog.Debug("updateProcessQueueTable do defaultConsumer, mq already exist", map[string]interface{}{
707719
rlog.LogKeyConsumerGroup: dc.consumerGroup,
708720
rlog.LogKeyMessageQueue: mq.String(),
709721
})
710-
continue
711-
}
712-
dc.storage.remove(&mq)
713-
nextOffset, err := dc.computePullFromWhereWithException(&mq)
714-
715-
if nextOffset >= 0 && err == nil {
716-
_, exist := dc.processQueueTable.Load(mq)
717-
if exist {
718-
rlog.Debug("do defaultConsumer, mq already exist", map[string]interface{}{
719-
rlog.LogKeyConsumerGroup: dc.consumerGroup,
720-
rlog.LogKeyMessageQueue: mq.String(),
721-
})
722-
} else {
723-
rlog.Debug("do defaultConsumer, add a new mq", map[string]interface{}{
724-
rlog.LogKeyConsumerGroup: dc.consumerGroup,
725-
rlog.LogKeyMessageQueue: mq.String(),
726-
})
727-
pq := newProcessQueue(dc.consumeOrderly)
728-
dc.processQueueTable.Store(mq, pq)
729-
pr := PullRequest{
730-
consumerGroup: dc.consumerGroup,
731-
mq: &mq,
732-
pq: pq,
733-
nextOffset: nextOffset,
734-
}
735-
dc.prCh <- pr
736-
changed = true
737-
}
738722
} else {
739-
rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
723+
rlog.Debug("updateProcessQueueTable do defaultConsumer, add a new mq", map[string]interface{}{
740724
rlog.LogKeyConsumerGroup: dc.consumerGroup,
741725
rlog.LogKeyMessageQueue: mq.String(),
742726
})
727+
pq := newProcessQueue(dc.consumeOrderly)
728+
dc.processQueueTable.Store(mq, pq)
729+
pr := PullRequest{
730+
consumerGroup: dc.consumerGroup,
731+
mq: &mq,
732+
pq: pq,
733+
nextOffset: nextOffset,
734+
}
735+
dc.prCh <- pr
736+
changed = true
743737
}
738+
} else {
739+
rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
740+
rlog.LogKeyConsumerGroup: dc.consumerGroup,
741+
rlog.LogKeyMessageQueue: mq.String(),
742+
})
744743
}
745744
}
746745

@@ -760,9 +759,6 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int6
760759
}
761760

762761
func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
763-
if dc.cType == _PullConsume {
764-
return 0, nil
765-
}
766762
result := int64(-1)
767763
lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
768764
if err != nil {
@@ -898,6 +894,7 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
898894

899895
// TODO: add filter message hook
900896
for _, msg := range msgListFilterAgain {
897+
msg.Queue = mq
901898
traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
902899
if traFlag {
903900
msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)

consumer/option.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ func defaultPullConsumerOptions() consumerOptions {
134134
opts := consumerOptions{
135135
ClientOptions: internal.DefaultClientOptions(),
136136
Resolver: primitive.NewHttpResolver("DEFAULT"),
137+
ConsumerModel: Clustering,
138+
Strategy: AllocateByAveragely,
137139
}
138140
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
139141
return opts

0 commit comments

Comments
 (0)