Skip to content

Commit 8019e59

Browse files
authored
[ISSUE #487] Implement Unsubscribe method for push consumer (#626)
* Implement Unsubscribe method for push consumer
1 parent c97d492 commit 8019e59

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

consumer/push_consumer.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,14 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
227227
return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
228228
}
229229

230+
// add retry topic subscription for resubscribe
231+
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
232+
_, exists := pc.subscriptionDataTable.Load(retryTopic)
233+
if !exists {
234+
sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
235+
pc.subscriptionDataTable.Store(retryTopic, sub)
236+
}
237+
230238
if pc.option.Namespace != "" {
231239
topic = pc.option.Namespace + "%" + topic
232240
}
@@ -241,7 +249,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
241249
return nil
242250
}
243251

244-
func (pc *pushConsumer) Unsubscribe(string) error {
252+
func (pc *pushConsumer) Unsubscribe(topic string) error {
253+
pc.subscriptionDataTable.Delete(topic)
254+
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
255+
pc.subscriptionDataTable.Delete(retryTopic)
245256
return nil
246257
}
247258

consumer/push_consumer_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,23 @@ func TestStart(t *testing.T) {
5252
return ConsumeSuccess, nil
5353
})
5454

55+
_, exists := c.subscriptionDataTable.Load("TopicTest")
56+
So(exists, ShouldBeTrue)
57+
58+
err = c.Unsubscribe("TopicTest")
59+
So(err, ShouldBeNil)
60+
_, exists = c.subscriptionDataTable.Load("TopicTest")
61+
So(exists, ShouldBeFalse)
62+
63+
err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
64+
msgs ...*primitive.MessageExt) (ConsumeResult, error) {
65+
fmt.Printf("subscribe callback: %v \n", msgs)
66+
return ConsumeSuccess, nil
67+
})
68+
69+
_, exists = c.subscriptionDataTable.Load("TopicTest")
70+
So(exists, ShouldBeTrue)
71+
5572
client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
5673
client.EXPECT().Start().Return()
5774
client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)

0 commit comments

Comments
 (0)