2 changes: 0 additions & 2 deletions .travis.yml
@@ -1,8 +1,6 @@
language: go

go:
- "1.11.x"
- "1.12.x"
- "1.13.x"
go_import_path: github.com/apache/rocketmq-client-go/v2
env:
21 changes: 20 additions & 1 deletion admin/admin.go
@@ -25,6 +25,7 @@ import (

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
@@ -35,6 +36,7 @@ type Admin interface {
//TODO
//TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
Close() error
}

@@ -61,6 +63,19 @@ func WithResolver(resolver primitive.NsResolver) AdminOption {
}
}

func WithCredentials(c primitive.Credentials) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.Credentials = c
}
}

// WithNamespace set the namespace of admin
func WithNamespace(namespace string) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.Namespace = namespace
}
}

type admin struct {
cli internal.RMQClient

@@ -70,7 +85,7 @@ type admin struct {
}

// NewAdmin initialize admin
func NewAdmin(opts ...AdminOption) (Admin, error) {
func NewAdmin(opts ...AdminOption) (*admin, error) {
defaultOpts := defaultAdminOptions()
for _, opt := range opts {
opt(defaultOpts)
@@ -202,6 +217,10 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
return nil
}

func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic))
}

func (a *admin) Close() error {
a.closeOnce.Do(func() {
a.cli.Shutdown()
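For reference, a minimal usage sketch of the admin options introduced in this diff (WithCredentials, WithNamespace) together with the new FetchPublishMessageQueues call. The name server address, credentials, namespace, and topic below are placeholders, and NewPassthroughResolver is assumed to be available from the primitive package.

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/rocketmq-client-go/v2/admin"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	a, err := admin.NewAdmin(
		admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		admin.WithCredentials(primitive.Credentials{AccessKey: "ak", SecretKey: "sk"}), // option added in this diff
		admin.WithNamespace("ns1"),                                                     // option added in this diff
	)
	if err != nil {
		panic(err)
	}
	defer a.Close()

	// FetchPublishMessageQueues resolves the queues for a topic; per the change
	// above, the topic is wrapped with the configured namespace before the
	// name server is queried.
	mqs, err := a.FetchPublishMessageQueues(context.Background(), "TopicTest")
	if err != nil {
		panic(err)
	}
	for _, mq := range mqs {
		fmt.Println(mq.String())
	}
}
```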
79 changes: 24 additions & 55 deletions api.go
@@ -22,7 +22,6 @@ import (
"time"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
@@ -81,67 +80,37 @@ type PullConsumer interface {
// Start the PullConsumer for consuming message
Start() error

// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
Shutdown() error

// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector) error

// Unsubscribe a topic
Unsubscribe(topic string) error

// MessageQueues get MessageQueue list about for a given topic. This method will issue a remote call to the server
// if it does not already have any MessageQueue about the given topic.
MessageQueues(topic string) []primitive.MessageQueue

// Pull message for the topic specified. It is an error to not have subscribed to any topics before pull for message
//
// Specified numbers of messages is returned if message greater that numbers, and the offset will auto forward.
// It means that if you meeting messages consuming failed, you should process failed messages by yourself.
Pull(ctx context.Context, topic string, numbers int) (*primitive.PullResult, error)

// Pull message for the topic specified from a specified MessageQueue and offset. It is an error to not have
// subscribed to any topics before pull for message. the method will not affect the offset recorded
//
// Specified numbers of messages is returned.
PullFrom(ctx context.Context, mq primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

// Lookup offset for the given message queue by timestamp. The returned offset for the message queue is the
// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
// queue.
//
// Timestamp must be millisecond level, if you want to lookup the earliest offset of the mq, you could set the
// timestamp 0, and if you want to the latest offset the mq, you could set the timestamp math.MaxInt64.
Lookup(ctx context.Context, mq primitive.MessageQueue, timestamp int64) (int64, error)

// Commit the offset of specified mqs to broker, if auto-commit is disable, you must commit the offset manually.
Commit(ctx context.Context, mqs ...primitive.MessageQueue) (int64, error)

// CommittedOffset return the offset of specified Message
CommittedOffset(mq primitive.MessageQueue) (int64, error)

// Seek set offset of the mq, if you wanna re-consuming your message form one position, the method may help you.
// if you want re-consuming from one time, you cloud Lookup() then seek it.
Seek(mq primitive.MessageQueue, offset int64) error

// Pause consuming for specified MessageQueues, after pause, client will not fetch any message from the specified
// message queues
//
// Note that this method does not affect message queue subscription. In particular, it does not cause a group
// rebalance.
//
// if a MessageQueue belong a topic that has not been subscribed, an error will be returned
//Pause(mqs ...primitive.MessageQueue) error

// Resume specified message queues which have been paused with Pause, if a MessageQueue that not paused,
// it will be ignored. if not subscribed, an error will be returned
//Resume(mqs ...primitive.MessageQueue) error
// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
Shutdown() error

// Poll messages with timeout.
Poll(ctx context.Context, timeout time.Duration) (*consumer.ConsumeRequest, error)

//ACK ACK
ACK(ctx context.Context, cr *consumer.ConsumeRequest, consumeResult consumer.ConsumeResult)

// Pull message of topic, selector indicate which queue to pull.
Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)

// PullFrom pull messages of queue from the offset to offset + numbers
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

// UpdateOffset updateOffset update offset of queue in mem
UpdateOffset(queue *primitive.MessageQueue, offset int64) error

// PersistOffset persist all offset in mem.
PersistOffset(ctx context.Context, topic string) error

// CurrentOffset return the current offset of queue in mem.
CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}

// The PullConsumer has not implemented completely, if you want have an experience of PullConsumer, you could use
// consumer.NewPullConsumer(...), but it may changed in the future.
//
// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
return nil, errors.ErrPullConsumer
return consumer.NewPullConsumer(opts...)
}
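For reference, a hedged sketch of the Poll/ACK flow exposed by the reworked PullConsumer interface, now that NewPullConsumer delegates to consumer.NewPullConsumer. The group name, name server address, and topic are placeholders; Subscribe is taken from the surrounding interface context, and the GetMsgList accessor on ConsumeRequest is an assumption, not something confirmed by this diff.

```go
package main

import (
	"context"
	"fmt"
	"time"

	rocketmq "github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPullConsumer(
		consumer.WithGroupName("testPullGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
	)
	if err != nil {
		panic(err)
	}
	// Subscribe before Start, as with the push consumer; selector left empty here.
	if err := c.Subscribe("TopicTest", consumer.MessageSelector{}); err != nil {
		panic(err)
	}
	if err := c.Start(); err != nil {
		panic(err)
	}
	defer c.Shutdown()

	for i := 0; i < 10; i++ {
		// Poll blocks up to the given timeout and returns a ConsumeRequest.
		cr, err := c.Poll(context.Background(), 5*time.Second)
		if err != nil {
			continue // e.g. no new messages within the timeout
		}
		for _, msg := range cr.GetMsgList() { // accessor name assumed
			fmt.Println(string(msg.Body))
		}
		// ACK reports the consume result so the in-memory offset can advance.
		c.ACK(context.Background(), cr, consumer.ConsumeSuccess)
	}
}
```

Offsets kept in memory can then be flushed with PersistOffset, or adjusted per queue with UpdateOffset, per the interface above.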
77 changes: 37 additions & 40 deletions consumer/consumer.go
@@ -269,6 +269,7 @@ type defaultConsumer struct {
}

func (dc *defaultConsumer) start() error {
dc.consumerGroup = utils.WrapNamespace(dc.option.Namespace, dc.consumerGroup)
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
@@ -694,53 +695,51 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
return true
})

if dc.cType == _PushConsume {
for item := range mqSet {
// BUG: the mq will send to channel, if not copy once, the next iter will modify the mq in the channel.
mq := item
for item := range mqSet {
// BUG: the mq will send to channel, if not copy once, the next iter will modify the mq in the channel.
mq := item
_, exist := dc.processQueueTable.Load(mq)
if exist {
continue
}
if dc.consumeOrderly && !dc.lock(&mq) {
rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
continue
}
dc.storage.remove(&mq)
nextOffset, err := dc.computePullFromWhereWithException(&mq)

if nextOffset >= 0 && err == nil {
_, exist := dc.processQueueTable.Load(mq)
if exist {
continue
}
if dc.consumeOrderly && !dc.lock(&mq) {
rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
rlog.Debug("updateProcessQueueTable do defaultConsumer, mq already exist", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
continue
}
dc.storage.remove(&mq)
nextOffset, err := dc.computePullFromWhereWithException(&mq)

if nextOffset >= 0 && err == nil {
_, exist := dc.processQueueTable.Load(mq)
if exist {
rlog.Debug("do defaultConsumer, mq already exist", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
} else {
rlog.Debug("do defaultConsumer, add a new mq", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
pq := newProcessQueue(dc.consumeOrderly)
dc.processQueueTable.Store(mq, pq)
pr := PullRequest{
consumerGroup: dc.consumerGroup,
mq: &mq,
pq: pq,
nextOffset: nextOffset,
}
dc.prCh <- pr
changed = true
}
} else {
rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
rlog.Debug("updateProcessQueueTable do defaultConsumer, add a new mq", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
pq := newProcessQueue(dc.consumeOrderly)
dc.processQueueTable.Store(mq, pq)
pr := PullRequest{
consumerGroup: dc.consumerGroup,
mq: &mq,
pq: pq,
nextOffset: nextOffset,
}
dc.prCh <- pr
changed = true
}
} else {
rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
}
}

@@ -760,9 +759,6 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int6
}

func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
if dc.cType == _PullConsume {
return 0, nil
}
result := int64(-1)
lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
if err != nil {
@@ -898,6 +894,7 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result

// TODO: add filter message hook
for _, msg := range msgListFilterAgain {
msg.Queue = mq
traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
if traFlag {
msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
2 changes: 2 additions & 0 deletions consumer/option.go
@@ -131,6 +131,8 @@ func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
Resolver: primitive.NewHttpResolver("DEFAULT"),
ConsumerModel: Clustering,
Strategy: AllocateByAveragely,
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
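For reference, an illustrative sketch of overriding the pull-consumer defaults added here (Clustering model, AllocateByAveragely strategy). The WithConsumerModel and WithStrategy option names are assumed from the consumer package; the broadcast model and circle strategy are example alternatives only.

```go
package main

import (
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Defaults after this change: Clustering model, AllocateByAveragely strategy,
	// group name "DEFAULT_CONSUMER". Each can still be overridden via options.
	_, _ = consumer.NewPullConsumer(
		consumer.WithGroupName("testPullGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		consumer.WithConsumerModel(consumer.BroadCasting),         // example alternative to Clustering
		consumer.WithStrategy(consumer.AllocateByAveragelyCircle), // example alternative to AllocateByAveragely
	)
}
```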