Skip to content

Commit 0bea191

Browse files
committed
Transport: Add support for request retries
This patch adds the configuration parameters and supporting logic for retrying a failed request: a feature common to official Elasticsearch clients. Executable example: ```golang //+build ignore package main import ( "context" "fmt" "log" "math" "net" "net/http" "os" "os/signal" "strings" "syscall" "time" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/estransport" "github.com/cenkalti/backoff" ) var ( _ = fmt.Print _ = context.WithTimeout _ = math.Exp _ = strings.NewReader _ = http.DefaultClient ) func main() { log.SetFlags(0) ticker := time.NewTicker(time.Second) defer ticker.Stop() aborted := make(chan os.Signal) signal.Notify(aborted, os.Interrupt, syscall.SIGTERM) retryBackoff := backoff.NewExponentialBackOff() retryBackoff.InitialInterval = time.Second _ = retryBackoff cfg := elasticsearch.Config{ Addresses: []string{ "http://localhost:9200", // "http://localhost:1000", // "http://localhost:2000", // "http://localhost:3000", "http://localhost:4000", "http://localhost:5000", // "http://localhost:6000", // "http://localhost:7000", // "http://localhost:8000", // "http://localhost:9200", // "http://localhost:9000", // "http://localhost:9201", // "http://localhost:9202", }, Logger: &estransport.ColorLogger{ Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true, }, // RetryOnStatus: []int{404}, // EnableRetryOnTimeout: true, // MaxRetries: 10, // RetryBackoff: func(i int) time.Duration { // d := time.Duration(i) * time.Second // fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d) // return d // }, // RetryBackoff: func(i int) time.Duration { // d := time.Duration(math.Exp2(float64(i))) * time.Second // fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d) // return d // }, RetryBackoff: func(i int) time.Duration { if i == 1 { retryBackoff.Reset() } d := retryBackoff.NextBackOff() fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d) return d }, // Transport: &http.Transport{ // ResponseHeaderTimeout: time.Millisecond, // }, } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } log.Println("Client ready with URLs:", es.Transport.(*estransport.Client).URLs()) go func() { <-aborted log.Println("\nDone!") os.Exit(0) }() for { select { case <-ticker.C: var ( res *esapi.Response err error ) // res, err = es.Info() // ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) // defer cancel() // res, err = es.Info(es.Info.WithContext(ctx)) // es.Search(es.Search.WithTimeout(time.Nanosecond)) // res, err = es.Get("test", "MISSING") res, err = es.Index("test", strings.NewReader(`{"foo":"bar"}`)) if err != nil { if e, ok := err.(net.Error); ok { log.Fatalf("Error getting response: [%T]: %s (timeout:%v)", e, e, e.Timeout()) } else { log.Fatalf("Error getting response: [%T]: %s", err, err) } } if res.IsError() { log.Fatalf("Error response: %s", res.Status()) } } } } ``` Closes #67 (cherry picked from commit 7f37b7b)
1 parent 16f29dd commit 0bea191

8 files changed

+448
-73
lines changed

doc.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ Call the Elasticsearch APIs by invoking the corresponding methods on the client:
4242
4343
log.Println(res)
4444
45-
See the github.com/elastic/go-elasticsearch/esapi package for more information and examples.
45+
See the github.com/elastic/go-elasticsearch/esapi package for more information about using the API.
46+
47+
See the github.com/elastic/go-elasticsearch/estransport package for more information about configuring the transport.
4648
*/
4749
package elasticsearch

elasticsearch.go

+14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"net/url"
1313
"os"
1414
"strings"
15+
"time"
1516

