Skip to content

Commit 938a9fb

Browse files
neildgopherbot
authored andcommitted
internal/http3: add request/response body transfer
For golang/go#70914 Change-Id: I372458214fe73f8156e0ec291168b043c10221e6 Reviewed-on: https://go-review.googlesource.com/c/net/+/644915 Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Auto-Submit: Damien Neil <dneil@google.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
1 parent 145b2d7 commit 938a9fb

File tree

5 files changed

+707
-12
lines changed

5 files changed

+707
-12
lines changed

internal/http3/body.go

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2025 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.24
6+
7+
package http3
8+
9+
import (
10+
"errors"
11+
"fmt"
12+
"io"
13+
"sync"
14+
)
15+
16+
// A bodyWriter writes a request or response body to a stream
17+
// as a series of DATA frames.
18+
type bodyWriter struct {
19+
st *stream
20+
remain int64 // -1 when content-length is not known
21+
flush bool // flush the stream after every write
22+
name string // "request" or "response"
23+
}
24+
25+
func (w *bodyWriter) Write(p []byte) (n int, err error) {
26+
if w.remain >= 0 && int64(len(p)) > w.remain {
27+
return 0, &streamError{
28+
code: errH3InternalError,
29+
message: w.name + " body longer than specified content length",
30+
}
31+
}
32+
w.st.writeVarint(int64(frameTypeData))
33+
w.st.writeVarint(int64(len(p)))
34+
n, err = w.st.Write(p)
35+
if w.remain >= 0 {
36+
w.remain -= int64(n)
37+
}
38+
if w.flush && err == nil {
39+
err = w.st.Flush()
40+
}
41+
if err != nil {
42+
err = fmt.Errorf("writing %v body: %w", w.name, err)
43+
}
44+
return n, err
45+
}
46+
47+
func (w *bodyWriter) Close() error {
48+
if w.remain > 0 {
49+
return errors.New(w.name + " body shorter than specified content length")
50+
}
51+
return nil
52+
}
53+
54+
// A bodyReader reads a request or response body from a stream.
55+
type bodyReader struct {
56+
st *stream
57+
58+
mu sync.Mutex
59+
remain int64
60+
err error
61+
}
62+
63+
func (r *bodyReader) Read(p []byte) (n int, err error) {
64+
// The HTTP/1 and HTTP/2 implementations both permit concurrent reads from a body,
65+
// in the sense that the race detector won't complain.
66+
// Use a mutex here to provide the same behavior.
67+
r.mu.Lock()
68+
defer r.mu.Unlock()
69+
if r.err != nil {
70+
return 0, r.err
71+
}
72+
defer func() {
73+
if err != nil {
74+
r.err = err
75+
}
76+
}()
77+
if r.st.lim == 0 {
78+
// We've finished reading the previous DATA frame, so end it.
79+
if err := r.st.endFrame(); err != nil {
80+
return 0, err
81+
}
82+
}
83+
// Read the next DATA frame header,
84+
// if we aren't already in the middle of one.
85+
for r.st.lim < 0 {
86+
ftype, err := r.st.readFrameHeader()
87+
if err == io.EOF && r.remain > 0 {
88+
return 0, &streamError{
89+
code: errH3MessageError,
90+
message: "body shorter than content-length",
91+
}
92+
}
93+
if err != nil {
94+
return 0, err
95+
}
96+
switch ftype {
97+
case frameTypeData:
98+
if r.remain >= 0 && r.st.lim > r.remain {
99+
return 0, &streamError{
100+
code: errH3MessageError,
101+
message: "body longer than content-length",
102+
}
103+
}
104+
// Fall out of the loop and process the frame body below.
105+
case frameTypeHeaders:
106+
// This HEADERS frame contains the message trailers.
107+
if r.remain > 0 {
108+
return 0, &streamError{
109+
code: errH3MessageError,
110+
message: "body shorter than content-length",
111+
}
112+
}
113+
// TODO: Fill in Request.Trailer.
114+
if err := r.st.discardFrame(); err != nil {
115+
return 0, err
116+
}
117+
return 0, io.EOF
118+
default:
119+
if err := r.st.discardUnknownFrame(ftype); err != nil {
120+
return 0, err
121+
}
122+
}
123+
}
124+
// We are now reading the content of a DATA frame.
125+
// Fill the read buffer or read to the end of the frame,
126+
// whichever comes first.
127+
if int64(len(p)) > r.st.lim {
128+
p = p[:r.st.lim]
129+
}
130+
n, err = r.st.Read(p)
131+
if r.remain > 0 {
132+
r.remain -= int64(n)
133+
}
134+
return n, err
135+
}
136+
137+
func (r *bodyReader) Close() error {
138+
// Unlike the HTTP/1 and HTTP/2 body readers (at the time of this comment being written),
139+
// calling Close concurrently with Read will interrupt the read.
140+
r.st.stream.CloseRead()
141+
return nil
142+
}

internal/http3/http3_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,10 @@ func unhex(s string) []byte {
7373
}
7474
return b
7575
}
76+
77+
// testReader implements io.Reader.
78+
type testReader struct {
79+
readFunc func([]byte) (int, error)
80+
}
81+
82+
func (r testReader) Read(p []byte) (n int, err error) { return r.readFunc(p) }

internal/http3/roundtrip.go

+112-12
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,74 @@
77
package http3
88

99
import (
10+
"errors"
1011
"io"
1112
"net/http"
1213
"strconv"
14+
"sync"
1315

1416
"golang.org/x/net/internal/httpcommon"
1517
)
1618

