Skip to content

Commit 9c30d57

Browse files
committed
Transport: Add connection pool
This patch adds support for the "connection pool" feature, standard in official Elasticsearch clients, which maintains state for node connections passed in configuration, and removes connections with failures from the list of live connections. It periodically adds them back to the list, with an exponentional backoff delay. The main parts of the implementation are contained in the connection.go file. The implementation short-circuits the connection pooling when only a single URL has been passed in configuration — common when using Elastic Cloud or a proxy/load balancer in front of Elasticsearch. When multiple URLs are passed, they are added to the list of live connections, and returned in a round-robin fashion from the Next() method. When the connection fails to respond to a request, it is removed from the list of live connections, put into the list of dead connections, and scheduled for a "resurrect" attempt. In case no live connection is available, a dead connection is forcibly resurrected. A sync.Mutex is used for synchronization. For easier debugging, the implementation allows to use the EnableDebugLogger option to print debugging statements related to connection management to STDOUT. This patch also adds support for collecting and getting client metrics: number of requests and failures, statistics for specific response codes, and the contents of the live/dead connection lists. An example for usage with the expvar package is provided in _examples/instrumentation/expvar.go. Integration with APM and other instrumentation tools will be researched and documented in future. Closes #95 (cherry picked from commit 365fc2458d4135cdd9613cb6bcb772d41e81522f)
1 parent 8efb73c commit 9c30d57

15 files changed

+1660
-170
lines changed

_examples/instrumentation/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
test:
2+
go build -o /dev/null expvar.go
23
go build -o /dev/null opencensus.go
34
go build -o /dev/null apmelasticsearch.go
45

_examples/instrumentation/expvar.go

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
package main
6+
7+
import (
8+
"expvar"
9+
"fmt"
10+
"io"
11+
"log"
12+
"os"
13+
"os/signal"
14+
"runtime"
15+
"strings"
16+
"syscall"
17+
"time"
18+
19+
// Import the "expvar" and "pprof" package >>>>>>>>>>
20+
21+
"net/http"
22+
_ "net/http/pprof"
23+
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
24+
25+
"golang.org/x/crypto/ssh/terminal"
26+
27+
"github.com/elastic/go-elasticsearch/v8"
28+
"github.com/elastic/go-elasticsearch/v8/estransport"
29+
)
30+
31+
var (
32+
tWidth, _, _ = terminal.GetSize(int(os.Stdout.Fd()))
33+
)
34+
35+
func init() {
36+
runtime.SetMutexProfileFraction(10)
37+
}
38+
39+
func main() {
40+
log.SetFlags(0)
41+
42+
ticker := time.NewTicker(time.Second)
43+
defer ticker.Stop()
44+
45+
aborted := make(chan os.Signal)
46+
signal.Notify(aborted, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
47+
48+
go func() {
49+
<-aborted
50+
51+
log.Println("\nDone!\n")
52+
os.Exit(0)
53+
}()
54+
55+
log.Println(strings.Repeat("─", tWidth))
56+
log.Println("Open <http://localhost:6060/debug/vars> to see all exported variables.")
57+
log.Println(strings.Repeat("─", tWidth))
58+
59+
// Start the debug server >>>>>>>>>>>>>>>>>>>>>>>>>>>
60+
go func() { log.Fatalln(http.ListenAndServe("localhost:6060", nil)) }()
61+
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
62+
63+
for i := 1; i <= 2; i++ {
64+
go func(i int) {
65+
log.Printf("==> Starting server on <localhost:1000%d>", i)
66+
if err := http.ListenAndServe(
67+
fmt.Sprintf("localhost:1000%d", i),
68+
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "OK\n") }),
69+
); err != nil && err != http.ErrServerClosed {
70+
log.Fatalf("Unable to start server: %s", err)
71+
}
72+
}(i)
73+
}
74+
75+
es, err := elasticsearch.NewClient(
76+
elasticsearch.Config{
77+
Addresses: []string{
78+
"http://localhost:10001",
79+
"http://localhost:10002",
80+
"http://localhost:10003",
81+
},
82+
83+
Logger: &estransport.ColorLogger{Output: os.Stdout},
84+
DisableRetry: true,
85+
EnableDebugLogger: true,
86+
87+
// Enable metric collection >>>>>>>>>>>>>>>>>>>>>>>>>
88+
EnableMetrics: true,
89+
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
90+
})
91+
if err != nil {
92+
log.Fatal("ERROR: %s", err)
93+
}
94+
95+
// Publish client metrics to expvar >>>>>>>>>>>>>>>>>
96+
expvar.Publish("go-elasticsearch", expvar.Func(func() interface{} { m, _ := es.Metrics(); return m }))
97+
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
98+
99+
for {
100+
select {
101+
case t := <-ticker.C:
102+
103+
go func() {
104+
if res, _ := es.Info(); res != nil {
105+
res.Body.Close()
106+
}
107+
}()
108+
109+
go func() {
110+
if res, _ := es.Cluster.Health(); res != nil {
111+
res.Body.Close()
112+
}
113+
}()
114+
115+
if t.Second()%5 == 0 {
116+
m, err := es.Metrics()
117+
if err != nil {
118+
log.Printf("\x1b[31;1mUnable to get metrics: %s\x1b[0m", err)
119+
continue
120+
}
121+
log.Println("███", fmt.Sprintf("\x1b[1m%s\x1b[0m", "Metrics"), strings.Repeat("█", tWidth-12))
122+
log.Printf(
123+
""+
124+
" \x1b[2mRequests: \x1b[0m %d\n"+
125+
" \x1b[2mFailures: \x1b[0m %d\n"+
126+
" \x1b[2mLive nodes:\x1b[0m %s",
127+
m.Requests, m.Failures, m.Live)
128+
log.Println(strings.Repeat("─", tWidth))
129+
}
130+
}
131+
}
132+
}

