Skip to content

Commit c54007a

Browse files
committed
Transport: Add support for node discovery
This patch adds support for discovering nodes in the cluster automatically ("sniffing") and using their hostnames to replace the connections in the pool. It updates the metrics support to contain node metadata. Closes #101 (cherry picked from commit 4135834)
1 parent 98d0e04 commit c54007a

10 files changed

+577
-14
lines changed

elasticsearch.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ type Config struct {
4242
EnableRetryOnTimeout bool // Default: false.
4343
MaxRetries int // Default: 3.
4444

45+
DiscoverNodesOnStart bool // Discover nodes when initializing the client. Default: false.
46+
DiscoverNodesInterval time.Duration // Discover nodes periodically. Default: disabled.
47+
4548
EnableMetrics bool // Enable the metrics collection.
4649
EnableDebugLogger bool // Enable the debug logging.
4750

@@ -124,6 +127,13 @@ func NewClient(cfg Config) (*Client, error) {
124127
urls = append(urls, u)
125128
}
126129

130+
// TODO(karmi): Refactor
131+
if urls[0].User != nil {
132+
cfg.Username = urls[0].User.Username()
133+
pw, _ := urls[0].User.Password()
134+
cfg.Password = pw
135+
}
136+
127137
tp := estransport.New(estransport.Config{
128138
URLs: urls,
129139
Username: cfg.Username,
@@ -139,13 +149,21 @@ func NewClient(cfg Config) (*Client, error) {
139149
EnableMetrics: cfg.EnableMetrics,
140150
EnableDebugLogger: cfg.EnableDebugLogger,
141151

152+
DiscoverNodesInterval: cfg.DiscoverNodesInterval,
153+
142154
Transport: cfg.Transport,
143155
Logger: cfg.Logger,
144156
Selector: cfg.Selector,
145157
ConnectionPoolFunc: cfg.ConnectionPoolFunc,
146158
})
147159

148-
return &Client{Transport: tp, API: esapi.New(tp)}, nil
160+
client := &Client{Transport: tp, API: esapi.New(tp)}
161+
162+
if cfg.DiscoverNodesOnStart {
163+
go client.DiscoverNodes()
164+
}
165+
166+
return client, nil
149167
}
150168

151169
// Perform delegates to Transport to execute a request and return a response.
@@ -163,6 +181,15 @@ func (c *Client) Metrics() (estransport.Metrics, error) {
163181
return estransport.Metrics{}, errors.New("transport is missing method Metrics()")
164182
}
165183

184+
// DiscoverNodes reloads the client connections by fetching information from the cluster.
185+
//
186+
func (c *Client) DiscoverNodes() error {
187+
if dt, ok := c.Transport.(estransport.Discoverable); ok {
188+
return dt.DiscoverNodes()
189+
}
190+
return errors.New("transport is missing method DiscoverNodes()")
191+
}
192+
166193
// addrsFromEnvironment returns a list of addresses by splitting
167194
// the ELASTICSEARCH_URL environment variable with comma, or an empty list.
168195
//

estransport/connection.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ type Connection struct {
4343
IsDead bool
4444
DeadSince time.Time
4545
Failures int
46+
47+
ID string
48+
Name string
49+
Roles []string
50+
Attributes map[string]interface{}
4651
}
4752

4853
type singleConnectionPool struct {
@@ -222,8 +227,8 @@ func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error
222227
}
223228

224229
c.markAsLive()
225-
226230
cp.live = append(cp.live, c)
231+
227232
if removeDead {
228233
index := -1
229234
for i, conn := range cp.dead {

estransport/discovery.go

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
package estransport
6+
7+
import (
8+
"encoding/json"
9+
"fmt"
10+
"io/ioutil"
11+
"net/http"
12+
"net/url"
13+
"sort"
14+
"strings"
15+
"sync"
16+
"time"
17+
)
18+
19+
// Discoverable defines the interface for transports supporting node discovery.
20+
//
21+
type Discoverable interface {
22+
DiscoverNodes() error
23+
}
24+
25+
// nodeInfo represents the information about node in a cluster.
26+
//
27+
// See: https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-info.html
28+
//
29+
type nodeInfo struct {
30+
ID string
31+
Name string
32+
URL *url.URL
33+
Roles []string
34+
Attributes map[string]interface{}
35+
HTTP struct {
36+
PublishAddress string `json:"publish_address"`
37+
}
38+
}
39+
40+
// DiscoverNodes reloads the client connections by fetching information from the cluster.
41+
//
42+
func (c *Client) DiscoverNodes() error {
43+
var conns []*Connection
44+
45+
nodes, err := c.getNodesInfo()
46+
if err != nil {
47+
if debugLogger != nil {
48+
debugLogger.Logf("Error getting nodes info: %s\n", err)
49+
}
50+
return fmt.Errorf("discovery: get nodes: %s", err)
51+
}
52+
53+
for _, node := range nodes {
54+
var (
55+
isDataNode bool
56+
isIngestNode bool
57+
)
58+
59+
roles := append(node.Roles[:0:0], node.Roles...)
60+
sort.Strings(roles)
61+
62+
if i := sort.SearchStrings(roles, "data"); i < len(roles) && roles[i] == "data" {
63+
isDataNode = true
64+
}
65+
if i := sort.SearchStrings(roles, "ingest"); i < len(roles) && roles[i] == "ingest" {
66+
isIngestNode = true
67+
}
68+
69+
if debugLogger != nil {
70+
var skip string
71+
if !isDataNode || !isIngestNode {
72+
skip = "; [SKIP]"
73+
}
74+
debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
75+
}
76+
77+
// Skip master only nodes
78+
// TODO(karmi): Move logic to Selector?
79+
if !isDataNode || !isIngestNode {
80+
continue
81+
}
82+
83+
conns = append(conns, &Connection{
84+
URL: node.URL,
85+
ID: node.ID,
86+
Name: node.Name,
87+
Roles: node.Roles,
88+
Attributes: node.Attributes,
89+
})
90+
}
91+
92+
c.Lock()
93+
defer c.Unlock()
94+
95+
if lockable, ok := c.pool.(sync.Locker); ok {
96+
lockable.Lock()
97+
defer lockable.Unlock()
98+
}
99+
100+
if c.poolFunc != nil {
101+
c.pool = c.poolFunc(conns, c.selector)
102+
} else {
103+
// TODO(karmi): Replace only live connections, leave dead scheduled for resurrect?
104+
c.pool, err = NewConnectionPool(conns, c.selector)
105+
if err != nil {
106+
return err
107+
}
108+
}
109+
110+
return nil
111+
}
112+
113+
func (c *Client) getNodesInfo() ([]nodeInfo, error) {
114+
var (
115+
out []nodeInfo
116+
scheme = c.urls[0].Scheme
117+
)
118+
119+
req, err := http.NewRequest("GET", "/_nodes/http", nil)
120+
if err != nil {
121+
return out, err
122+
}
123+
124+
c.Lock()
125+
conn, err := c.pool.Next()
126+
c.Unlock()
127+
// TODO(karmi): If no connection is returned, fallback to original URLs
128+
if err != nil {
129+
return out, err
130+
}
131+
132+
c.setReqURL(conn.URL, req)
133+
c.setReqAuth(conn.URL, req)
134+
c.setReqUserAgent(req)
135+
136+
res, err := c.transport.RoundTrip(req)
137+
if err != nil {
138+
return out, err
139+
}
140+
defer res.Body.Close()
141+
142+
if res.StatusCode > 200 {
143+
body, _ := ioutil.ReadAll(res.Body)
144+
return out, fmt.Errorf("server error: %s: %s", res.Status, body)
145+
}
146+
147+
var env map[string]json.RawMessage
148+
if err := json.NewDecoder(res.Body).Decode(&env); err != nil {
149+
return out, err
150+
}
151+
152+
var nodes map[string]nodeInfo
153+
if err := json.Unmarshal(env["nodes"], &nodes); err != nil {
154+
return out, err
155+
}
156+
157+
for id, node := range nodes {
158+
node.ID = id
159+
u, err := c.getNodeURL(node, scheme)
160+
if err != nil {
161+
return out, err
162+
}
163+
node.URL = u
164+
out = append(out, node)
165+
}
166+
167+
return out, nil
168+
}
169+
170+
func (c *Client) getNodeURL(node nodeInfo, scheme string) (*url.URL, error) {
171+
var (
172+
host string
173+
port string
174+
175+
addrs = strings.Split(node.HTTP.PublishAddress, "/")
176+
ports = strings.Split(node.HTTP.PublishAddress, ":")
177+
)
178+
179+
if len(addrs) > 1 {
180+
host = addrs[0]
181+
} else {
182+
host = strings.Split(addrs[0], ":")[0]
183+
}
184+
port = ports[len(ports)-1]
185+
186+
u := &url.URL{
187+
Scheme: scheme,
188+
Host: host + ":" + port,
189+
}
190+
191+
return u, nil
192+
}
193+
194+
func (c *Client) scheduleDiscoverNodes(d time.Duration) {
195+
go c.DiscoverNodes()
196+
time.AfterFunc(c.discoverNodesInterval, func() {
197+
c.scheduleDiscoverNodes(c.discoverNodesInterval)
198+
})
199+
}

0 commit comments

Comments
 (0)