Skip to content

Commit b1d7f56

Browse files
authored
transport: Fix deadlock in transport caused by GOAWAY race with new stream creation (#5652)
* transport: Fix deadlock in transport caused by GOAWAY race with new stream creation
1 parent 9c3e589 commit b1d7f56

File tree

3 files changed

+181
-7
lines changed

3 files changed

+181
-7
lines changed

internal/transport/http2_client.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -1232,18 +1232,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
12321232
if upperLimit == 0 { // This is the first GoAway Frame.
12331233
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
12341234
}
1235+
1236+
t.prevGoAwayID = id
1237+
if len(t.activeStreams) == 0 {
1238+
t.mu.Unlock()
1239+
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
1240+
return
1241+
}
1242+
1243+
streamsToClose := make([]*Stream, 0)
12351244
for streamID, stream := range t.activeStreams {
12361245
if streamID > id && streamID <= upperLimit {
12371246
// The stream was unprocessed by the server.
1238-
atomic.StoreUint32(&stream.unprocessed, 1)
1239-
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1247+
if streamID > id && streamID <= upperLimit {
1248+
atomic.StoreUint32(&stream.unprocessed, 1)
1249+
streamsToClose = append(streamsToClose, stream)
1250+
}
12401251
}
12411252
}
1242-
t.prevGoAwayID = id
1243-
active := len(t.activeStreams)
12441253
t.mu.Unlock()
1245-
if active == 0 {
1246-
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
1254+
// Called outside t.mu because closeStream can take controlBuf's mu, which
1255+
// could induce deadlock and is not allowed.
1256+
for _, stream := range streamsToClose {
1257+
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
12471258
}
12481259
}
12491260

test/clienttester.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2022 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package test
18+
19+
import (
20+
"bytes"
21+
"io"
22+
"net"
23+
"testing"
24+
25+
"golang.org/x/net/http2"
26+
)
27+
28+
var (
29+
clientPreface = []byte(http2.ClientPreface)
30+
)
31+
32+
func newClientTester(t *testing.T, conn net.Conn) *clientTester {
33+
ct := &clientTester{
34+
t: t,
35+
conn: conn,
36+
}
37+
ct.fr = http2.NewFramer(conn, conn)
38+
ct.greet()
39+
return ct
40+
}
41+
42+
type clientTester struct {
43+
t *testing.T
44+
conn net.Conn
45+
fr *http2.Framer
46+
}
47+
48+
// greet() performs the necessary steps for http2 connection establishment on
49+
// the server side.
50+
func (ct *clientTester) greet() {
51+
ct.wantClientPreface()
52+
ct.wantSettingsFrame()
53+
ct.writeSettingsFrame()
54+
ct.writeSettingsAck()
55+
56+
for {
57+
f, err := ct.fr.ReadFrame()
58+
if err != nil {
59+
ct.t.Errorf("error reading frame from client side: %v", err)
60+
}
61+
switch f := f.(type) {
62+
case *http2.SettingsFrame:
63+
if f.IsAck() { // HTTP/2 handshake completed.
64+
return
65+
}
66+
default:
67+
ct.t.Errorf("during greet, unexpected frame type %T", f)
68+
}
69+
}
70+
}
71+
72+
func (ct *clientTester) wantClientPreface() {
73+
preface := make([]byte, len(clientPreface))
74+
if _, err := io.ReadFull(ct.conn, preface); err != nil {
75+
ct.t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
76+
}
77+
if !bytes.Equal(preface, clientPreface) {
78+
ct.t.Errorf("received bogus greeting from client %q", preface)
79+
}
80+
}
81+
82+
func (ct *clientTester) wantSettingsFrame() {
83+
frame, err := ct.fr.ReadFrame()
84+
if err != nil {
85+
ct.t.Errorf("error reading initial settings frame from client: %v", err)
86+
}
87+
_, ok := frame.(*http2.SettingsFrame)
88+
if !ok {
89+
ct.t.Errorf("initial frame sent from client is not a settings frame, type %T", frame)
90+
}
91+
}
92+
93+
func (ct *clientTester) writeSettingsFrame() {
94+
if err := ct.fr.WriteSettings(); err != nil {
95+
ct.t.Fatalf("Error writing initial SETTINGS frame from client to server: %v", err)
96+
}
97+
}
98+
99+
func (ct *clientTester) writeSettingsAck() {
100+
if err := ct.fr.WriteSettingsAck(); err != nil {
101+
ct.t.Fatalf("Error writing ACK of client's SETTINGS: %v", err)
102+
}
103+
}
104+
105+
func (ct *clientTester) writeGoAway(maxStreamID uint32, code http2.ErrCode, debugData []byte) {
106+
if err := ct.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
107+
ct.t.Fatalf("Error writing GOAWAY: %v", err)
108+
}
109+
}

test/end2end_test.go

+55-1
Original file line numberDiff line numberDiff line change
@@ -7407,7 +7407,6 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
74077407
return
74087408
}
74097409
writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.
7410-
74117410
var sid uint32
74127411
// Loop until conn is closed and framer returns io.EOF
74137412
for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) {
@@ -8130,3 +8129,58 @@ func (s) TestRecvWhileReturningStatus(t *testing.T) {
81308129
}
81318130
}
81328131
}
8132+
8133+
// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
8134+
// sends a goaway with a stream id that is smaller than some created streams on
8135+
// the client, while the client is simultaneously creating new streams. This
8136+
// should not induce a deadlock.
8137+
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
8138+
lis, err := net.Listen("tcp", "localhost:0")
8139+
if err != nil {
8140+
t.Fatalf("error listening: %v", err)
8141+
}
8142+
8143+
ctCh := testutils.NewChannel()
8144+
go func() {
8145+
conn, err := lis.Accept()
8146+
if err != nil {
8147+
t.Errorf("error in lis.Accept(): %v", err)
8148+
}
8149+
ct := newClientTester(t, conn)
8150+
ctCh.Send(ct)
8151+
}()
8152+
8153+
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
8154+
if err != nil {
8155+
t.Fatalf("error dialing: %v", err)
8156+
}
8157+
defer cc.Close()
8158+
8159+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
8160+
defer cancel()
8161+
8162+
val, err := ctCh.Receive(ctx)
8163+
if err != nil {
8164+
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
8165+
}
8166+
ct := val.(*clientTester)
8167+
8168+
tc := testpb.NewTestServiceClient(cc)
8169+
someStreamsCreated := grpcsync.NewEvent()
8170+
goAwayWritten := grpcsync.NewEvent()
8171+
go func() {
8172+
for i := 0; i < 20; i++ {
8173+
if i == 10 {
8174+
<-goAwayWritten.Done()
8175+
}
8176+
tc.FullDuplexCall(ctx)
8177+
if i == 4 {
8178+
someStreamsCreated.Fire()
8179+
}
8180+
}
8181+
}()
8182+
8183+
<-someStreamsCreated.Done()
8184+
ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
8185+
goAwayWritten.Fire()
8186+
}

0 commit comments

Comments
 (0)