Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix crash on selecting queue.
// Random numbers are generated by a Source. Top-level functions, such as
// Float64 and Int, use a default shared Source that produces a deterministic
// sequence of values each time a program is run. Use the Seed function to
// initialize the default Source if different behavior is required for each run.
// The default Source is safe for concurrent use by multiple goroutines, but
// Sources created by NewSource are not.
//
// Mathematical interval notation such as [0, n) is used throughout the
// documentation for this package.
//
// For random numbers suitable for security-sensitive work, see the crypto/rand
// package.
package rand

The default Source is safe for concurrent use by multiple goroutines, but Sources created by NewSource are not.
如上所述,使用NewSource 之后的rand不是线程安全的。多协程发送消息时,不加锁的情况下同时select queue会导致panic。
  • Loading branch information
NAND86 authored Apr 8, 2021
commit 3ae17c3ccba4af0045d87d90ad1a07ec13a37460
5 changes: 4 additions & 1 deletion producer/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (manualQueueSelector) Select(message *primitive.Message, queues []*primitiv

// randomQueueSelector choose a random queue each time.
type randomQueueSelector struct {
mux sync.Mutex
rander *rand.Rand
}

Expand All @@ -53,8 +54,10 @@ func NewRandomQueueSelector() QueueSelector {
return s
}

func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
r.mux.Lock()
i := r.rander.Intn(len(queues))
r.mux.Unlock()
return queues[i]
}

Expand Down