19+
type roundTripState struct {
20+
cc *ClientConn
21+
st *stream
22+
23+
// Request body, provided by the caller.
24+
onceCloseReqBody sync.Once
25+
reqBody io.ReadCloser
26+
27+
reqBodyWriter bodyWriter
28+
29+
// Response.Body, provided to the caller.
30+
respBody bodyReader
31+
32+
errOnce sync.Once
33+
err error
34+
}
35+
36+
// abort terminates the RoundTrip.
37+
// It returns the first fatal error encountered by the RoundTrip call.
38+
func (rt *roundTripState) abort(err error) error {
39+
rt.errOnce.Do(func() {
40+
rt.err = err
41+
switch e := err.(type) {
42+
case *connectionError:
43+
rt.cc.abort(e)
44+
case *streamError:
45+
rt.st.stream.CloseRead()
46+
rt.st.stream.Reset(uint64(e.code))
47+
default:
48+
rt.st.stream.CloseRead()
49+
rt.st.stream.Reset(uint64(errH3NoError))
50+
}
51+
})
52+
return rt.err
53+
}
54+
55+
// closeReqBody closes the Request.Body, at most once.
56+
func (rt *roundTripState) closeReqBody() {
57+
if rt.reqBody != nil {
58+
rt.onceCloseReqBody.Do(func() {
59+
rt.reqBody.Close()
60+
})
61+
}
62+
}
63+
1764
// RoundTrip sends a request on the connection.
1865
func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error) {
1966
// Each request gets its own QUIC stream.
2067
st, err := newConnStream(req.Context(), cc.qconn, streamTypeRequest)
2168
if err != nil {
2269
return nil, err
2370
}
71+
rt := &roundTripState{
72+
cc: cc,
73+
st: st,
74+
}
2475
defer func() {
25-
switch e := err.(type) {
26-
case nil:
27-
case *connectionError:
28-
cc.abort(e)
29-
case *streamError:
30-
st.stream.CloseRead()
31-
st.stream.Reset(uint64(e.code))
32-
default:
33-
st.stream.CloseRead()
34-
st.stream.Reset(uint64(errH3NoError))
76+
if err != nil {
77+
err = rt.abort(err)
3578
}
3679
}()
3780

@@ -64,7 +107,13 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
64107
}
65108

66109
if encr.HasBody {
67-
// TODO: Send the request body.
110+
// TODO: Defer sending the request body when "Expect: 100-continue" is set.
111+
rt.reqBody = req.Body
112+
rt.reqBodyWriter.st = st
113+
rt.reqBodyWriter.remain = httpcommon.ActualContentLength(req)
114+
rt.reqBodyWriter.flush = true
115+
rt.reqBodyWriter.name = "request"
116+
go copyRequestBody(rt)
68117
}
69118

70119
// Read the response headers.
@@ -91,14 +140,16 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
91140
if err != nil {
92141
return nil, err
93142
}
143+
rt.respBody.st = st
144+
rt.respBody.remain = contentLength
94145
resp := &http.Response{
95146
Proto: "HTTP/3.0",
96147
ProtoMajor: 3,
97148
Header: h,
98149
StatusCode: statusCode,
99150
Status: strconv.Itoa(statusCode) + " " + http.StatusText(statusCode),
100151
ContentLength: contentLength,
101-
Body: io.NopCloser(nil), // TODO: read the response body
152+
Body: (*transportResponseBody)(rt),
102153
}
103154
// TODO: Automatic Content-Type: gzip decoding.
104155
return resp, nil
@@ -114,6 +165,55 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
114165
}
115166
}
116167

168+
func copyRequestBody(rt *roundTripState) {
169+
defer rt.closeReqBody()
170+
_, err := io.Copy(&rt.reqBodyWriter, rt.reqBody)
171+
if closeErr := rt.reqBodyWriter.Close(); err == nil {
172+
err = closeErr
173+
}
174+
if err != nil {
175+
// Something went wrong writing the body.
176+
rt.abort(err)
177+
} else {
178+
// We wrote the whole body.
179+
rt.st.stream.CloseWrite()
180+
}
181+
}
182+
183+
// transportResponseBody is the Response.Body returned by RoundTrip.
184+
type transportResponseBody roundTripState
185+
186+
// Read is Response.Body.Read.
187+
func (b *transportResponseBody) Read(p []byte) (n int, err error) {
188+
return b.respBody.Read(p)
189+
}
190+
191+
var errRespBodyClosed = errors.New("response body closed")
192+
193+
// Close is Response.Body.Close.
194+
// Closing the response body is how the caller signals that they're done with a request.
195+
func (b *transportResponseBody) Close() error {
196+
rt := (*roundTripState)(b)
197+
// Close the request body, which should wake up copyRequestBody if it's
198+
// currently blocked reading the body.
199+
rt.closeReqBody()
200+
// Close the request stream, since we're done with the request.
201+
// Reset closes the sending half of the stream.
202+
rt.st.stream.Reset(uint64(errH3NoError))
203+
// respBody.Close is responsible for closing the receiving half.
204+
err := rt.respBody.Close()
205+
if err == nil {
206+
err = errRespBodyClosed
207+
}
208+
err = rt.abort(err)
209+
if err == errRespBodyClosed {
210+
// No other errors occurred before closing Response.Body,
211+
// so consider this a successful request.
212+
return nil
213+
}
214+
return err
215+
}
216+
117217
func parseResponseContentLength(method string, statusCode int, h http.Header) (int64, error) {
118218
clens := h["Content-Length"]
119219
if len(clens) == 0 {

0 commit comments

Comments
 (0)