1617
"github.com/elastic/go-elasticsearch/v6/esapi"
1718
"github.com/elastic/go-elasticsearch/v6/estransport"
@@ -36,6 +37,13 @@ type Config struct {
3637
CloudID string // Endpoint for the Elastic Service (https://elastic.co/cloud).
3738
APIKey string // Base64-encoded token for authorization; if set, overrides username and password.
3839

40+
RetryOnStatus []int // List of status codes for retry. Default: 502, 503, 504.
41+
DisableRetry bool // Default: false.
42+
EnableRetryOnTimeout bool // Default: false.
43+
MaxRetries int // Default: 3.
44+
45+
RetryBackoff func(attempt int) time.Duration // Optional backoff duration. Default: nil.
46+
3947
Transport http.RoundTripper // The HTTP transport object.
4048
Logger estransport.Logger // The logger object.
4149
}
@@ -116,6 +124,12 @@ func NewClient(cfg Config) (*Client, error) {
116124
Password: cfg.Password,
117125
APIKey: cfg.APIKey,
118126

127+
RetryOnStatus: cfg.RetryOnStatus,
128+
DisableRetry: cfg.DisableRetry,
129+
EnableRetryOnTimeout: cfg.EnableRetryOnTimeout,
130+
MaxRetries: cfg.MaxRetries,
131+
RetryBackoff: cfg.RetryBackoff,
132+
119133
Transport: cfg.Transport,
120134
Logger: cfg.Logger,
121135
})

esapi/doc.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,11 @@ about the API endpoints and parameters.
8585
The Go API is generated from the Elasticsearch JSON specification at
8686
https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/api
8787
by the internal package available at
88-
https://github.com/elastic/go-elasticsearch/tree/master/internal/cmd/generate/commands.
88+
https://github.com/elastic/go-elasticsearch/tree/master/internal/cmd/generate/commands/gensource.
8989
9090
The API is tested by integration tests common to all Elasticsearch official clients, generated from the
91-
source at https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/test. The generator is provided by the internal package internal/cmd/generate.
91+
source at https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/test.
92+
The generator is provided by the internal package available at internal/cmd/generate/commands/gentests.
9293
9394
*/
9495
package esapi

esapi/esapi.request.go

+1-30
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,9 @@
55
package esapi
66

77
import (
8-
"bytes"
98
"context"
109
"io"
11-
"io/ioutil"
1210
"net/http"
13-
"net/url"
14-
"strings"
1511
)
1612

