-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathconn.go
133 lines (121 loc) · 3.39 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.24
package http3
import (
"context"
"io"
"sync"
"golang.org/x/net/quic"
)
type streamHandler interface {
handleControlStream(*stream) error
handlePushStream(*stream) error
handleEncoderStream(*stream) error
handleDecoderStream(*stream) error
handleRequestStream(*stream) error
abort(error)
}
type genericConn struct {
mu sync.Mutex
// The peer may create exactly one control, encoder, and decoder stream.
// streamsCreated is a bitset of streams created so far.
// Bits are 1 << streamType.
streamsCreated uint8
}
func (c *genericConn) acceptStreams(qconn *quic.Conn, h streamHandler) {
for {
// Use context.Background: This blocks until a stream is accepted
// or the connection closes.
st, err := qconn.AcceptStream(context.Background())
if err != nil {
return // connection closed
}
if st.IsReadOnly() {
go c.handleUnidirectionalStream(newStream(st), h)
} else {
go c.handleRequestStream(newStream(st), h)
}
}
}
func (c *genericConn) handleUnidirectionalStream(st *stream, h streamHandler) {
// Unidirectional stream header: One varint with the stream type.
v, err := st.readVarint()
if err != nil {
h.abort(&connectionError{
code: errH3StreamCreationError,
message: "error reading unidirectional stream header",
})
return
}
stype := streamType(v)
if err := c.checkStreamCreation(stype); err != nil {
h.abort(err)
return
}
switch stype {
case streamTypeControl:
err = h.handleControlStream(st)
case streamTypePush:
err = h.handlePushStream(st)
case streamTypeEncoder:
err = h.handleEncoderStream(st)
case streamTypeDecoder:
err = h.handleDecoderStream(st)
default:
// "Recipients of unknown stream types MUST either abort reading
// of the stream or discard incoming data without further processing."
// https://www.rfc-editor.org/rfc/rfc9114.html#section-6.2-7
//
// We should send the H3_STREAM_CREATION_ERROR error code,
// but the quic package currently doesn't allow setting error codes
// for STOP_SENDING frames.
// TODO: Should CloseRead take an error code?
err = nil
}
if err == io.EOF {
err = &connectionError{
code: errH3ClosedCriticalStream,
message: streamType(stype).String() + " stream closed",
}
}
c.handleStreamError(st, h, err)
}
func (c *genericConn) handleRequestStream(st *stream, h streamHandler) {
c.handleStreamError(st, h, h.handleRequestStream(st))
}
func (c *genericConn) handleStreamError(st *stream, h streamHandler, err error) {
switch err := err.(type) {
case *connectionError:
h.abort(err)
case nil:
st.stream.CloseRead()
st.stream.CloseWrite()
case *streamError:
st.stream.CloseRead()
st.stream.Reset(uint64(err.code))
default:
st.stream.CloseRead()
st.stream.Reset(uint64(errH3InternalError))
}
}
func (c *genericConn) checkStreamCreation(stype streamType) error {
switch stype {
case streamTypeControl, streamTypeEncoder, streamTypeDecoder:
// The peer may create exactly one control, encoder, and decoder stream.
default:
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
bit := uint8(1) << stype
if c.streamsCreated&bit != 0 {
return &connectionError{
code: errH3StreamCreationError,
message: "multiple " + stype.String() + " streams created",
}
}
c.streamsCreated |= bit
return nil
}