Skip to content

Commit 120fc90

Browse files
committedMay 15, 2023
http2: change default frame scheduler to round robin
The priority scheduler allows stream starvation (see golang/go#58804) and is CPU intensive. In addition, the RFC 7540 prioritization scheme it implements was deprecated in RFC 9113 and does not appear to have ever had significant adoption. Add a simple round-robin scheduler and enable it by default. For golang/go#58804 Change-Id: I5c5143aa9bc339fc0894f70d773fa7c0d7d87eef Reviewed-on: https://go-review.googlesource.com/c/net/+/478735 TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Bryan Mills <bcmills@google.com> Run-TryBot: Damien Neil <dneil@google.com>
1 parent 2b0b97d commit 120fc90

File tree

4 files changed

+187
-2
lines changed

4 files changed

+187
-2
lines changed
 

‎http2/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
441441
if s.NewWriteScheduler != nil {
442442
sc.writeSched = s.NewWriteScheduler()
443443
} else {
444-
sc.writeSched = NewPriorityWriteScheduler(nil)
444+
sc.writeSched = newRoundRobinWriteScheduler()
445445
}
446446

447447
// These start at the RFC-specified defaults. If there is a higher

‎http2/writesched.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ func (wr *FrameWriteRequest) replyToWriter(err error) {
184184

185185
// writeQueue is used by implementations of WriteScheduler.
186186
type writeQueue struct {
187-
s []FrameWriteRequest
187+
s []FrameWriteRequest
188+
prev, next *writeQueue
188189
}
189190

190191
func (q *writeQueue) empty() bool { return len(q.s) == 0 }

‎http2/writesched_roundrobin.go

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package http2
6+
7+
import (
8+
"fmt"
9+
"math"
10+
)
11+
12+
type roundRobinWriteScheduler struct {
13+
// control contains control frames (SETTINGS, PING, etc.).
14+
control writeQueue
15+
16+
// streams maps stream ID to a queue.
17+
streams map[uint32]*writeQueue
18+
19+
// stream queues are stored in a circular linked list.
20+
// head is the next stream to write, or nil if there are no streams open.
21+
head *writeQueue
22+
23+
// pool of empty queues for reuse.
24+
queuePool writeQueuePool
25+
}
26+
27+
// newRoundRobinWriteScheduler constructs a new write scheduler.
28+
// The round robin scheduler priorizes control frames
29+
// like SETTINGS and PING over DATA frames.
30+
// When there are no control frames to send, it performs a round-robin
31+
// selection from the ready streams.
32+
func newRoundRobinWriteScheduler() WriteScheduler {
33+
ws := &roundRobinWriteScheduler{
34+
streams: make(map[uint32]*writeQueue),
35+
}
36+
return ws
37+
}
38+
39+
func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
40+
if ws.streams[streamID] != nil {
41+
panic(fmt.Errorf("stream %d already opened", streamID))
42+
}
43+
q := ws.queuePool.get()
44+
ws.streams[streamID] = q
45+
if ws.head == nil {
46+
ws.head = q
47+
q.next = q
48+
q.prev = q
49+
} else {
50+
// Queues are stored in a ring.
51+
// Insert the new stream before ws.head, putting it at the end of the list.
52+
q.prev = ws.head.prev
53+
q.next = ws.head
54+
q.prev.next = q
55+
q.next.prev = q
56+
}
57+
}
58+
59+
func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
60+
q := ws.streams[streamID]
61+
if q == nil {
62+
return
63+
}
64+
if q.next == q {
65+
// This was the only open stream.
66+
ws.head = nil
67+
} else {
68+
q.prev.next = q.next
69+
q.next.prev = q.prev
70+
if ws.head == q {
71+
ws.head = q.next
72+
}
73+
}
74+
delete(ws.streams, streamID)
75+
ws.queuePool.put(q)
76+
}
77+
78+
func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
79+
80+
func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
81+
if wr.isControl() {
82+
ws.control.push(wr)
83+
return
84+
}
85+
q := ws.streams[wr.StreamID()]
86+
if q == nil {
87+
// This is a closed stream.
88+
// wr should not be a HEADERS or DATA frame.
89+
// We push the request onto the control queue.
90+
if wr.DataSize() > 0 {
91+
panic("add DATA on non-open stream")
92+
}
93+
ws.control.push(wr)
94+
return
95+
}
96+
q.push(wr)
97+
}
98+
99+
func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
100+
// Control and RST_STREAM frames first.
101+
if !ws.control.empty() {
102+
return ws.control.shift(), true
103+
}
104+
if ws.head == nil {
105+
return FrameWriteRequest{}, false
106+
}
107+
q := ws.head
108+
for {
109+
if wr, ok := q.consume(math.MaxInt32); ok {
110+
ws.head = q.next
111+
return wr, true
112+
}
113+
q = q.next
114+
if q == ws.head {
115+
break
116+
}
117+
}
118+
return FrameWriteRequest{}, false
119+
}

‎http2/writesched_roundrobin_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package http2
6+
7+
import (
8+
"reflect"
9+
"testing"
10+
)
11+
12+
func TestRoundRobinScheduler(t *testing.T) {
13+
const maxFrameSize = 16
14+
sc := &serverConn{maxFrameSize: maxFrameSize}
15+
ws := newRoundRobinWriteScheduler()
16+
streams := make([]*stream, 4)
17+
for i := range streams {
18+
streamID := uint32(i) + 1
19+
streams[i] = &stream{
20+
id: streamID,
21+
sc: sc,
22+
}
23+
streams[i].flow.add(1 << 20) // arbitrary large value
24+
ws.OpenStream(streamID, OpenStreamOptions{})
25+
wr := FrameWriteRequest{
26+
write: &writeData{
27+
streamID: streamID,
28+
p: make([]byte, maxFrameSize*(i+1)),
29+
endStream: false,
30+
},
31+
stream: streams[i],
32+
}
33+
ws.Push(wr)
34+
}
35+
const controlFrames = 2
36+
for i := 0; i < controlFrames; i++ {
37+
ws.Push(makeWriteNonStreamRequest())
38+
}
39+
40+
// We should get the control frames first.
41+
for i := 0; i < controlFrames; i++ {
42+
wr, ok := ws.Pop()
43+
if !ok || wr.StreamID() != 0 {
44+
t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
45+
}
46+
}
47+
48+
// Each stream should write maxFrameSize bytes until it runs out of data.
49+
// Stream 1 has one frame of data, 2 has two frames, etc.
50+
want := []uint32{1, 2, 3, 4, 2, 3, 4, 3, 4, 4}
51+
var got []uint32
52+
for {
53+
wr, ok := ws.Pop()
54+
if !ok {
55+
break
56+
}
57+
if wr.DataSize() != maxFrameSize {
58+
t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
59+
}
60+
got = append(got, wr.StreamID())
61+
}
62+
if !reflect.DeepEqual(got, want) {
63+
t.Fatalf("popped streams %v, want %v", got, want)
64+
}
65+
}

0 commit comments

Comments
 (0)