-
Notifications
You must be signed in to change notification settings - Fork 438
Closed
Description
BUG REPORT
-
Please describe the issue you observed:
- What did you do (The steps to reproduce)?
Client consumes message for broker. Sometime, - What did you expect to see?
consumer keep consuming messages. - What did you see instead?
all consumer goroutines block onconsumer/process_queue.go:201 pq.mutex.Lock(), whenupdateProcessQueueTableis called and meetpq.isPullExpired() && dc.cType == _PushConsume, the channel——pq.closeChanwill be closed, but the receiver doesn't releasepq.mutex.
- What did you do (The steps to reproduce)?
please look the deadlock of the following pprof log.
2. Please tell us about your environment:
- What is your OS?
- What is your client version?
- What is your RocketMQ version?
- Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):
pprof:
goroutine profile: total 73
20 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e6a50 0x1045e66d4 0x1045e8e7c 0x1049905f4 0x10499c5c4 0x104937f1c 0x1045de424
# 0x1045d9fa7 sync.runtime_SemacquireMutex+0x27 /Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
# 0x1045e6a4f sync.(*Mutex).lockSlow+0x34f /Users/wero/.g/versions/1.18.3/src/sync/mutex.go:162
# 0x1045e66d3 sync.(*Mutex).Lock+0xa3 /Users/wero/.g/versions/1.18.3/src/sync/mutex.go:81
# 0x1045e8e7b sync.(*RWMutex).Lock+0x2b /Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:139
# 0x1049905f3 github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).removeMessage+0x53 /Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:201
# 0x10499c5c3 github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently.func1+0xba3 /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1101
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045becfc 0x10499f920 0x104937f1c 0x1045de424
# 0x10499f91f github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func4+0x15f /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:218
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045becfc 0x10499fb64 0x104937f1c 0x1045de424
# 0x10499fb63 github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func3+0x103 /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:204
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045becfc 0x10499fda4 0x104937f1c 0x1045de424
# 0x10499fda3 github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func2+0x103 /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:191
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045becfc 0x10499ffe4 0x104937f1c 0x1045de424
# 0x10499ffe3 github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func1+0x103 /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:178
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045daee8 0x10499f308 0x104937f1c 0x1045de424
# 0x1045daee7 time.Sleep+0x117 /Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
# 0x10499f307 github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func6+0x87 /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:242
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
5 @ 0x1045aeafc 0x1045daee8 0x10499f5a8 0x104937f1c 0x1045de424
# 0x1045daee7 time.Sleep+0x117 /Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
# 0x10499f5a7 github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func5+0x87 /Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:228
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
3 @ 0x1045aeafc 0x10457a93c 0x10457a728 0x10499b810 0x104999af0 0x104937f1c 0x1045de424
# 0x10499b80f github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently+0x1ff /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1029
# 0x104999aef github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage.func1+0x15f /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:580
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
3 @ 0x1045aeafc 0x1045becfc 0x10498fae0 0x104999130 0x104993e64 0x1045de424
# 0x10498fadf github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).putMessage+0x13f /Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:105
# 0x10499912f github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage+0x23ef /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:813
# 0x104993e63 github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1.1+0x33 /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:164
2 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x104645684 0x104645830 0x104943518 0x1049431b4 0x104937f1c 0x1045de424
# 0x1045d8b37 internal/poll.runtime_pollWait+0x47 /Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
# 0x10464b0f7 internal/poll.(*pollDesc).wait+0x87 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
# 0x10464b187 internal/poll.(*pollDesc).waitRead+0x37 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
# 0x10464c39b internal/poll.(*FD).Read+0x33b /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
# 0x10472ffe3 net.(*netFD).Read+0x53 /Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
# 0x1047417df net.(*conn).Read+0x6f /Users/wero/.g/versions/1.18.3/src/net/net.go:183
# 0x104645683 io.ReadAtLeast+0x133 /Users/wero/.g/versions/1.18.3/src/io/io.go:331
# 0x10464582f io.ReadFull+0x5f /Users/wero/.g/versions/1.18.3/src/io/io.go:350
# 0x104943517 github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).receiveResponse+0x2d7 /Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:195
# 0x1049431b3 github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).connect.func1+0x33 /Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:165
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
2 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e8c48 0x104991e40 0x10499570c 0x1045e6174 0x104995008 0x104962950 0x10495b92c 0x104943d28 0x104937f1c 0x1045de424
# 0x1045d9fa7 sync.runtime_SemacquireMutex+0x27 /Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
# 0x1045e8c47 sync.(*RWMutex).RLock+0x97 /Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:63
# 0x104991e3f github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).currentInfo+0x4f /Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:400
# 0x10499570b github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo.func2+0x10b /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:414
# 0x1045e6173 sync.(*Map).Range+0x2a3 /Users/wero/.g/versions/1.18.3/src/sync/map.go:347
# 0x104995007 github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo+0xe7 /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:411
# 0x10496294f github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).getConsumerRunningInfo+0xcf /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:911
# 0x10495b92b github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func3+0x18b /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:277
# 0x104943d27 github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2+0x87 /Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:244
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x10457b308 0x10457b064 0x1049c6aac 0x1045ae714 0x1045de424
# 0x1049c6aab main.main+0x33b /Users/wero/workspace/go/rocketmq-client-go/examples/consumer/simple/main.go:59
# 0x1045ae713 runtime.main+0x223 /Users/wero/.g/versions/1.18.3/src/runtime/proc.go:250
1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x1048faf14 0x1045de424
# 0x1045d8b37 internal/poll.runtime_pollWait+0x47 /Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
# 0x10464b0f7 internal/poll.(*pollDesc).wait+0x87 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
# 0x10464b187 internal/poll.(*pollDesc).waitRead+0x37 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
# 0x10464c39b internal/poll.(*FD).Read+0x33b /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
# 0x10472ffe3 net.(*netFD).Read+0x53 /Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
# 0x1047417df net.(*conn).Read+0x6f /Users/wero/.g/versions/1.18.3/src/net/net.go:183
# 0x1048faf13 net/http.(*connReader).backgroundRead+0x73 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:672
1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x1048fb6dc 0x104779234 0x10477aa04 0x10477aa98 0x10487fa2c 0x10487f92c 0x1048f55a0 0x1048fcab4 0x104902c80 0x1045de424
# 0x1045d8b37 internal/poll.runtime_pollWait+0x47 /Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
# 0x10464b0f7 internal/poll.(*pollDesc).wait+0x87 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
# 0x10464b187 internal/poll.(*pollDesc).waitRead+0x37 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
# 0x10464c39b internal/poll.(*FD).Read+0x33b /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
# 0x10472ffe3 net.(*netFD).Read+0x53 /Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
# 0x1047417df net.(*conn).Read+0x6f /Users/wero/.g/versions/1.18.3/src/net/net.go:183
# 0x1048fb6db net/http.(*connReader).Read+0x1fb /Users/wero/.g/versions/1.18.3/src/net/http/server.go:780
# 0x104779233 bufio.(*Reader).fill+0x233 /Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:106
# 0x10477aa03 bufio.(*Reader).ReadSlice+0x353 /Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:371
# 0x10477aa97 bufio.(*Reader).ReadLine+0x47 /Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:400
# 0x10487fa2b net/textproto.(*Reader).readLineSlice+0x6b /Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:57
# 0x10487f92b net/textproto.(*Reader).ReadLine+0x3b /Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:38
# 0x1048f559f net/http.readRequest+0x5f /Users/wero/.g/versions/1.18.3/src/net/http/request.go:1029
# 0x1048fcab3 net/http.(*conn).readRequest+0x393 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:988
# 0x104902c7f net/http.(*conn).serve+0xa6f /Users/wero/.g/versions/1.18.3/src/net/http/server.go:1891
1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464cf6c 0x10473157c 0x10474b644 0x104749ef0 0x10490831c 0x104907e44 0x1049090ec 0x1045de424
# 0x1045d8b37 internal/poll.runtime_pollWait+0x47 /Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
# 0x10464b0f7 internal/poll.(*pollDesc).wait+0x87 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
# 0x10464b187 internal/poll.(*pollDesc).waitRead+0x37 /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
# 0x10464cf6b internal/poll.(*FD).Accept+0x32b /Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:614
# 0x10473157b net.(*netFD).accept+0x4b /Users/wero/.g/versions/1.18.3/src/net/fd_unix.go:172
# 0x10474b643 net.(*TCPListener).accept+0x43 /Users/wero/.g/versions/1.18.3/src/net/tcpsock_posix.go:139
# 0x104749eef net.(*TCPListener).Accept+0x4f /Users/wero/.g/versions/1.18.3/src/net/tcpsock.go:288
# 0x10490831b net/http.(*Server).Serve+0x3eb /Users/wero/.g/versions/1.18.3/src/net/http/server.go:3039
# 0x104907e43 net/http.(*Server).ListenAndServe+0x123 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:2968
# 0x1049090eb net/http.ListenAndServe+0xfb /Users/wero/.g/versions/1.18.3/src/net/http/server.go:3222
1 @ 0x1045aeafc 0x1045becfc 0x104958b24 0x1045de424
# 0x104958b23 github.com/patrickmn/go-cache.(*janitor).Run+0xa3 /Users/wero/go/pkg/mod/github.com/patrickmn/go-cache@v2.1.0+incompatible/cache.go:1079
1 @ 0x1045aeafc 0x1045becfc 0x10495e3b4 0x104937f1c 0x1045de424
# 0x10495e3b3 github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.5+0x103 /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:498
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x1045becfc 0x10495e6dc 0x104937f1c 0x1045de424
# 0x10495e6db github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.4+0x14b /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:482
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x1045becfc 0x10495ea68 0x104937f1c 0x1045de424
# 0x10495ea67 github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.3+0x147 /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:450
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x1045becfc 0x10495edf8 0x104937f1c 0x1045de424
# 0x10495edf7 github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.2+0x147 /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:426
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x1045becfc 0x10495f16c 0x104937f1c 0x1045de424
# 0x10495f16b github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.1+0x14b /Users/wero/workspace/go/rocketmq-client-go/internal/client.go:402
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045aeafc 0x1045becfc 0x104993c44 0x1045de424
# 0x104993c43 github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1+0xb3 /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:161
1 @ 0x1045aeafc 0x1045daee8 0x104993920 0x104937f1c 0x1045de424
# 0x1045daee7 time.Sleep+0x117 /Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
# 0x10499391f github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.2+0x6f /Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:179
# 0x104937f1b github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b /Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104
1 @ 0x1045d8778 0x10497c704 0x10497c41c 0x104978150 0x1049c425c 0x1049c4d68 0x104904348 0x104906594 0x104907bd4 0x104903720 0x1045de424
# 0x1045d8777 runtime/pprof.runtime_goroutineProfileWithLabels+0x27 /Users/wero/.g/versions/1.18.3/src/runtime/mprof.go:753
# 0x10497c703 runtime/pprof.writeRuntimeProfile+0x113 /Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:725
# 0x10497c41b runtime/pprof.writeGoroutine+0x8b /Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:685
# 0x10497814f runtime/pprof.(*Profile).WriteTo+0x7f /Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:332
# 0x1049c425b net/http/pprof.handler.ServeHTTP+0x38b /Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:253
# 0x1049c4d67 net/http/pprof.Index+0xb7 /Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:371
# 0x104904347 net/http.HandlerFunc.ServeHTTP+0x47 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:2084
# 0x104906593 net/http.(*ServeMux).ServeHTTP+0x133 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:2462
# 0x104907bd3 net/http.serverHandler.ServeHTTP+0x423 /Users/wero/.g/versions/1.18.3/src/net/http/server.go:2916
# 0x10490371f net/http.(*conn).serve+0x150f /Users/wero/.g/versions/1.18.3/src/net/http/server.go:1966
there should Unlock
REPRODUCE
Run the consumer, then run the producer, and after a while there will be a deadlock for the following reasons.
producer:
package main
import (
"context"
"fmt"
"os"
"strconv"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
topic := "test"
for i := 0; i < 100000; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("H" + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}consumer, If it doesn't reproduce, we can run it a few more times:
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"net/http"
_ "net/http/pprof"
"os"
)
func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeMessageBatchMaxSize(5),
)
go http.ListenAndServe("0.0.0.0:6060", nil)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i].MsgId)
}
return consumer.ConsumeRetryLater, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
<-sig
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}Metadata
Metadata
Assignees
Labels
No labels