elasticsearch.go

+15
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ type Config struct {
4242
EnableRetryOnTimeout bool // Default: false.
4343
MaxRetries int // Default: 3.
4444

45+
EnableMetrics bool // Enable the metrics collection.
46+
EnableDebugLogger bool // Enable the debug logging.
47+
4548
RetryBackoff func(attempt int) time.Duration // Optional backoff duration. Default: nil.
4649

4750
Transport http.RoundTripper // The HTTP transport object.
@@ -130,6 +133,9 @@ func NewClient(cfg Config) (*Client, error) {
130133
MaxRetries: cfg.MaxRetries,
131134
RetryBackoff: cfg.RetryBackoff,
132135

136+
EnableMetrics: cfg.EnableMetrics,
137+
EnableDebugLogger: cfg.EnableDebugLogger,
138+
133139
Transport: cfg.Transport,
134140
Logger: cfg.Logger,
135141
})
@@ -143,6 +149,15 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
143149
return c.Transport.Perform(req)
144150
}
145151

152+
// Metrics returns the client metrics.
153+
//
154+
func (c *Client) Metrics() (estransport.Metrics, error) {
155+
if mt, ok := c.Transport.(estransport.Measurable); ok {
156+
return mt.Metrics()
157+
}
158+
return estransport.Metrics{}, errors.New("transport is missing method Metrics()")
159+
}
160+
146161
// addrsFromEnvironment returns a list of addresses by splitting
147162
// the ELASTICSEARCH_URL environment variable with comma, or an empty list.
148163
//

elasticsearch_internal_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,16 @@ func TestVersion(t *testing.T) {
279279
t.Error("Version is empty")
280280
}
281281
}
282+
283+
func TestClientMetrics(t *testing.T) {
284+
c, _ := NewClient(Config{EnableMetrics: true})
285+
286+
m, err := c.Metrics()
287+
if err != nil {
288+
t.Fatalf("Unexpected error: %v", err)
289+
}
290+
291+
if m.Requests != 0 {
292+
t.Errorf("Unexpected output: %s", m)
293+
}
294+
}

0 commit comments

Comments
 (0)