File tree 2 files changed +43
-1
lines changed
pkg/controller/priorityqueue
2 files changed +43
-1
lines changed Original file line number Diff line number Diff line change @@ -203,6 +203,9 @@ func (w *priorityqueue[T]) spin() {
203
203
w .lockedLock .Lock ()
204
204
defer w .lockedLock .Unlock ()
205
205
206
+ // manipulating the tree from within Ascend might lead to panics, so
207
+ // track what we want to delete and do it after we are done ascending.
208
+ var toDelete []* item [T ]
206
209
w .queue .Ascend (func (item * item [T ]) bool {
207
210
if item .readyAt != nil {
208
211
if readyAt := item .readyAt .Sub (w .now ()); readyAt > 0 {
@@ -230,12 +233,16 @@ func (w *priorityqueue[T]) spin() {
230
233
w .locked .Insert (item .key )
231
234
w .waiters .Add (- 1 )
232
235
delete (w .items , item .key )
233
- w . queue . Delete ( item )
236
+ toDelete = append ( toDelete , item )
234
237
w .becameReady .Delete (item .key )
235
238
w .get <- * item
236
239
237
240
return true
238
241
})
242
+
243
+ for _ , item := range toDelete {
244
+ w .queue .Delete (item )
245
+ }
239
246
}()
240
247
}
241
248
}
Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ package priorityqueue
2
2
3
3
import (
4
4
"fmt"
5
+ "math/rand/v2"
5
6
"sync"
6
7
"testing"
7
8
"time"
@@ -344,6 +345,40 @@ var _ = Describe("Controllerworkqueue", func() {
344
345
Expect (metrics .depth ["test" ]).To (Equal (4 ))
345
346
metrics .mu .Unlock ()
346
347
})
348
+
349
+ It ("returns many items" , func () {
350
+ // This test ensures the queue is able to drain a large queue without panic'ing.
351
+ // In a previous version of the code we were calling queue.Delete within q.Ascend
352
+ // which led to a panic in queue.Ascend > iterate:
353
+ // "panic: runtime error: index out of range [0] with length 0"
354
+ q , _ := newQueue ()
355
+ defer q .ShutDown ()
356
+
357
+ for range 20 {
358
+ for i := range 1000 {
359
+ rn := rand .N (100 ) //nolint:gosec // We don't need cryptographically secure entropy here
360
+ if rn < 10 {
361
+ q .AddWithOpts (AddOpts {After : time .Duration (rn ) * time .Millisecond }, fmt .Sprintf ("foo%d" , i ))
362
+ } else {
363
+ q .AddWithOpts (AddOpts {Priority : rn }, fmt .Sprintf ("foo%d" , i ))
364
+ }
365
+ }
366
+
367
+ wg := sync.WaitGroup {}
368
+ for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
369
+ wg .Add (1 )
370
+ go func () {
371
+ defer wg .Done ()
372
+ for range 10 {
373
+ obj , _ , _ := q .GetWithPriority ()
374
+ q .Done (obj )
375
+ }
376
+ }()
377
+ }
378
+
379
+ wg .Wait ()
380
+ }
381
+ })
347
382
})
348
383
349
384
func BenchmarkAddGetDone (b * testing.B ) {
You can’t perform that action at this time.
0 commit comments