1713
const (
@@ -31,30 +27,5 @@ type Request interface {
3127
// newRequest creates an HTTP request.
3228
//
3329
func newRequest(method, path string, body io.Reader) (*http.Request, error) {
34-
r := http.Request{
35-
Method: method,
36-
URL: &url.URL{Path: path},
37-
Proto: "HTTP/1.1",
38-
ProtoMajor: 1,
39-
ProtoMinor: 1,
40-
Header: make(http.Header),
41-
}
42-
43-
if body != nil {
44-
switch b := body.(type) {
45-
case *bytes.Buffer:
46-
r.Body = ioutil.NopCloser(body)
47-
r.ContentLength = int64(b.Len())
48-
case *bytes.Reader:
49-
r.Body = ioutil.NopCloser(body)
50-
r.ContentLength = int64(b.Len())
51-
case *strings.Reader:
52-
r.Body = ioutil.NopCloser(body)
53-
r.ContentLength = int64(b.Len())
54-
default:
55-
r.Body = ioutil.NopCloser(body)
56-
}
57-
}
58-
59-
return &r, nil
30+
return http.NewRequest(method, path, body)
6031
}

estransport/doc.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,23 @@ Package estransport provides the transport layer for the Elasticsearch client.
44
It is automatically included in the client provided by the github.com/elastic/go-elasticsearch package
55
and is not intended for direct use: to configure the client, use the elasticsearch.Config struct.
66
7-
The default HTTP transport of the client is http.Transport.
7+
The default HTTP transport of the client is http.Transport; use the Transport option to customize it;
8+
see the _examples/customization.go file in this repository for information.
89
910
The package defines the "Selector" interface for getting a URL from the list. At the moment,
1011
the implementation is rather minimal: the client takes a slice of url.URL pointers,
1112
and round-robins across them when performing the request.
1213
14+
The package will automatically retry requests on network-related errors, and on specific
15+
response status codes (by default 502, 503, 504). Use the RetryOnStatus option to customize the list.
16+
The transport will not retry a timeout network error, unless enabled by setting EnableRetryOnTimeout to true.
17+
18+
Use the MaxRetries option to configure the number of retries, and set DisableRetry to true
19+
to disable the retry behaviour altogether.
20+
21+
By default, the retry will be performed without any delay; to configure a backoff interval,
22+
implement the RetryBackoff option function; see an example in the package unit tests for information.
23+
1324
The package defines the "Logger" interface for logging information about request and response.
1425
It comes with several bundled loggers for logging in text and JSON.
1526

estransport/estransport.go

+105-30
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"io"
1111
"io/ioutil"
12+
"net"
1213
"net/http"
1314
"net/url"
1415
"regexp"
@@ -26,6 +27,9 @@ const Version = version.Client
2627
var (
2728
userAgent string
2829
reGoVersion = regexp.MustCompile(`go(\d+\.\d+\..+)`)
30+
31+
defaultMaxRetries = 3
32+
defaultRetryOnStatus = [...]int{502, 503, 504}
2933
)
3034

3135
func init() {
@@ -46,6 +50,12 @@ type Config struct {
4650
Password string
4751
APIKey string
4852

53+
RetryOnStatus []int
54+
DisableRetry bool
55+
EnableRetryOnTimeout bool
56+
MaxRetries int
57+
RetryBackoff func(attempt int) time.Duration
58+
4959
Transport http.RoundTripper
5060
Logger Logger
5161
}
@@ -58,6 +68,12 @@ type Client struct {
5868
password string
5969
apikey string
6070

71+
retryOnStatus []int
72+
disableRetry bool
73+
enableRetryOnTimeout bool
74+
maxRetries int
75+
retryBackoff func(attempt int) time.Duration
76+
6177
transport http.RoundTripper
6278
selector Selector
6379
logger Logger
@@ -72,12 +88,26 @@ func New(cfg Config) *Client {
7288
cfg.Transport = http.DefaultTransport
7389
}
7490

91+
if len(cfg.RetryOnStatus) == 0 {
92+
cfg.RetryOnStatus = defaultRetryOnStatus[:]
93+
}
94+
95+
if cfg.MaxRetries == 0 {
96+
cfg.MaxRetries = defaultMaxRetries
97+
}
98+
7599
return &Client{
76100
urls: cfg.URLs,
77101
username: cfg.Username,
78102
password: cfg.Password,
79103
apikey: cfg.APIKey,
80104

105+
retryOnStatus: cfg.RetryOnStatus,
106+
disableRetry: cfg.DisableRetry,
107+
enableRetryOnTimeout: cfg.EnableRetryOnTimeout,
108+
maxRetries: cfg.MaxRetries,
109+
retryBackoff: cfg.RetryBackoff,
110+
81111
transport: cfg.Transport,
82112
selector: NewRoundRobinSelector(cfg.URLs...),
83113
logger: cfg.Logger,
@@ -88,41 +118,86 @@ func New(cfg Config) *Client {
88118
//
89119
func (c *Client) Perform(req *http.Request) (*http.Response, error) {
90120
var (
91-
dupReqBody io.Reader
92-
)
121+
res *http.Response
122+
err error
93123

94-
// Get URL from the Selector
95-
//
96-
u, err := c.getURL()
97-
if err != nil {
98-
// TODO(karmi): Log error
99-
return nil, fmt.Errorf("cannot get URL: %s", err)
100-
}
124+
dupReqBodyForLog io.ReadCloser
125+
)
101126

102127
// Update request
103128
//
104-
c.setURL(u, req)
105-
c.setUserAgent(req)
106-
c.setAuthorization(u, req)
129+
c.setReqUserAgent(req)
130+
131+
for i := 1; i <= c.maxRetries; i++ {
132+
var (
133+
nodeURL *url.URL
134+
shouldRetry bool
135+
)
136+
137+
// Get URL from the Selector
138+
//
139+
nodeURL, err = c.getURL()
140+
if err != nil {
141+
// TODO(karmi): Log error
142+
return nil, fmt.Errorf("cannot get URL: %s", err)
143+
}
107144

108-
// Duplicate request body for logger
109-
//
110-
if c.logger != nil && c.logger.RequestBodyEnabled() {
111-
if req.Body != nil && req.Body != http.NoBody {
112-
dupReqBody, req.Body, _ = duplicateBody(req.Body)
145+
// Update request
146+
//
147+
c.setReqURL(nodeURL, req)
148+
c.setReqAuth(nodeURL, req)
149+
150+
// Duplicate request body for logger
151+
//
152+
if c.logger != nil && c.logger.RequestBodyEnabled() {
153+
if req.Body != nil && req.Body != http.NoBody {
154+
dupReqBodyForLog, req.Body, _ = duplicateBody(req.Body)
155+
}
113156
}
114-
}
115157

116-
// Set up time measures and execute the request
117-
//
118-
start := time.Now().UTC()
119-
res, err := c.transport.RoundTrip(req)
120-
dur := time.Since(start)
158+
// Set up time measures and execute the request
159+
//
160+
start := time.Now().UTC()
161+
res, err = c.transport.RoundTrip(req)
162+
dur := time.Since(start)
121163

122-
// Log request and response
123-
//
124-
if c.logger != nil {
125-
c.logRoundTrip(req, res, dupReqBody, err, start, dur)
164+
// Log request and response
165+
//
166+
if c.logger != nil {
167+
c.logRoundTrip(req, res, dupReqBodyForLog, err, start, dur)
168+
}
169+
170+
// Retry only on network errors, but don't retry on timeout errors, unless configured
171+
//
172+
if err != nil {
173+
if err, ok := err.(net.Error); ok {
174+
if (!err.Timeout() || c.enableRetryOnTimeout) && !c.disableRetry {
175+
shouldRetry = true
176+
}
177+
}
178+
}
179+
180+
// Retry on configured response statuses
181+
//
182+
if res != nil && !c.disableRetry {
183+
for _, code := range c.retryOnStatus {
184+
if res.StatusCode == code {
185+
shouldRetry = true
186+
}
187+
}
188+
}
189+
190+
// Break if retry should not be performed
191+
//
192+
if !shouldRetry {
193+
break
194+
}
195+
196+
// Delay the retry if a backoff function is configured
197+
//
198+
if c.retryBackoff != nil {
199+
time.Sleep(c.retryBackoff(i))
200+
}
126201
}
127202

128203
// TODO(karmi): Wrap error
@@ -139,7 +214,7 @@ func (c *Client) getURL() (*url.URL, error) {
139214
return c.selector.Select()
140215
}
141216

142-
func (c *Client) setURL(u *url.URL, req *http.Request) *http.Request {
217+
func (c *Client) setReqURL(u *url.URL, req *http.Request) *http.Request {
143218
req.URL.Scheme = u.Scheme
144219
req.URL.Host = u.Host
145220

@@ -154,7 +229,7 @@ func (c *Client) setURL(u *url.URL, req *http.Request) *http.Request {
154229
return req
155230
}
156231

157-
func (c *Client) setAuthorization(u *url.URL, req *http.Request) *http.Request {
232+
func (c *Client) setReqAuth(u *url.URL, req *http.Request) *http.Request {
158233
if _, ok := req.Header["Authorization"]; !ok {
159234
if u.User != nil {
160235
password, _ := u.User.Password()
@@ -180,7 +255,7 @@ func (c *Client) setAuthorization(u *url.URL, req *http.Request) *http.Request {
180255
return req
181256
}
182257

183-
func (c *Client) setUserAgent(req *http.Request) *http.Request {
258+
func (c *Client) setReqUserAgent(req *http.Request) *http.Request {
184259
req.Header.Set("User-Agent", userAgent)
185260
return req
186261
}

0 commit comments

Comments
 (0)