Skip to content

Commit 9842725

Browse files
authored
Add Stream option to PerformRequestOptions (#1478)
Add a `Stream` option to `PerformRequestOptions` which allows direct access to the HTTP response body. Close #1444
1 parent 233b883 commit 9842725

File tree

4 files changed

+51
-8
lines changed

4 files changed

+51
-8
lines changed

client.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,7 @@ type PerformRequestOptions struct {
12861286
RetryStatusCodes []int
12871287
Headers http.Header
12881288
MaxResponseSize int64
1289+
Stream bool
12891290
}
12901291

12911292
// PerformRequest does a HTTP request to Elasticsearch.
@@ -1294,6 +1295,9 @@ type PerformRequestOptions struct {
12941295
// Optionally, a list of HTTP error codes to ignore can be passed.
12951296
// This is necessary for services that expect e.g. HTTP status 404 as a
12961297
// valid outcome (Exists, IndicesExists, IndicesTypeExists).
1298+
//
1299+
// If Stream is set, the returned BodyReader field must be closed, even
1300+
// if PerformRequest returns an error.
12971301
func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) (*Response, error) {
12981302
start := time.Now().UTC()
12991303

@@ -1448,7 +1452,10 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14481452
continue // try again
14491453
}
14501454
}
1451-
defer res.Body.Close()
1455+
1456+
if !opt.Stream {
1457+
defer res.Body.Close()
1458+
}
14521459

14531460
// Tracing
14541461
c.dumpResponse(res)
@@ -1465,14 +1472,14 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14651472
if err := checkResponse((*http.Request)(req), res, opt.IgnoreErrors...); err != nil {
14661473
// No retry if request succeeded
14671474
// We still try to return a response.
1468-
resp, _ = c.newResponse(res, opt.MaxResponseSize)
1475+
resp, _ = c.newResponse(res, opt.MaxResponseSize, opt.Stream)
14691476
return resp, err
14701477
}
14711478

14721479
// We successfully made a request with this connection
14731480
conn.MarkAsHealthy()
14741481

1475-
resp, err = c.newResponse(res, opt.MaxResponseSize)
1482+
resp, err = c.newResponse(res, opt.MaxResponseSize, opt.Stream)
14761483
if err != nil {
14771484
return nil, err
14781485
}

client_test.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ func TestClientSniffTimeoutLeak(t *testing.T) {
859859
cli := &Client{
860860
c: &http.Client{},
861861
conns: []*conn{
862-
&conn{
862+
{
863863
url: "http://" + addr + "/",
864864
},
865865
},
@@ -1161,6 +1161,38 @@ func TestPerformRequest(t *testing.T) {
11611161
}
11621162
}
11631163

1164+
func TestPerformRequestWithStream(t *testing.T) {
1165+
client, err := NewClient()
1166+
if err != nil {
1167+
t.Fatal(err)
1168+
}
1169+
res, err := client.PerformRequest(context.TODO(), PerformRequestOptions{
1170+
Method: "GET",
1171+
Path: "/",
1172+
Stream: true,
1173+
})
1174+
if err != nil {
1175+
t.Fatal(err)
1176+
}
1177+
if res == nil {
1178+
t.Fatal("expected response to be != nil")
1179+
}
1180+
if res.BodyReader == nil {
1181+
t.Fatal("expected res.BodyReader to be != nil")
1182+
}
1183+
if res.Body != nil {
1184+
t.Fatal("expected res.Body to be == nil")
1185+
}
1186+
1187+
ret := new(PingResult)
1188+
if err := json.NewDecoder(res.BodyReader).Decode(ret); err != nil {
1189+
t.Fatalf("expected no error on decode; got: %v", err)
1190+
}
1191+
if ret.ClusterName == "" {
1192+
t.Errorf("expected cluster name; got: %q", ret.ClusterName)
1193+
}
1194+
}
1195+
11641196
func TestPerformRequestWithSimpleClient(t *testing.T) {
11651197
client, err := NewSimpleClient()
11661198
if err != nil {

response.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,25 @@ type Response struct {
2424
// Header is the HTTP header from the HTTP response.
2525
// Keys in the map are canonicalized (see http.CanonicalHeaderKey).
2626
Header http.Header
27-
// Body is the deserialized response body.
27+
// Body is the deserialized response body. Only available if streaming is disabled.
2828
Body json.RawMessage
2929
// DeprecationWarnings lists all deprecation warnings returned from
3030
// Elasticsearch.
3131
DeprecationWarnings []string
32+
// BodyReader is the body as a reader. Only available if streaming is enabled.
33+
BodyReader io.ReadCloser
3234
}
3335

3436
// newResponse creates a new response from the HTTP response.
35-
func (c *Client) newResponse(res *http.Response, maxBodySize int64) (*Response, error) {
37+
func (c *Client) newResponse(res *http.Response, maxBodySize int64, stream bool) (*Response, error) {
3638
r := &Response{
3739
StatusCode: res.StatusCode,
3840
Header: res.Header,
3941
DeprecationWarnings: res.Header["Warning"],
4042
}
41-
if res.Body != nil {
43+
if stream {
44+
r.BodyReader = res.Body
45+
} else if res.Body != nil {
4246
body := io.Reader(res.Body)
4347
if maxBodySize > 0 {
4448
if res.ContentLength > maxBodySize {

response_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func BenchmarkResponse(b *testing.B) {
2929
StatusCode: http.StatusOK,
3030
}
3131
var err error
32-
resp, err = c.newResponse(res, 0)
32+
resp, err = c.newResponse(res, 0, false)
3333
if err != nil {
3434
b.Fatal(err)
3535
}

0 commit comments

Comments
 (0)